Posted to commits@systemml.apache.org by du...@apache.org on 2016/01/26 02:13:01 UTC

[37/55] [partial] incubator-systemml git commit: [SYSTEMML-482] [SYSTEMML-480] Adding a Git attributes file to enforce Unix-styled line endings, and normalizing all of the line endings.
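
The attributes file named in the commit message is not part of this partial
diff, so the following is only a sketch of what such a file typically
contains, not the actual file added by SYSTEMML-482. A minimal .gitattributes
enforcing Unix-style line endings might look like:

    # hypothetical sketch; the real .gitattributes of this commit is not shown here
    *       text=auto eol=lf   # normalize all detected text files to LF
    *.java  text eol=lf        # Java sources always use Unix line endings
    *.dml   text eol=lf        # DML scripts always use Unix line endings
    *.jar   binary             # never normalize binary artifacts

With such a file in place, re-adding the tree rewrites every text file with LF
endings, which is why each file below appears as a full delete-and-re-add hunk.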

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index d54ce97..cb9fd06 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -1,372 +1,372 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
-import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.StatisticMonitor;
-import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
-import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
-import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
-import org.apache.sysml.runtime.instructions.cp.IntObject;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.LocalFileUtils;
-import org.apache.sysml.utils.Statistics;
-
-/**
- *
- */
-public class RemoteDPParWorkerReducer extends ParWorker
-	implements Reducer<LongWritable, Writable, Writable, Writable>
-{
-
-	//MR data partitioning attributes
-	private String _inputVar = null;
-	private String _iterVar = null;
-	private PDataPartitionFormat _dpf = null;
-	private OutputInfo _info = null;
-	private int _rlen = -1;
-	private int _clen = -1;
-	private int _brlen = -1;
-	private int _bclen = -1;
-	
-	//reuse matrix partition
-	private MatrixBlock _partition = null; 
-	private boolean _tSparseCol = false;
-		
-	//MR ParWorker attributes  
-	protected String  _stringID       = null; 
-	protected HashMap<String, String> _rvarFnames = null; 
-
-	//cached collector/reporter
-	protected OutputCollector<Writable, Writable> _out = null;
-	protected Reporter _report = null;
-	
-	/**
-	 * 
-	 */
-	public RemoteDPParWorkerReducer() 
-	{
-		
-	}
-	
-	@Override
-	public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
-		throws IOException 
-	{
-		//cache collector/reporter (for write in close)
-		_out = out;
-		_report = reporter;
-		
-		//collect input partition
-		if( _info == OutputInfo.BinaryBlockOutputInfo )
-			_partition = collectBinaryBlock( valueList );
-		else
-			_partition = collectBinaryCellInput( valueList );
-			
-		//update in-memory matrix partition
-		MatrixObject mo = (MatrixObject)_ec.getVariable( _inputVar );
-		mo.setInMemoryPartition( _partition );
-		
-		//execute program
-		LOG.trace("execute RemoteDPParWorkerReducer "+_stringID+" ("+_workerID+")");
-		try {
-			//create tasks for input data
-			Task lTask = new Task(TaskType.SET);
-			lTask.addIteration( new IntObject(_iterVar,key.get()) );
-			
-			//execute program
-			executeTask( lTask );
-		}
-		catch(Exception ex)
-		{
-			throw new IOException("ParFOR: Failed to execute task.",ex);
-		}
-		
-		//statistic maintenance (after final export)
-		RemoteParForUtils.incrementParForMRCounters(_report, 1, 1);
-	}
-
-	/**
-	 * 
-	 */
-	@Override
-	public void configure(JobConf job)
-	{
-		//Step 1: configure data partitioning information
-		_rlen = (int)MRJobConfiguration.getPartitioningNumRows( job );
-		_clen = (int)MRJobConfiguration.getPartitioningNumCols( job );
-		_brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
-		_bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
-		_iterVar = MRJobConfiguration.getPartitioningItervar( job );
-		_inputVar = MRJobConfiguration.getPartitioningMatrixvar( job );
-		_dpf = MRJobConfiguration.getPartitioningFormat( job );		
-		switch( _dpf ) { //create matrix partition for reuse
-			case ROW_WISE:    _rlen = 1; break;
-			case COLUMN_WISE: _clen = 1; break;
-			default:  throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf);
-		}
-		_info = MRJobConfiguration.getPartitioningOutputInfo( job );
-		_tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job ); 
-		if( _tSparseCol )
-			_partition = new MatrixBlock((int)_clen, _rlen, true);
-		else
-			_partition = new MatrixBlock((int)_rlen, _clen, false);
-
-		//Step 2: configure parworker
-		String taskID = job.get("mapred.tip.id");		
-		LOG.trace("configure RemoteDPParWorkerReducer "+taskID);
-			
-		try
-		{
-			_stringID = taskID;
-			_workerID = IDHandler.extractIntID(_stringID); //int task ID
-
-			//use the given job configuration as source for all new job confs 
-			//NOTE: this is required because on HDP 2.3, the classpath of mr tasks contained hadoop-common.jar 
-			//which includes a core-default.xml configuration which hides the actual default cluster configuration
-			//in the context of mr jobs (for example this config points to local fs instead of hdfs by default). 
-			if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-				ConfigurationManager.setCachedJobConf(job);
-			}
-			
-			//create local runtime program
-			String in = MRJobConfiguration.getProgramBlocks(job);
-			ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID);
-			_childBlocks = body.getChildBlocks();
-			_ec          = body.getEc();				
-			_resultVars  = body.getResultVarNames();
-	
-			//init local cache manager 
-			if( !CacheableData.isCachingActive() ) {
-				String uuid = IDHandler.createDistributedUniqueID();
-				LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-				CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
-			}
-			if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
-				CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
-			}
-			
-			//ensure that resultvar files are not removed
-			super.pinResultVariables();
-		
-			//enable/disable caching (if required)
-			boolean cpCaching = MRJobConfiguration.getParforCachingConfig( job );
-			if( !cpCaching )
-				CacheableData.disableCaching();
-
-			_numTasks    = 0;
-			_numIters    = 0;			
-		}
-		catch(Exception ex)
-		{
-			throw new RuntimeException(ex);
-		}
-		
-		//disable parfor stat monitoring; reporting execution times via counters is not useful
-		StatisticMonitor.disableStatMonitoring();
-		
-		//always reset stats because counters are kept per map task (relevant in case of JVM reuse)
-		if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
-		{
-			CacheStatistics.reset();
-			Statistics.reset();
-		}
-	}
-	
-	/**
-	 * 
-	 */
-	@Override
-	public void close() 
-	    throws IOException 
-	{
-		try
-		{
-			//write output if required (matrix indexed write)
-			RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars, _out );
-		
-			//statistic maintenance (after final export)
-			RemoteParForUtils.incrementParForMRCounters(_report, 0, 0);
-			
-			//print heavy hitters per task
-			JobConf job = ConfigurationManager.getCachedJobConf();
-			if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
-				LOG.info("\nSystemML Statistics:\nHeavy hitter instructions (name, time, count):\n" + Statistics.getHeavyHitters(10));		
-		}
-		catch(Exception ex)
-		{
-			throw new IOException( ex );
-		}
-		
-		//cleanup cache and local tmp dir
-		RemoteParForUtils.cleanupWorkingDirectories();
-		
-		//ensure caching is not disabled for CP in local mode
-		CacheableData.enableCaching();
-	}
-	
-	/**
-	 * Collects a matrixblock partition from a given input iterator over 
-	 * binary blocks.
-	 * 
-	 * Note it reuses the instance attribute _partition - multiple calls
-	 * will overwrite the result.
-	 * 
-	 * @param valueList
-	 * @return
-	 * @throws IOException 
-	 */
-	private MatrixBlock collectBinaryBlock( Iterator<Writable> valueList ) 
-		throws IOException 
-	{
-		try
-		{
-			//reset reuse block, keep configured representation
-			_partition.reset(_rlen, _clen);	
-
-			while( valueList.hasNext() )
-			{
-				PairWritableBlock pairValue = (PairWritableBlock)valueList.next();
-				int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen;
-				int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen;
-				MatrixBlock block = pairValue.block;
-				if( !_partition.isInSparseFormat() ) //DENSE
-				{
-					_partition.copy( row_offset, row_offset+block.getNumRows()-1, 
-							   col_offset, col_offset+block.getNumColumns()-1,
-							   pairValue.block, false ); 
-				}
-				else //SPARSE 
-				{
-					_partition.appendToSparse(pairValue.block, row_offset, col_offset);
-				}
-			}
-
-			//final partition cleanup
-			cleanupCollectedMatrixPartition( _partition.isInSparseFormat() );
-		}
-		catch(DMLRuntimeException ex)
-		{
-			throw new IOException(ex);
-		}
-		
-		return _partition;
-	}
-	
-	
-	/**
-	 * Collects a matrixblock partition from a given input iterator over 
-	 * binary cells.
-	 * 
-	 * Note it reuses the instance attribute _partition - multiple calls
-	 * will overwrite the result.
-	 * 
-	 * @param valueList
-	 * @return
-	 * @throws IOException 
-	 */
-	private MatrixBlock collectBinaryCellInput( Iterator<Writable> valueList ) 
-		throws IOException 
-	{
-		//reset reuse block, keep configured representation
-		if( _tSparseCol )
-			_partition.reset(_clen, _rlen);	
-		else
-			_partition.reset(_rlen, _clen);
-		
-		switch( _dpf )
-		{
-			case ROW_WISE:
-				while( valueList.hasNext() )
-				{
-					PairWritableCell pairValue = (PairWritableCell)valueList.next();
-					if( pairValue.indexes.getColumnIndex()<0 )
-						continue; //skip dummy cells (used only to ensure creation of empty partitions)
-					_partition.quickSetValue(0, (int)pairValue.indexes.getColumnIndex()-1, pairValue.cell.getValue());
-				}
-				break;
-			case COLUMN_WISE:
-				while( valueList.hasNext() )
-				{
-					PairWritableCell pairValue = (PairWritableCell)valueList.next();
-					if( pairValue.indexes.getRowIndex()<0 )
-						continue; //skip dummy cells (used only to ensure creation of empty partitions)
-					if( _tSparseCol )
-						_partition.appendValue(0,(int)pairValue.indexes.getRowIndex()-1, pairValue.cell.getValue());
-					else
-						_partition.quickSetValue((int)pairValue.indexes.getRowIndex()-1, 0, pairValue.cell.getValue());
-				}
-				break;
-			default: 
-				throw new IOException("Partition format not yet supported in fused partition-execute: "+_dpf);
-		}
-		
-		//final partition cleanup
-		cleanupCollectedMatrixPartition(_tSparseCol);
-		
-		return _partition;
-	}
-	
-	/**
-	 * 
-	 * @param sort
-	 * @throws IOException
-	 */
-	private void cleanupCollectedMatrixPartition(boolean sort) 
-		throws IOException
-	{
-		//sort sparse row contents if required
-		if( _partition.isInSparseFormat() && sort )
-			_partition.sortSparseRows();
-
-		//ensure right number of nnz
-		if( !_partition.isInSparseFormat() )
-			_partition.recomputeNonZeros();
-			
-		//examine and switch dense/sparse representation
-		try {
-			_partition.examSparsity();
-		}
-		catch(Exception ex){
-			throw new IOException(ex);
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.StatisticMonitor;
+import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
+import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
+import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
+import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.utils.Statistics;
+
+/**
+ *
+ */
+public class RemoteDPParWorkerReducer extends ParWorker
+	implements Reducer<LongWritable, Writable, Writable, Writable>
+{
+
+	//MR data partitioning attributes
+	private String _inputVar = null;
+	private String _iterVar = null;
+	private PDataPartitionFormat _dpf = null;
+	private OutputInfo _info = null;
+	private int _rlen = -1;
+	private int _clen = -1;
+	private int _brlen = -1;
+	private int _bclen = -1;
+	
+	//reuse matrix partition
+	private MatrixBlock _partition = null; 
+	private boolean _tSparseCol = false;
+		
+	//MR ParWorker attributes  
+	protected String  _stringID       = null; 
+	protected HashMap<String, String> _rvarFnames = null; 
+
+	//cached collector/reporter
+	protected OutputCollector<Writable, Writable> _out = null;
+	protected Reporter _report = null;
+	
+	/**
+	 * 
+	 */
+	public RemoteDPParWorkerReducer() 
+	{
+		
+	}
+	
+	@Override
+	public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
+		throws IOException 
+	{
+		//cache collector/reporter (for write in close)
+		_out = out;
+		_report = reporter;
+		
+		//collect input partition
+		if( _info == OutputInfo.BinaryBlockOutputInfo )
+			_partition = collectBinaryBlock( valueList );
+		else
+			_partition = collectBinaryCellInput( valueList );
+			
+		//update in-memory matrix partition
+		MatrixObject mo = (MatrixObject)_ec.getVariable( _inputVar );
+		mo.setInMemoryPartition( _partition );
+		
+		//execute program
+		LOG.trace("execute RemoteDPParWorkerReducer "+_stringID+" ("+_workerID+")");
+		try {
+			//create tasks for input data
+			Task lTask = new Task(TaskType.SET);
+			lTask.addIteration( new IntObject(_iterVar,key.get()) );
+			
+			//execute program
+			executeTask( lTask );
+		}
+		catch(Exception ex)
+		{
+			throw new IOException("ParFOR: Failed to execute task.",ex);
+		}
+		
+		//statistic maintenance (after final export)
+		RemoteParForUtils.incrementParForMRCounters(_report, 1, 1);
+	}
+
+	/**
+	 * 
+	 */
+	@Override
+	public void configure(JobConf job)
+	{
+		//Step 1: configure data partitioning information
+		_rlen = (int)MRJobConfiguration.getPartitioningNumRows( job );
+		_clen = (int)MRJobConfiguration.getPartitioningNumCols( job );
+		_brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
+		_bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+		_iterVar = MRJobConfiguration.getPartitioningItervar( job );
+		_inputVar = MRJobConfiguration.getPartitioningMatrixvar( job );
+		_dpf = MRJobConfiguration.getPartitioningFormat( job );		
+		switch( _dpf ) { //create matrix partition for reuse
+			case ROW_WISE:    _rlen = 1; break;
+			case COLUMN_WISE: _clen = 1; break;
+			default:  throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf);
+		}
+		_info = MRJobConfiguration.getPartitioningOutputInfo( job );
+		_tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job ); 
+		if( _tSparseCol )
+			_partition = new MatrixBlock((int)_clen, _rlen, true);
+		else
+			_partition = new MatrixBlock((int)_rlen, _clen, false);
+
+		//Step 2: configure parworker
+		String taskID = job.get("mapred.tip.id");		
+		LOG.trace("configure RemoteDPParWorkerReducer "+taskID);
+			
+		try
+		{
+			_stringID = taskID;
+			_workerID = IDHandler.extractIntID(_stringID); //int task ID
+
+			//use the given job configuration as source for all new job confs 
+			//NOTE: this is required because on HDP 2.3, the classpath of mr tasks contained hadoop-common.jar 
+			//which includes a core-default.xml configuration which hides the actual default cluster configuration
+			//in the context of mr jobs (for example this config points to local fs instead of hdfs by default). 
+			if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+				ConfigurationManager.setCachedJobConf(job);
+			}
+			
+			//create local runtime program
+			String in = MRJobConfiguration.getProgramBlocks(job);
+			ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID);
+			_childBlocks = body.getChildBlocks();
+			_ec          = body.getEc();				
+			_resultVars  = body.getResultVarNames();
+	
+			//init local cache manager 
+			if( !CacheableData.isCachingActive() ) {
+				String uuid = IDHandler.createDistributedUniqueID();
+				LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
+				CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
+			}
+			if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
+				CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+			}
+			
+			//ensure that resultvar files are not removed
+			super.pinResultVariables();
+		
+			//enable/disable caching (if required)
+			boolean cpCaching = MRJobConfiguration.getParforCachingConfig( job );
+			if( !cpCaching )
+				CacheableData.disableCaching();
+
+			_numTasks    = 0;
+			_numIters    = 0;			
+		}
+		catch(Exception ex)
+		{
+			throw new RuntimeException(ex);
+		}
+		
+		//disable parfor stat monitoring; reporting execution times via counters is not useful
+		StatisticMonitor.disableStatMonitoring();
+		
+		//always reset stats because counters are kept per map task (relevant in case of JVM reuse)
+		if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
+		{
+			CacheStatistics.reset();
+			Statistics.reset();
+		}
+	}
+	
+	/**
+	 * 
+	 */
+	@Override
+	public void close() 
+	    throws IOException 
+	{
+		try
+		{
+			//write output if required (matrix indexed write)
+			RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars, _out );
+		
+			//statistic maintenance (after final export)
+			RemoteParForUtils.incrementParForMRCounters(_report, 0, 0);
+			
+			//print heavy hitters per task
+			JobConf job = ConfigurationManager.getCachedJobConf();
+			if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
+				LOG.info("\nSystemML Statistics:\nHeavy hitter instructions (name, time, count):\n" + Statistics.getHeavyHitters(10));		
+		}
+		catch(Exception ex)
+		{
+			throw new IOException( ex );
+		}
+		
+		//cleanup cache and local tmp dir
+		RemoteParForUtils.cleanupWorkingDirectories();
+		
+		//ensure caching is not disabled for CP in local mode
+		CacheableData.enableCaching();
+	}
+	
+	/**
+	 * Collects a matrixblock partition from a given input iterator over 
+	 * binary blocks.
+	 * 
+	 * Note it reuses the instance attribute _partition - multiple calls
+	 * will overwrite the result.
+	 * 
+	 * @param valueList
+	 * @return
+	 * @throws IOException 
+	 */
+	private MatrixBlock collectBinaryBlock( Iterator<Writable> valueList ) 
+		throws IOException 
+	{
+		try
+		{
+			//reset reuse block, keep configured representation
+			_partition.reset(_rlen, _clen);	
+
+			while( valueList.hasNext() )
+			{
+				PairWritableBlock pairValue = (PairWritableBlock)valueList.next();
+				int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen;
+				int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen;
+				MatrixBlock block = pairValue.block;
+				if( !_partition.isInSparseFormat() ) //DENSE
+				{
+					_partition.copy( row_offset, row_offset+block.getNumRows()-1, 
+							   col_offset, col_offset+block.getNumColumns()-1,
+							   pairValue.block, false ); 
+				}
+				else //SPARSE 
+				{
+					_partition.appendToSparse(pairValue.block, row_offset, col_offset);
+				}
+			}
+
+			//final partition cleanup
+			cleanupCollectedMatrixPartition( _partition.isInSparseFormat() );
+		}
+		catch(DMLRuntimeException ex)
+		{
+			throw new IOException(ex);
+		}
+		
+		return _partition;
+	}
+	
+	
+	/**
+	 * Collects a matrixblock partition from a given input iterator over 
+	 * binary cells.
+	 * 
+	 * Note it reuses the instance attribute _partition - multiple calls
+	 * will overwrite the result.
+	 * 
+	 * @param valueList
+	 * @return
+	 * @throws IOException 
+	 */
+	private MatrixBlock collectBinaryCellInput( Iterator<Writable> valueList ) 
+		throws IOException 
+	{
+		//reset reuse block, keep configured representation
+		if( _tSparseCol )
+			_partition.reset(_clen, _rlen);	
+		else
+			_partition.reset(_rlen, _clen);
+		
+		switch( _dpf )
+		{
+			case ROW_WISE:
+				while( valueList.hasNext() )
+				{
+					PairWritableCell pairValue = (PairWritableCell)valueList.next();
+					if( pairValue.indexes.getColumnIndex()<0 )
+						continue; //skip dummy cells (used only to ensure creation of empty partitions)
+					_partition.quickSetValue(0, (int)pairValue.indexes.getColumnIndex()-1, pairValue.cell.getValue());
+				}
+				break;
+			case COLUMN_WISE:
+				while( valueList.hasNext() )
+				{
+					PairWritableCell pairValue = (PairWritableCell)valueList.next();
+					if( pairValue.indexes.getRowIndex()<0 )
+						continue; //skip dummy cells (used only to ensure creation of empty partitions)
+					if( _tSparseCol )
+						_partition.appendValue(0,(int)pairValue.indexes.getRowIndex()-1, pairValue.cell.getValue());
+					else
+						_partition.quickSetValue((int)pairValue.indexes.getRowIndex()-1, 0, pairValue.cell.getValue());
+				}
+				break;
+			default: 
+				throw new IOException("Partition format not yet supported in fused partition-execute: "+_dpf);
+		}
+		
+		//final partition cleanup
+		cleanupCollectedMatrixPartition(_tSparseCol);
+		
+		return _partition;
+	}
+	
+	/**
+	 * 
+	 * @param sort
+	 * @throws IOException
+	 */
+	private void cleanupCollectedMatrixPartition(boolean sort) 
+		throws IOException
+	{
+		//sort sparse row contents if required
+		if( _partition.isInSparseFormat() && sort )
+			_partition.sortSparseRows();
+
+		//ensure right number of nnz
+		if( !_partition.isInSparseFormat() )
+			_partition.recomputeNonZeros();
+			
+		//examine and switch dense/sparse representation
+		try {
+			_partition.examSparsity();
+		}
+		catch(Exception ex){
+			throw new IOException(ex);
+		}
+	}
+}
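
The reducer above leans on the lifecycle of the old org.apache.hadoop.mapred
API: configure(JobConf) runs once per task, reduce() runs once per key group,
and close() runs last, which is why the OutputCollector is cached in reduce()
and the result variables are only exported in close(). A minimal,
self-contained sketch of that contract (not SystemML code):

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class LifecycleSketchReducer implements Reducer<LongWritable, Text, Text, Text>
    {
        //collector cached in reduce() so that close() can still write output
        private OutputCollector<Text, Text> _out = null;

        @Override
        public void configure(JobConf job) {
            //one-time task setup, e.g. reading partitioning info from the JobConf
        }

        @Override
        public void reduce(LongWritable key, Iterator<Text> values,
            OutputCollector<Text, Text> out, Reporter reporter) throws IOException
        {
            _out = out; //cache for final write in close()
            while( values.hasNext() )
                values.next(); //consume the grouped values for this key
        }

        @Override
        public void close() throws IOException {
            //final exports go through the cached collector here (if any)
        }
    }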

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index 2426bc2..67beff6 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -1,266 +1,266 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import scala.Tuple2;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
-import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
-import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
-import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.util.LocalFileUtils;
-import org.apache.sysml.utils.Statistics;
-
-/**
- * Common functionality for parfor workers in MR jobs. Used by worker wrappers in
- * mappers (base RemoteParFor) and reducers (fused data partitioning and parfor).
- * 
- */
-public class RemoteParForUtils 
-{
-	
-	/**
-	 * 
-	 * @param reporter
-	 * @param deltaTasks
-	 * @param deltaIterations
-	 */
-	public static void incrementParForMRCounters(Reporter reporter, long deltaTasks, long deltaIterations)
-	{
-		//report parfor counters
-		if( deltaTasks>0 )
-			reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMTASKS.toString(), deltaTasks);
-		if( deltaIterations>0 )
-			reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMITERS.toString(), deltaIterations);
-		
-		JobConf job = ConfigurationManager.getCachedJobConf();
-		if( DMLScript.STATISTICS  && !InfrastructureAnalyzer.isLocalMode(job) ) 
-		{
-			//report cache statistics
-			reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JITCOMPILE.toString(), Statistics.getJITCompileTime());
-			reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_COUNT.toString(), Statistics.getJVMgcCount());
-			reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_TIME.toString(), Statistics.getJVMgcTime());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_MEM.toString(), CacheStatistics.getMemHits());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString(), CacheStatistics.getFSBuffHits());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FS.toString(), CacheStatistics.getFSHits());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_HDFS.toString(), CacheStatistics.getHDFSHits());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString(), CacheStatistics.getFSBuffWrites());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FS.toString(), CacheStatistics.getFSWrites());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_HDFS.toString(), CacheStatistics.getHDFSWrites());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQR.toString(), CacheStatistics.getAcquireRTime());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQM.toString(), CacheStatistics.getAcquireMTime());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_RLS.toString(), CacheStatistics.getReleaseTime());
-			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_EXP.toString(), CacheStatistics.getExportTime());
-		
-			//reset cache statistics to prevent overlapping reporting
-			CacheStatistics.reset();
-		}
-	}
-	
-	/**
-	 * 
-	 * @param workerID
-	 * @param vars
-	 * @param resultVars
-	 * @param out
-	 * @throws DMLRuntimeException
-	 * @throws IOException
-	 */
-	public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, OutputCollector<Writable, Writable> out ) 
-			throws DMLRuntimeException, IOException
-	{
-		exportResultVariables(workerID, vars, resultVars, null, out);
-	}	
-	
-	/**
-	 * For remote MR parfor workers.
-	 * 
-	 * @param workerID
-	 * @param vars
-	 * @param resultVars
-	 * @param rvarFnames
-	 * @param out
-	 * @throws DMLRuntimeException
-	 * @throws IOException
-	 */
-	public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, 
-			                                  HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) 
-		throws DMLRuntimeException, IOException
-	{
-		//create key and value for reuse
-		LongWritable okey = new LongWritable( workerID ); 
-		Text ovalue = new Text();
-		
-		//for each result variable, probe if an export is necessary
-		for( String rvar : resultVars )
-		{
-			Data dat = vars.get( rvar );
-			
-			//export output variable to HDFS (see RunMRJobs)
-			if ( dat != null && dat.getDataType() == DataType.MATRIX ) 
-			{
-				MatrixObject mo = (MatrixObject) dat;
-				if( mo.isDirty() )
-				{
-					if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER && rvarFnames!=null )
-					{
-						String fname = rvarFnames.get( rvar );
-						if( fname!=null )
-							mo.setFileName( fname );
-							
-						//export result var (iff actually modified in parfor)
-						mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 task = 1 map task, hence only one map invocation)
-						rvarFnames.put(rvar, mo.getFileName());	
-					}
-					else
-					{
-						//export result var (iff actually modified in parfor)
-						mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 task = 1 map task, hence only one map invocation)
-					}
-					
-					//pass output vars (scalars by value, matrix by ref) to result
-					//(only if actually exported, hence the check for dirty; otherwise potential problems in result merge)
-					String datStr = ProgramConverter.serializeDataObject(rvar, mo);
-					ovalue.set( datStr );
-					out.collect( okey, ovalue );
-				}
-			}	
-		}
-	}
-	
-	/**
-	 * For remote Spark parfor workers. This is a simplified version compared to MR.
-	 * 
-	 * @param workerID
-	 * @param vars
-	 * @param resultVars
-	 * @return list of serialized result variables
-	 * @throws DMLRuntimeException
-	 * @throws IOException
-	 */
-	public static ArrayList<String> exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars) 
-		throws DMLRuntimeException, IOException
-	{
-		ArrayList<String> ret = new ArrayList<String>();
-		
-		//for each result variable, probe if an export is necessary
-		for( String rvar : resultVars )
-		{
-			Data dat = vars.get( rvar );
-			
-			//export output variable to HDFS (see RunMRJobs)
-			if ( dat != null && dat.getDataType() == DataType.MATRIX ) 
-			{
-				MatrixObject mo = (MatrixObject) dat;
-				if( mo.isDirty() )
-				{
-					//export result var (iff actually modified in parfor)
-					mo.exportData(); 
-					
-					
-					//pass output vars (scalars by value, matrix by ref) to result
-					//(only if actually exported, hence the check for dirty; otherwise potential problems in result merge)
-					ret.add( ProgramConverter.serializeDataObject(rvar, mo) );
-				}
-			}	
-		}
-		
-		return ret;
-	}
-		
-	
-	/**
-	 * Cleanup all temporary files created by this SystemML process
-	 * instance.
-	 * 
-	 */
-	public static void cleanupWorkingDirectories()
-	{
-		//use the given job configuration for infrastructure analysis (see configure);
-		//this is important for robustness with a misconfigured classpath that also contains
-		//core-default.xml and hence hides the actual cluster configuration; otherwise
-		//the working directories would not be cleaned up
-		JobConf job = ConfigurationManager.getCachedJobConf();
-		
-		if( !InfrastructureAnalyzer.isLocalMode(job) )
-		{
-			//delete cache files
-			CacheableData.cleanupCacheDir();
-			//disable caching (prevent dynamic eviction)
-			CacheableData.disableCaching();
-			//cleanup working dir (e.g., of CP_FILE instructions)
-			LocalFileUtils.cleanupWorkingDirectory();
-		}
-	}
-	
-	/**
-	 * 
-	 * @param out
-	 * @param LOG
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG ) 
-		throws DMLRuntimeException
-	{
-		HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>();
-
-		int countAll = 0;
-		for( Tuple2<Long,String> entry : out )
-		{
-			Long key = entry._1();
-			String val = entry._2();
-			if( !tmp.containsKey( key ) )
-				tmp.put(key, new LocalVariableMap());
-			Object[] dat = ProgramConverter.parseDataObject( val );
-			tmp.get(key).put((String)dat[0], (Data)dat[1]);
-			countAll++;
-		}
-
-		if( LOG != null ) {
-			LOG.debug("Num remote worker results (before deduplication): "+countAll);
-			LOG.debug("Num remote worker results: "+tmp.size());
-		}
-		
-		//create return array
-		return tmp.values().toArray(new LocalVariableMap[0]);	
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import scala.Tuple2;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
+import org.apache.sysml.runtime.instructions.cp.Data;
+import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.utils.Statistics;
+
+/**
+ * Common functionality for parfor workers in MR jobs. Used by worker wrappers in
+ * mappers (base RemoteParFor) and reducers (fused data partitioning and parfor).
+ * 
+ */
+public class RemoteParForUtils 
+{
+	
+	/**
+	 * 
+	 * @param reporter
+	 * @param deltaTasks
+	 * @param deltaIterations
+	 */
+	public static void incrementParForMRCounters(Reporter reporter, long deltaTasks, long deltaIterations)
+	{
+		//report parfor counters
+		if( deltaTasks>0 )
+			reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMTASKS.toString(), deltaTasks);
+		if( deltaIterations>0 )
+			reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMITERS.toString(), deltaIterations);
+		
+		JobConf job = ConfigurationManager.getCachedJobConf();
+		if( DMLScript.STATISTICS  && !InfrastructureAnalyzer.isLocalMode(job) ) 
+		{
+			//report cache statistics
+			reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JITCOMPILE.toString(), Statistics.getJITCompileTime());
+			reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_COUNT.toString(), Statistics.getJVMgcCount());
+			reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_TIME.toString(), Statistics.getJVMgcTime());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_MEM.toString(), CacheStatistics.getMemHits());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString(), CacheStatistics.getFSBuffHits());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FS.toString(), CacheStatistics.getFSHits());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_HDFS.toString(), CacheStatistics.getHDFSHits());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString(), CacheStatistics.getFSBuffWrites());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FS.toString(), CacheStatistics.getFSWrites());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_HDFS.toString(), CacheStatistics.getHDFSWrites());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQR.toString(), CacheStatistics.getAcquireRTime());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQM.toString(), CacheStatistics.getAcquireMTime());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_RLS.toString(), CacheStatistics.getReleaseTime());
+			reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_EXP.toString(), CacheStatistics.getExportTime());
+		
+			//reset cache statistics to prevent overlapping reporting
+			CacheStatistics.reset();
+		}
+	}
+	
+	/**
+	 * 
+	 * @param workerID
+	 * @param vars
+	 * @param resultVars
+	 * @param out
+	 * @throws DMLRuntimeException
+	 * @throws IOException
+	 */
+	public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, OutputCollector<Writable, Writable> out ) 
+			throws DMLRuntimeException, IOException
+	{
+		exportResultVariables(workerID, vars, resultVars, null, out);
+	}	
+	
+	/**
+	 * For remote MR parfor workers.
+	 * 
+	 * @param workerID
+	 * @param vars
+	 * @param resultVars
+	 * @param rvarFnames
+	 * @param out
+	 * @throws DMLRuntimeException
+	 * @throws IOException
+	 */
+	public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, 
+			                                  HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) 
+		throws DMLRuntimeException, IOException
+	{
+		//create key and value for reuse
+		LongWritable okey = new LongWritable( workerID ); 
+		Text ovalue = new Text();
+		
+		//for each result variable, probe if an export is necessary
+		for( String rvar : resultVars )
+		{
+			Data dat = vars.get( rvar );
+			
+			//export output variable to HDFS (see RunMRJobs)
+			if ( dat != null && dat.getDataType() == DataType.MATRIX ) 
+			{
+				MatrixObject mo = (MatrixObject) dat;
+				if( mo.isDirty() )
+				{
+					if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER && rvarFnames!=null )
+					{
+						String fname = rvarFnames.get( rvar );
+						if( fname!=null )
+							mo.setFileName( fname );
+							
+						//export result var (iff actually modified in parfor)
+						mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 task = 1 map task, hence only one map invocation)
+						rvarFnames.put(rvar, mo.getFileName());	
+					}
+					else
+					{
+						//export result var (iff actually modified in parfor)
+						mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 task = 1 map task, hence only one map invocation)
+					}
+					
+					//pass output vars (scalars by value, matrix by ref) to result
+					//(only if actually exported, hence the check for dirty; otherwise potential problems in result merge)
+					String datStr = ProgramConverter.serializeDataObject(rvar, mo);
+					ovalue.set( datStr );
+					out.collect( okey, ovalue );
+				}
+			}	
+		}
+	}
+	
+	/**
+	 * For remote Spark parfor workers. This is a simplified version compared to MR.
+	 * 
+	 * @param workerID
+	 * @param vars
+	 * @param resultVars
+	 * @return list of serialized result variables
+	 * @throws DMLRuntimeException
+	 * @throws IOException
+	 */
+	public static ArrayList<String> exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars) 
+		throws DMLRuntimeException, IOException
+	{
+		ArrayList<String> ret = new ArrayList<String>();
+		
+		//for each result variable, probe if an export is necessary
+		for( String rvar : resultVars )
+		{
+			Data dat = vars.get( rvar );
+			
+			//export output variable to HDFS (see RunMRJobs)
+			if ( dat != null && dat.getDataType() == DataType.MATRIX ) 
+			{
+				MatrixObject mo = (MatrixObject) dat;
+				if( mo.isDirty() )
+				{
+					//export result var (iff actually modified in parfor)
+					mo.exportData(); 
+					
+					
+					//pass output vars (scalars by value, matrix by ref) to result
+					//(only if actually exported, hence the check for dirty; otherwise potential problems in result merge)
+					ret.add( ProgramConverter.serializeDataObject(rvar, mo) );
+				}
+			}	
+		}
+		
+		return ret;
+	}
+		
+	
+	/**
+	 * Cleanup all temporary files created by this SystemML process
+	 * instance.
+	 * 
+	 */
+	public static void cleanupWorkingDirectories()
+	{
+		//use the given job configuration for infrastructure analysis (see configure);
+		//this is important for robustness with a misconfigured classpath that also contains
+		//core-default.xml and hence hides the actual cluster configuration; otherwise
+		//the working directories would not be cleaned up
+		JobConf job = ConfigurationManager.getCachedJobConf();
+		
+		if( !InfrastructureAnalyzer.isLocalMode(job) )
+		{
+			//delete cache files
+			CacheableData.cleanupCacheDir();
+			//disable caching (prevent dynamic eviction)
+			CacheableData.disableCaching();
+			//cleanup working dir (e.g., of CP_FILE instructions)
+			LocalFileUtils.cleanupWorkingDirectory();
+		}
+	}
+	
+	/**
+	 * 
+	 * @param out
+	 * @param LOG
+	 * @return
+	 * @throws DMLRuntimeException
+	 */
+	public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG ) 
+		throws DMLRuntimeException
+	{
+		HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>();
+
+		int countAll = 0;
+		for( Tuple2<Long,String> entry : out )
+		{
+			Long key = entry._1();
+			String val = entry._2();
+			if( !tmp.containsKey( key ) )
+				tmp.put(key, new LocalVariableMap());
+			Object[] dat = ProgramConverter.parseDataObject( val );
+			tmp.get(key).put((String)dat[0], (Data)dat[1]);
+			countAll++;
+		}
+
+		if( LOG != null ) {
+			LOG.debug("Num remote worker results (before deduplication): "+countAll);
+			LOG.debug("Num remote worker results: "+tmp.size());
+		}
+		
+		//create return array
+		return tmp.values().toArray(new LocalVariableMap[0]);	
+	}
+}
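
The grouping performed by getResults above is worth spelling out: remote
workers emit (workerID, serialized variable) pairs, and the driver folds them
into one variable map per worker, so the number of result maps equals the
number of distinct workers rather than the number of emitted pairs. A
self-contained sketch of that fold, with plain strings standing in for the
ProgramConverter payloads (not SystemML code):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupByWorkerSketch
    {
        public static void main(String[] args)
        {
            //(workerID, "name=value") pairs as they would arrive from remote workers
            List<SimpleEntry<Long,String>> out = new ArrayList<SimpleEntry<Long,String>>();
            out.add(new SimpleEntry<Long,String>(7L, "R1=partialResultA"));
            out.add(new SimpleEntry<Long,String>(7L, "R2=partialResultB"));
            out.add(new SimpleEntry<Long,String>(8L, "R1=partialResultC"));

            //fold into one variable map per worker, mirroring getResults
            Map<Long,Map<String,String>> tmp = new HashMap<Long,Map<String,String>>();
            int countAll = 0;
            for( SimpleEntry<Long,String> e : out ) {
                if( !tmp.containsKey(e.getKey()) )
                    tmp.put(e.getKey(), new HashMap<String,String>());
                String[] nv = e.getValue().split("=", 2); //stand-in for parseDataObject
                tmp.get(e.getKey()).put(nv[0], nv[1]);
                countAll++;
            }

            System.out.println("results before deduplication: " + countAll); //3
            System.out.println("results (one map per worker): " + tmp.size()); //2
        }
    }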

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
index 6f478bc..e6556fa 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
@@ -1,58 +1,58 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-
-#PerfTestTool: DML template for estimating cost functions.
-
-dynRead = externalFunction(Matrix[Double] d, String fname, Integer m, Integer n) 
-return (Matrix[Double] D) 
-implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicReadMatrix2DCP",exectype="mem") 
-
-dynWrite = externalFunction(Matrix[Double] R, String fname) 
-return (Matrix[Double] D) 
-implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicWriteMatrix2DCP",exectype="mem") 
-
-solve = externalFunction(Matrix[Double] A, Matrix[Double] y) 
-return (Matrix[Double] b) 
-implemented in (classname="org.apache.sysml.packagesupport.LinearSolverWrapperCP",exectype="mem") 
-
-k = %numModels%;
-m = -1; 
-n = -1;
-
-dummy = matrix(1,rows=1,cols=1); 
-
-for( i in 1:k, par=8, mode=LOCAL )
-{
-   sin1 = "./conf/PerfTestTool/"+i+"_in1.csv";   
-   sin2 = "./conf/PerfTestTool/"+i+"_in2.csv";   
-   
-   D = dynRead( dummy, sin1, m, n );
-   y = dynRead( dummy, sin2, m, 1 );
-   
-   A = t(D) %*% D; # X'X
-   b = t(D) %*% y; # X'y
-   beta = solve(A,b); 
-
-   sout = "./conf/PerfTestTool/"+i+"_out.csv";   
-   
-   X=dynWrite( beta, sout );
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+#PerfTestTool: DML template for estimating cost functions.
+
+dynRead = externalFunction(Matrix[Double] d, String fname, Integer m, Integer n) 
+return (Matrix[Double] D) 
+implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicReadMatrix2DCP",exectype="mem") 
+
+dynWrite = externalFunction(Matrix[Double] R, String fname) 
+return (Matrix[Double] D) 
+implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicWriteMatrix2DCP",exectype="mem") 
+
+solve = externalFunction(Matrix[Double] A, Matrix[Double] y) 
+return (Matrix[Double] b) 
+implemented in (classname="org.apache.sysml.packagesupport.LinearSolverWrapperCP",exectype="mem") 
+
+k = %numModels%;
+m = -1; 
+n = -1;
+
+dummy = matrix(1,rows=1,cols=1); 
+
+for( i in 1:k, par=8, mode=LOCAL )
+{
+   sin1 = "./conf/PerfTestTool/"+i+"_in1.csv";   
+   sin2 = "./conf/PerfTestTool/"+i+"_in2.csv";   
+   
+   D = dynRead( dummy, sin1, m, n );
+   y = dynRead( dummy, sin2, m, 1 );
+   
+   A = t(D) %*% D; # X'X
+   b = t(D) %*% y; # X'y
+   beta = solve(A,b); 
+
+   sout = "./conf/PerfTestTool/"+i+"_out.csv";   
+   
+   X=dynWrite( beta, sout );
 }
\ No newline at end of file
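
The loop body of this script is an ordinary least-squares fit via the normal
equations: minimizing ||D beta - y||^2 requires t(D) %*% (D %*% beta - y) = 0,
i.e. (t(D) %*% D) %*% beta = t(D) %*% y, which is exactly what the A, b, and
solve(A,b) lines compute. As a sketch, assuming a DML version where solve() is
a builtin rather than the external function declared above, the same fit
collapses to:

    # least-squares fit via the normal equations (sketch; assumes builtin solve)
    A = t(D) %*% D;        # X'X, the n x n Gram matrix
    b = t(D) %*% y;        # X'y
    beta = solve(A, b);    # beta = (X'X)^-1 X'y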

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
index 1dd419f..13c68e2 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
@@ -1,64 +1,64 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import java.lang.ref.SoftReference;
-
-import org.apache.spark.broadcast.Broadcast;
-
-public class BroadcastObject extends LineageObject
-{
-	//soft reference storage for graceful cleanup in case of memory pressure
-	private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null;
-	
-	public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName )
-	{
-		_bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar);
-		_varName = varName;
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public PartitionedBroadcastMatrix getBroadcast()
-	{
-		return _bcHandle.get();
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public boolean isValid() 
-	{
-		//check for evicted soft reference
-		PartitionedBroadcastMatrix pbm = _bcHandle.get();
-		if( pbm == null )
-			return false;
-		
-		//check for validity of individual broadcasts
-		Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts();
-		for( Broadcast<PartitionedMatrixBlock> bc : tmp )
-			if( !bc.isValid() )
-				return false;		
-		return true;
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import java.lang.ref.SoftReference;
+
+import org.apache.spark.broadcast.Broadcast;
+
+public class BroadcastObject extends LineageObject
+{
+	//soft reference storage for graceful cleanup in case of memory pressure
+	private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null;
+	
+	public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName )
+	{
+		_bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar);
+		_varName = varName;
+	}
+	
+	/**
+	 * 
+	 * @return
+	 */
+	public PartitionedBroadcastMatrix getBroadcast()
+	{
+		return _bcHandle.get();
+	}
+	
+	/**
+	 * 
+	 * @return
+	 */
+	public boolean isValid() 
+	{
+		//check for evicted soft reference
+		PartitionedBroadcastMatrix pbm = _bcHandle.get();
+		if( pbm == null )
+			return false;
+		
+		//check for validity of individual broadcasts
+		Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts();
+		for( Broadcast<PartitionedMatrixBlock> bc : tmp )
+			if( !bc.isValid() )
+				return false;		
+		return true;
+	}
+}
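
BroadcastObject keeps its PartitionedBroadcastMatrix behind a SoftReference, so the JVM may reclaim the broadcast under memory pressure and callers must probe isValid() before use. Below is a minimal sketch of the same pattern, using a plain double[] payload and a hypothetical SoftHandleSketch class rather than the SystemML types:

import java.lang.ref.SoftReference;

public class SoftHandleSketch {
    public static void main(String[] args) {
        // wrap a large payload so the GC is free to reclaim it under pressure
        SoftReference<double[]> handle = new SoftReference<double[]>(new double[1024]);

        // dereference once into a local and null-check, mirroring isValid() above
        double[] payload = handle.get();
        if (payload != null)
            System.out.println("still resident, length=" + payload.length);
        else
            System.out.println("payload reclaimed; the handle must be rebuilt");
    }
}

Holding the result of get() in a local variable matters: two separate get() calls could straddle a collection and observe different states, which is why isValid() above also dereferences exactly once.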

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
index b2bb62c..bcf37bb 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
@@ -1,83 +1,83 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-
-public abstract class LineageObject 
-{
-
-	//basic lineage information
-	protected int _numRef = -1;
-	protected List<LineageObject> _childs = null;
-	protected String _varName = null;
-	
-	//N:1 back reference to matrix object
-	protected MatrixObject _mo = null;
-	
-	protected LineageObject()
-	{
-		_numRef = 0;
-		_childs = new ArrayList<LineageObject>();
-	}
-	
-	public String getVarName() {
-		return _varName;
-	}
-	
-	public int getNumReferences()
-	{
-		return _numRef;
-	}
-	
-	public void setBackReference(MatrixObject mo)
-	{
-		_mo = mo;
-	}
-	
-	public boolean hasBackReference()
-	{
-		return (_mo != null);
-	}
-	
-	public void incrementNumReferences()
-	{
-		_numRef++;
-	}
-	
-	public void decrementNumReferences()
-	{
-		_numRef--;
-	}
-	
-	public List<LineageObject> getLineageChilds()
-	{
-		return _childs;
-	}
-	
-	public void addLineageChild(LineageObject lob)
-	{
-		lob.incrementNumReferences();
-		_childs.add( lob );
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+
+public abstract class LineageObject 
+{
+
+	//basic lineage information
+	protected int _numRef = -1;
+	protected List<LineageObject> _childs = null;
+	protected String _varName = null;
+	
+	//N:1 back reference to matrix object
+	protected MatrixObject _mo = null;
+	
+	protected LineageObject()
+	{
+		_numRef = 0;
+		_childs = new ArrayList<LineageObject>();
+	}
+	
+	public String getVarName() {
+		return _varName;
+	}
+	
+	public int getNumReferences()
+	{
+		return _numRef;
+	}
+	
+	public void setBackReference(MatrixObject mo)
+	{
+		_mo = mo;
+	}
+	
+	public boolean hasBackReference()
+	{
+		return (_mo != null);
+	}
+	
+	public void incrementNumReferences()
+	{
+		_numRef++;
+	}
+	
+	public void decrementNumReferences()
+	{
+		_numRef--;
+	}
+	
+	public List<LineageObject> getLineageChilds()
+	{
+		return _childs;
+	}
+	
+	public void addLineageChild(LineageObject lob)
+	{
+		lob.incrementNumReferences();
+		_childs.add( lob );
+	}
+}
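
The lineage graph uses plain reference counting: addLineageChild() bumps the child's counter, and consumers decrement it when they release the object, so a count of zero marks the object as safe to clean up. A hedged sketch of that contract follows, using a hypothetical DemoLineageObject subclass in the same package (LineageObject itself is abstract):

package org.apache.sysml.runtime.instructions.spark.data;

// hypothetical concrete subclass, for illustration only
class DemoLineageObject extends LineageObject {
    public static void main(String[] args) {
        DemoLineageObject parent = new DemoLineageObject();
        DemoLineageObject child  = new DemoLineageObject();

        parent.addLineageChild(child);                // increments child's count
        System.out.println(child.getNumReferences()); // 1

        child.decrementNumReferences();               // release by the consumer
        System.out.println(child.getNumReferences()); // 0 -> eligible for cleanup
    }
}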

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
index fb7e773..605e7ca 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
@@ -1,124 +1,124 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import org.apache.spark.api.java.JavaPairRDD;
-
-public class RDDObject extends LineageObject
-{
-
-	private JavaPairRDD<?,?> _rddHandle = null;
-	
-	//meta data on origin of given rdd handle
-	private boolean _checkpointed = false; //created via checkpoint instruction
-	private boolean _hdfsfile = false;     //created from hdfs file
-	private String  _hdfsFname = null;     //hdfs filename, if created from hdfs.  
-	
-	public RDDObject( JavaPairRDD<?,?> rddvar, String varName)
-	{
-		_rddHandle = rddvar;
-		_varName = varName;
-	}
-	
-	/**
-	 * Returns the wrapped rdd handle.
-	 * @return the rdd handle
-	 */
-	public JavaPairRDD<?,?> getRDD()
-	{
-		return _rddHandle;
-	}
-	
-	public void setCheckpointRDD( boolean flag )
-	{
-		_checkpointed = flag;
-	}
-	
-	public boolean isCheckpointRDD() 
-	{
-		return _checkpointed;
-	}
-	
-	public void setHDFSFile( boolean flag ) {
-		_hdfsfile = flag;
-	}
-	
-	public void setHDFSFilename( String fname ) {
-		_hdfsFname = fname;
-	}
-	
-	public boolean isHDFSFile() {
-		return _hdfsfile;
-	}
-	
-	public String getHDFSFilename() {
-		return _hdfsFname;
-	}
-	
-
-	/**
-	 * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;
-	 * in both cases, we can directly read the file instead of collecting
-	 * the given rdd.
-	 * 
-	 * @return true if the data can be read directly from hdfs
-	 */
-	public boolean allowsShortCircuitRead()
-	{
-		boolean ret = isHDFSFile();
-		
-		if( isCheckpointRDD() && getLineageChilds().size() == 1 ) {
-			LineageObject lo = getLineageChilds().get(0);
-			ret = ( lo instanceof RDDObject && ((RDDObject)lo).isHDFSFile() );
-		}
-		
-		return ret;
-	}
-	
-	/**
-	 * Indicates if the rdd is a checkpoint over a single rdd input.
-	 * @return true if collect can be short-circuited to the input rdd
-	 */
-	public boolean allowsShortCircuitCollect()
-	{
-		return ( isCheckpointRDD() && getLineageChilds().size() == 1
-			     && getLineageChilds().get(0) instanceof RDDObject );
-	}
-	
-	/**
-	 * Recursively probes the lineage for checkpoint rdds.
-	 * @return true if this rdd or any lineage child is a checkpoint rdd
-	 */
-	public boolean rHasCheckpointRDDChilds()
-	{
-		//probe for checkpoint rdd
-		if( _checkpointed )
-			return true;
-		
-		//process childs recursively
-		boolean ret = false;
-		for( LineageObject lo : getLineageChilds() ) {
-			if( lo instanceof RDDObject )
-				ret |= ((RDDObject)lo).rHasCheckpointRDDChilds();
-		}
-		
-		return ret;
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class RDDObject extends LineageObject
+{
+
+	private JavaPairRDD<?,?> _rddHandle = null;
+	
+	//meta data on origin of given rdd handle
+	private boolean _checkpointed = false; //created via checkpoint instruction
+	private boolean _hdfsfile = false;     //created from hdfs file
+	private String  _hdfsFname = null;     //hdfs filename, if created from hdfs.  
+	
+	public RDDObject( JavaPairRDD<?,?> rddvar, String varName)
+	{
+		_rddHandle = rddvar;
+		_varName = varName;
+	}
+	
+	/**
+	 * Returns the wrapped rdd handle.
+	 * @return the rdd handle
+	 */
+	public JavaPairRDD<?,?> getRDD()
+	{
+		return _rddHandle;
+	}
+	
+	public void setCheckpointRDD( boolean flag )
+	{
+		_checkpointed = flag;
+	}
+	
+	public boolean isCheckpointRDD() 
+	{
+		return _checkpointed;
+	}
+	
+	public void setHDFSFile( boolean flag ) {
+		_hdfsfile = flag;
+	}
+	
+	public void setHDFSFilename( String fname ) {
+		_hdfsFname = fname;
+	}
+	
+	public boolean isHDFSFile() {
+		return _hdfsfile;
+	}
+	
+	public String getHDFSFilename() {
+		return _hdfsFname;
+	}
+	
+
+	/**
+	 * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;
+	 * in both cases, we can directly read the file instead of collecting
+	 * the given rdd.
+	 * 
+	 * @return true if the data can be read directly from hdfs
+	 */
+	public boolean allowsShortCircuitRead()
+	{
+		boolean ret = isHDFSFile();
+		
+		if( isCheckpointRDD() && getLineageChilds().size() == 1 ) {
+			LineageObject lo = getLineageChilds().get(0);
+			ret = ( lo instanceof RDDObject && ((RDDObject)lo).isHDFSFile() );
+		}
+		
+		return ret;
+	}
+	
+	/**
+	 * Indicates if the rdd is a checkpoint over a single rdd input.
+	 * @return true if collect can be short-circuited to the input rdd
+	 */
+	public boolean allowsShortCircuitCollect()
+	{
+		return ( isCheckpointRDD() && getLineageChilds().size() == 1
+			     && getLineageChilds().get(0) instanceof RDDObject );
+	}
+	
+	/**
+	 * Recursively probes the lineage for checkpoint rdds.
+	 * @return true if this rdd or any lineage child is a checkpoint rdd
+	 */
+	public boolean rHasCheckpointRDDChilds()
+	{
+		//probe for checkpoint rdd
+		if( _checkpointed )
+			return true;
+		
+		//process childs recursively
+		boolean ret = false;
+		for( LineageObject lo : getLineageChilds() ) {
+			if( lo instanceof RDDObject )
+				ret |= ((RDDObject)lo).rHasCheckpointRDDChilds();
+		}
+		
+		return ret;
+	}
+}
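
allowsShortCircuitRead() above lets an exporter skip a full Spark collect whenever the rdd either is an hdfs file itself or is a checkpoint whose single lineage child is one. A sketch of how a caller might branch on it; readFromHDFS and collectFromRDD are hypothetical stand-ins, not SystemML APIs:

// hypothetical consumer, assuming the RDDObject above is on the classpath
public class ShortCircuitSketch {
    static void export(RDDObject rdd) {
        if (rdd.allowsShortCircuitRead()) {
            // data already resides in a file; re-read it rather than
            // shipping every partition back to the driver
            String fname = rdd.isHDFSFile()
                ? rdd.getHDFSFilename()
                : ((RDDObject) rdd.getLineageChilds().get(0)).getHDFSFilename();
            readFromHDFS(fname);
        }
        else {
            collectFromRDD(rdd.getRDD()); // general, more expensive path
        }
    }

    // stubs standing in for the actual I/O and collect logic
    static void readFromHDFS(String fname) { }
    static void collectFromRDD(org.apache.spark.api.java.JavaPairRDD<?,?> rdd) { }
}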

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
index e561f3c..6cb0830 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
@@ -1,167 +1,167 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.mapred;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.sysml.runtime.functionobjects.CM;
-import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
-import org.apache.sysml.runtime.instructions.cp.KahanObject;
-import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
-import org.apache.sysml.runtime.matrix.data.TaggedMatrixIndexes;
-import org.apache.sysml.runtime.matrix.data.WeightedCell;
-import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
-import org.apache.sysml.runtime.matrix.operators.CMOperator;
-import org.apache.sysml.runtime.matrix.operators.Operator;
-
-
-public class GroupedAggMRCombiner extends ReduceBase
-	implements Reducer<TaggedMatrixIndexes, WeightedCell, TaggedMatrixIndexes, WeightedCell>
-{	
-	//grouped aggregate instructions
-	private HashMap<Byte, GroupedAggregateInstruction> grpaggInstructions = new HashMap<Byte, GroupedAggregateInstruction>();
-	
-	//reused intermediate objects
-	private CM_COV_Object cmObj = new CM_COV_Object(); 
-	private HashMap<Byte, CM> cmFn = new HashMap<Byte, CM>();
-	private WeightedCell outCell = new WeightedCell();
-
-	@Override
-	public void reduce(TaggedMatrixIndexes key, Iterator<WeightedCell> values,
-			           OutputCollector<TaggedMatrixIndexes, WeightedCell> out, Reporter reporter)
-		throws IOException 
-	{
-		long start = System.currentTimeMillis();
-		
-		//get aggregate operator
-		GroupedAggregateInstruction ins = grpaggInstructions.get(key.getTag());
-		Operator op = ins.getOperator();
-		boolean isPartialAgg = true;
-		
-		//combine iterator to single value
-		try
-		{
-			if(op instanceof CMOperator) //everything except sum
-			{
-				if( ((CMOperator) op).isPartialAggregateOperator() )
-				{
-					cmObj.reset();
-					CM lcmFn = cmFn.get(key.getTag());
-					
-					//partial aggregate cm operator 
-					while( values.hasNext() )
-					{
-						WeightedCell value=values.next();
-						lcmFn.execute(cmObj, value.getValue(), value.getWeight());				
-					}
-					
-					outCell.setValue(cmObj.getRequiredPartialResult(op));
-					outCell.setWeight(cmObj.getWeight());	
-				}
-				else //forward tuples to reducer
-				{
-					isPartialAgg = false; 
-					while( values.hasNext() )
-						out.collect(key, values.next());
-				}				
-			}
-			else if(op instanceof AggregateOperator) //sum
-			{
-				AggregateOperator aggop=(AggregateOperator) op;
-					
-				if( aggop.correctionExists )
-				{
-					KahanObject buffer=new KahanObject(aggop.initialValue, 0);
-					
-					KahanPlus.getKahanPlusFnObject();
-					
-					//partial aggregate with correction
-					while( values.hasNext() )
-					{
-						WeightedCell value=values.next();
-						aggop.increOp.fn.execute(buffer, value.getValue()*value.getWeight());
-					}
-					
-					outCell.setValue(buffer._sum);
-					outCell.setWeight(1);
-				}
-				else //no correction
-				{
-					double v = aggop.initialValue;
-					
-					//partial aggregate without correction
-					while(values.hasNext())
-					{
-						WeightedCell value=values.next();
-						v=aggop.increOp.fn.execute(v, value.getValue()*value.getWeight());
-					}
-					
-					outCell.setValue(v);
-					outCell.setWeight(1);
-				}				
-			}
-			else
-				throw new IOException("Unsupported operator in instruction: " + ins);
-		}
-		catch(Exception ex)
-		{
-			throw new IOException(ex);
-		}
-		
-		//collect the output (to reducer)
-		if( isPartialAgg )
-			out.collect(key, outCell);
-		
-		reporter.incrCounter(Counters.COMBINE_OR_REDUCE_TIME, System.currentTimeMillis()-start);
-	}
-
-	@Override
-	public void configure(JobConf job)
-	{
-		try 
-		{
-			GroupedAggregateInstruction[] grpaggIns = MRJobConfiguration.getGroupedAggregateInstructions(job);
-			if( grpaggIns != null )	
-				for(GroupedAggregateInstruction ins : grpaggIns)
-				{
-					grpaggInstructions.put(ins.output, ins);	
-					if( ins.getOperator() instanceof CMOperator )
-						cmFn.put(ins.output, CM.getCMFnObject(((CMOperator)ins.getOperator()).getAggOpType()));
-				}
-		} 
-		catch (Exception e) 
-		{
-			throw new RuntimeException(e);
-		} 
-	}
-	
-	@Override
-	public void close()
-	{
-		//do nothing, overrides unnecessary handling in superclass
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.sysml.runtime.functionobjects.CM;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
+import org.apache.sysml.runtime.matrix.data.TaggedMatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.WeightedCell;
+import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.CMOperator;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+
+
+public class GroupedAggMRCombiner extends ReduceBase
+	implements Reducer<TaggedMatrixIndexes, WeightedCell, TaggedMatrixIndexes, WeightedCell>
+{	
+	//grouped aggregate instructions
+	private HashMap<Byte, GroupedAggregateInstruction> grpaggInstructions = new HashMap<Byte, GroupedAggregateInstruction>();
+	
+	//reused intermediate objects
+	private CM_COV_Object cmObj = new CM_COV_Object(); 
+	private HashMap<Byte, CM> cmFn = new HashMap<Byte, CM>();
+	private WeightedCell outCell = new WeightedCell();
+
+	@Override
+	public void reduce(TaggedMatrixIndexes key, Iterator<WeightedCell> values,
+			           OutputCollector<TaggedMatrixIndexes, WeightedCell> out, Reporter reporter)
+		throws IOException 
+	{
+		long start = System.currentTimeMillis();
+		
+		//get aggregate operator
+		GroupedAggregateInstruction ins = grpaggInstructions.get(key.getTag());
+		Operator op = ins.getOperator();
+		boolean isPartialAgg = true;
+		
+		//combine iterator to single value
+		try
+		{
+			if(op instanceof CMOperator) //everything except sum
+			{
+				if( ((CMOperator) op).isPartialAggregateOperator() )
+				{
+					cmObj.reset();
+					CM lcmFn = cmFn.get(key.getTag());
+					
+					//partial aggregate cm operator 
+					while( values.hasNext() )
+					{
+						WeightedCell value=values.next();
+						lcmFn.execute(cmObj, value.getValue(), value.getWeight());				
+					}
+					
+					outCell.setValue(cmObj.getRequiredPartialResult(op));
+					outCell.setWeight(cmObj.getWeight());	
+				}
+				else //forward tuples to reducer
+				{
+					isPartialAgg = false; 
+					while( values.hasNext() )
+						out.collect(key, values.next());
+				}				
+			}
+			else if(op instanceof AggregateOperator) //sum
+			{
+				AggregateOperator aggop=(AggregateOperator) op;
+					
+				if( aggop.correctionExists )
+				{
+					KahanObject buffer=new KahanObject(aggop.initialValue, 0);
+					
+					KahanPlus.getKahanPlusFnObject();
+					
+					//partial aggregate with correction
+					while( values.hasNext() )
+					{
+						WeightedCell value=values.next();
+						aggop.increOp.fn.execute(buffer, value.getValue()*value.getWeight());
+					}
+					
+					outCell.setValue(buffer._sum);
+					outCell.setWeight(1);
+				}
+				else //no correction
+				{
+					double v = aggop.initialValue;
+					
+					//partial aggregate without correction
+					while(values.hasNext())
+					{
+						WeightedCell value=values.next();
+						v=aggop.increOp.fn.execute(v, value.getValue()*value.getWeight());
+					}
+					
+					outCell.setValue(v);
+					outCell.setWeight(1);
+				}				
+			}
+			else
+				throw new IOException("Unsupported operator in instruction: " + ins);
+		}
+		catch(Exception ex)
+		{
+			throw new IOException(ex);
+		}
+		
+		//collect the output (to reducer)
+		if( isPartialAgg )
+			out.collect(key, outCell);
+		
+		reporter.incrCounter(Counters.COMBINE_OR_REDUCE_TIME, System.currentTimeMillis()-start);
+	}
+
+	@Override
+	public void configure(JobConf job)
+	{
+		try 
+		{
+			GroupedAggregateInstruction[] grpaggIns = MRJobConfiguration.getGroupedAggregateInstructions(job);
+			if( grpaggIns != null )	
+				for(GroupedAggregateInstruction ins : grpaggIns)
+				{
+					grpaggInstructions.put(ins.output, ins);	
+					if( ins.getOperator() instanceof CMOperator )
+						cmFn.put(ins.output, CM.getCMFnObject(((CMOperator)ins.getOperator()).getAggOpType()));
+				}
+		} 
+		catch (Exception e) 
+		{
+			throw new RuntimeException(e);
+		} 
+	}
+	
+	@Override
+	public void close()
+	{
+		//do nothing, overrides unnecessary handling in superclass
+	}
+}
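
For sums, the combiner above carries a correction term via KahanObject so that repeated partial aggregation does not silently drop low-order bits. Below is a self-contained sketch of compensated (Kahan) summation, the technique behind KahanPlus; the class name and demo values are illustrative:

public class KahanSketch {
    public static void main(String[] args) {
        double naive = 0d;           // plain accumulation, drifts over time
        double sum = 0d, corr = 0d;  // compensated accumulator + carried error

        for (int i = 0; i < 10_000_000; i++) {
            double v = 0.1;          // 0.1 is not exactly representable in binary
            naive += v;

            double y = v - corr;     // re-apply the error carried from last step
            double t = sum + y;      // low-order bits of y may be lost here
            corr = (t - sum) - y;    // algebraically 0; in fp it is the lost part
            sum = t;
        }
        // the compensated sum stays markedly closer to the exact value 1,000,000
        System.out.println("naive: " + naive + "  kahan: " + sum);
    }
}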

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
index a08631d..fa9843a 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
@@ -1,84 +1,84 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.sort;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparable;
-
-@SuppressWarnings("rawtypes")
-public class IndexSortComparable implements WritableComparable
-{
-	
-	protected DoubleWritable _dval = null;
-	protected LongWritable _lval = null; 
-	
-	public IndexSortComparable()
-	{
-		_dval = new DoubleWritable();
-		_lval = new LongWritable();
-	}
-	
-	public void set(double dval, long lval)
-	{
-		_dval.set(dval);
-		_lval.set(lval);
-	}
-
-	@Override
-	public void readFields(DataInput arg0)
-		throws IOException 
-	{
-		_dval.readFields(arg0);
-		_lval.readFields(arg0);
-	}
-
-	@Override
-	public void write(DataOutput arg0) 
-		throws IOException 
-	{
-		_dval.write(arg0);
-		_lval.write(arg0);
-	}
-
-	@Override
-	public int compareTo(Object o) 
-	{
-		//compare only double value (e.g., for partitioner)
-		if( o instanceof DoubleWritable ) {
-			return _dval.compareTo((DoubleWritable) o);
-		}
-		//compare double value and index (e.g., for stable sort)
-		else if( o instanceof IndexSortComparable) {
-			IndexSortComparable that = (IndexSortComparable)o;
-			int tmp = _dval.compareTo(that._dval);
-			if( tmp==0 ) //secondary sort
-				tmp = _lval.compareTo(that._lval);
-			return tmp;
-		}	
-		else {
-			throw new RuntimeException("Unsupported comparison involving class: "+o.getClass().getName());
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.sort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+@SuppressWarnings("rawtypes")
+public class IndexSortComparable implements WritableComparable
+{
+	
+	protected DoubleWritable _dval = null;
+	protected LongWritable _lval = null; 
+	
+	public IndexSortComparable()
+	{
+		_dval = new DoubleWritable();
+		_lval = new LongWritable();
+	}
+	
+	public void set(double dval, long lval)
+	{
+		_dval.set(dval);
+		_lval.set(lval);
+	}
+
+	@Override
+	public void readFields(DataInput arg0)
+		throws IOException 
+	{
+		_dval.readFields(arg0);
+		_lval.readFields(arg0);
+	}
+
+	@Override
+	public void write(DataOutput arg0) 
+		throws IOException 
+	{
+		_dval.write(arg0);
+		_lval.write(arg0);
+	}
+
+	@Override
+	public int compareTo(Object o) 
+	{
+		//compare only double value (e.g., for partitioner)
+		if( o instanceof DoubleWritable ) {
+			return _dval.compareTo((DoubleWritable) o);
+		}
+		//compare double value and index (e.g., for stable sort)
+		else if( o instanceof IndexSortComparable) {
+			IndexSortComparable that = (IndexSortComparable)o;
+			int tmp = _dval.compareTo(that._dval);
+			if( tmp==0 ) //secondary sort
+				tmp = _lval.compareTo(that._lval);
+			return tmp;
+		}	
+		else {
+			throw new RuntimeException("Unsupported comparison involving class: "+o.getClass().getName());
+		}
+	}
+}
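
The compareTo() above orders primarily on the double value and falls back to the long index only on ties, which is what makes the MR sort stable. A quick check of that contract, assuming the class above is on the classpath and using a hypothetical IndexSortSketch driver:

import org.apache.sysml.runtime.matrix.sort.IndexSortComparable;

public class IndexSortSketch {
    public static void main(String[] args) {
        IndexSortComparable a = new IndexSortComparable();
        IndexSortComparable b = new IndexSortComparable();

        a.set(2.5, 1L);
        b.set(2.5, 7L);
        System.out.println(a.compareTo(b)); // negative: values tie, index 1 < 7 decides

        b.set(1.0, 7L);
        System.out.println(a.compareTo(b)); // positive: 2.5 > 1.0, index ignored
    }
}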