You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2017/05/26 06:52:13 UTC

[2/4] incubator-systemml git commit: [SYSTEMML-1303] Remove deprecated old MLContext API

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 1dd3600..06a2005 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -84,8 +84,8 @@ public class SparkExecutionContext extends ExecutionContext
 {
 	private static final Log LOG = LogFactory.getLog(SparkExecutionContext.class.getName());
 	private static final boolean LDEBUG = false; //local debug flag
-	
-	//internal configurations 
+
+	//internal configurations
 	private static boolean LAZY_SPARKCTX_CREATION = true;
 	private static boolean ASYNCHRONOUS_VAR_DESTROY = true;
 
@@ -93,16 +93,16 @@ public class SparkExecutionContext extends ExecutionContext
 
 	//executor memory and relative fractions as obtained from the spark configuration
 	private static SparkClusterConfig _sconf = null;
-	
-	//singleton spark context (as there can be only one spark context per JVM) 
-	private static JavaSparkContext _spctx = null; 
-	
-	//registry of parallelized RDDs to enforce that at any time, we spent at most 
+
+	//singleton spark context (as there can be only one spark context per JVM)
+	private static JavaSparkContext _spctx = null;
+
+	//registry of parallelized RDDs to enforce that at any time, we spent at most
 	//10% of JVM max heap size for parallelized RDDs; if this is not sufficient,
 	//matrices or frames are exported to HDFS and the RDDs are created from files.
 	//TODO unify memory management for CP, par RDDs, and potentially broadcasts
 	private static MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1);
-	
+
 	static {
 		// for internal debugging only
 		if( LDEBUG ) {
@@ -111,31 +111,31 @@ public class SparkExecutionContext extends ExecutionContext
 		}
 	}
 
-	protected SparkExecutionContext(boolean allocateVars, Program prog) 
+	protected SparkExecutionContext(boolean allocateVars, Program prog)
 	{
 		//protected constructor to force use of ExecutionContextFactory
 		super( allocateVars, prog );
-				
+
 		//spark context creation via internal initializer
 		if( !(LAZY_SPARKCTX_CREATION && OptimizerUtils.isHybridExecutionMode()) ) {
 			initSparkContext();
 		}
 	}
-		
+
 	/**
 	 * Returns the used singleton spark context. In case of lazy spark context
 	 * creation, this methods blocks until the spark context is created.
-	 *  
+	 *
 	 * @return java spark context
 	 */
 	public JavaSparkContext getSparkContext()
 	{
-		//lazy spark context creation on demand (lazy instead of asynchronous 
+		//lazy spark context creation on demand (lazy instead of asynchronous
 		//to avoid wait for uninitialized spark context on close)
 		if( LAZY_SPARKCTX_CREATION ) {
 			initSparkContext();
 		}
-		
+
 		//return the created spark context
 		return _spctx;
 	}
@@ -144,11 +144,11 @@ public class SparkExecutionContext extends ExecutionContext
 		initSparkContext();
 		return _spctx;
 	}
-	
+
 	/**
 	 * Indicates if the spark context has been created or has
 	 * been passed in from outside.
-	 * 
+	 *
 	 * @return true if spark context created
 	 */
 	public synchronized static boolean isSparkContextCreated() {
@@ -159,26 +159,25 @@ public class SparkExecutionContext extends ExecutionContext
 		_spctx = null;
 	}
 
-	public void close() 
+	public void close()
 	{
 		synchronized( SparkExecutionContext.class ) {
-			if( _spctx != null ) 
+			if( _spctx != null )
 			{
 				//stop the spark context if existing
 				_spctx.stop();
-				
+
 				//make sure stopped context is never used again
-				_spctx = null; 
+				_spctx = null;
 			}
-				
+
 		}
 	}
-	
+
 	public static boolean isLazySparkContextCreation(){
 		return LAZY_SPARKCTX_CREATION;
 	}
 
-	@SuppressWarnings("deprecation")
 	private synchronized static void initSparkContext()
 	{
 		//check for redundant spark context init
@@ -186,24 +185,19 @@ public class SparkExecutionContext extends ExecutionContext
 			return;
 
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-		
+
 		//create a default spark context (master, appname, etc refer to system properties
 		//as given in the spark configuration or during spark-submit)
-		
+
 		Object mlCtxObj = MLContextProxy.getActiveMLContext();
-		if(mlCtxObj != null) 
+		if(mlCtxObj != null)
 		{
 			// This is when DML is called through spark shell
 			// Will clean the passing of static variables later as this involves minimal change to DMLScript
-			if (mlCtxObj instanceof org.apache.sysml.api.MLContext) {
-				org.apache.sysml.api.MLContext mlCtx = (org.apache.sysml.api.MLContext) mlCtxObj;
-				_spctx = new JavaSparkContext(mlCtx.getSparkContext());
-			} else if (mlCtxObj instanceof org.apache.sysml.api.mlcontext.MLContext) {
-				org.apache.sysml.api.mlcontext.MLContext mlCtx = (org.apache.sysml.api.mlcontext.MLContext) mlCtxObj;
-				_spctx = MLContextUtil.getJavaSparkContext(mlCtx);
-			}
+			org.apache.sysml.api.mlcontext.MLContext mlCtx = (org.apache.sysml.api.mlcontext.MLContext) mlCtxObj;
+			_spctx = MLContextUtil.getJavaSparkContext(mlCtx);
 		}
-		else 
+		else
 		{
 			if(DMLScript.USE_LOCAL_SPARK_CONFIG) {
 				// For now set 4 cores for integration testing :)
@@ -220,128 +214,128 @@ public class SparkExecutionContext extends ExecutionContext
 				SparkConf conf = createSystemMLSparkConf();
 				_spctx = new JavaSparkContext(conf);
 			}
-			
+
 			_parRDDs.clear();
 		}
-		
-		// Set warning if spark.driver.maxResultSize is not set. It needs to be set before starting Spark Context for CP collect 
+
+		// Set warning if spark.driver.maxResultSize is not set. It needs to be set before starting Spark Context for CP collect
 		String strDriverMaxResSize = _spctx.getConf().get("spark.driver.maxResultSize", "1g");
-		long driverMaxResSize = UtilFunctions.parseMemorySize(strDriverMaxResSize); 
+		long driverMaxResSize = UtilFunctions.parseMemorySize(strDriverMaxResSize);
 		if (driverMaxResSize != 0 && driverMaxResSize<OptimizerUtils.getLocalMemBudget() && !DMLScript.USE_LOCAL_SPARK_CONFIG)
 			LOG.warn("Configuration parameter spark.driver.maxResultSize set to " + UtilFunctions.formatMemorySize(driverMaxResSize) + "."
-					+ " You can set it through Spark default configuration setting either to 0 (unlimited) or to available memory budget of size " 
+					+ " You can set it through Spark default configuration setting either to 0 (unlimited) or to available memory budget of size "
 					+ UtilFunctions.formatMemorySize((long)OptimizerUtils.getLocalMemBudget()) + ".");
-		
+
 		//globally add binaryblock serialization framework for all hdfs read/write operations
-		//TODO if spark context passed in from outside (mlcontext), we need to clean this up at the end 
+		//TODO if spark context passed in from outside (mlcontext), we need to clean this up at the end
 		if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
 			MRJobConfiguration.addBinaryBlockSerializationFramework( _spctx.hadoopConfiguration() );
-		
+
 		//statistics maintenance
 		if( DMLScript.STATISTICS ){
 			Statistics.setSparkCtxCreateTime(System.nanoTime()-t0);
 		}
-	}	
-	
+	}
+
 	/**
 	 * Sets up a SystemML-preferred Spark configuration based on the implicit
 	 * default configuration (as passed via configurations from outside).
-	 * 
+	 *
 	 * @return spark configuration
 	 */
 	public static SparkConf createSystemMLSparkConf() {
 		SparkConf conf = new SparkConf();
-		
+
 		//always set unlimited result size (required for cp collect)
 		conf.set("spark.driver.maxResultSize", "0");
-		
+
 		//always use the fair scheduler (for single jobs, it's equivalent to fifo
 		//but for concurrent jobs in parfor it ensures better data locality because
 		//round robin assignment mitigates the problem of 'sticky slots')
 		if( FAIR_SCHEDULER_MODE ) {
 			conf.set("spark.scheduler.mode", "FAIR");
 		}
-		
+
 		//increase scheduler delay (usually more robust due to better data locality)
 		if( !conf.contains("spark.locality.wait") ) { //default 3s
 			conf.set("spark.locality.wait", "5s");
 		}
-		
+
 		return conf;
 	}
 
 	/**
 	 * Spark instructions should call this for all matrix inputs except broadcast
 	 * variables.
-	 * 
+	 *
 	 * @param varname variable name
 	 * @return JavaPairRDD of MatrixIndexes-MatrixBlocks
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
 	@SuppressWarnings("unchecked")
-	public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname ) 
-		throws DMLRuntimeException 
+	public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname )
+		throws DMLRuntimeException
 	{
 		return (JavaPairRDD<MatrixIndexes,MatrixBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
 	}
-	
+
 	/**
 	 * Spark instructions should call this for all frame inputs except broadcast
 	 * variables.
-	 * 
+	 *
 	 * @param varname variable name
 	 * @return JavaPairRDD of Longs-FrameBlocks
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
 	@SuppressWarnings("unchecked")
-	public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname ) 
-		throws DMLRuntimeException 
+	public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname )
+		throws DMLRuntimeException
 	{
 		JavaPairRDD<Long,FrameBlock> out = (JavaPairRDD<Long,FrameBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
 		return out;
 	}
 
-	public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo ) 
+	public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo )
 		throws DMLRuntimeException
 	{
 		Data dat = getVariable(varname);
 		if( dat instanceof MatrixObject ) {
 			MatrixObject mo = getMatrixObject(varname);
-			return getRDDHandleForMatrixObject(mo, inputInfo);	
+			return getRDDHandleForMatrixObject(mo, inputInfo);
 		}
 		else if( dat instanceof FrameObject ) {
 			FrameObject fo = getFrameObject(varname);
-			return getRDDHandleForFrameObject(fo, inputInfo);	
+			return getRDDHandleForFrameObject(fo, inputInfo);
 		}
 		else {
 			throw new DMLRuntimeException("Failed to obtain RDD for data type other than matrix or frame.");
 		}
 	}
-	
+
 	/**
-	 * This call returns an RDD handle for a given matrix object. This includes 
-	 * the creation of RDDs for in-memory or binary-block HDFS data. 
-	 * 
+	 * This call returns an RDD handle for a given matrix object. This includes
+	 * the creation of RDDs for in-memory or binary-block HDFS data.
+	 *
 	 * @param mo matrix object
 	 * @param inputInfo input info
 	 * @return JavaPairRDD handle for a matrix object
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
 	@SuppressWarnings("unchecked")
-	public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo ) 
+	public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo )
 		throws DMLRuntimeException
-	{		
+	{
 		//NOTE: MB this logic should be integrated into MatrixObject
-		//However, for now we cannot assume that spark libraries are 
-		//always available and hence only store generic references in 
+		//However, for now we cannot assume that spark libraries are
+		//always available and hence only store generic references in
 		//matrix object while all the logic is in the SparkExecContext
-		
+
 		JavaSparkContext sc = getSparkContext();
 		JavaPairRDD<?,?> rdd = null;
 		//CASE 1: rdd already existing (reuse if checkpoint or trigger
-		//pending rdd operations if not yet cached but prevent to re-evaluate 
+		//pending rdd operations if not yet cached but prevent to re-evaluate
 		//rdd operations if already executed and cached
-		if(    mo.getRDDHandle()!=null 
+		if(    mo.getRDDHandle()!=null
 			&& (mo.getRDDHandle().isCheckpointRDD() || !mo.isCached(false)) )
 		{
 			//return existing rdd handling (w/o input format change)
@@ -359,7 +353,7 @@ public class SparkExecutionContext extends ExecutionContext
 				if( mo.isDirty() || !mo.isHDFSFileExists() ) //write if necessary
 					mo.exportData();
 				rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
-				rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug			
+				rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug
 				fromFile = true;
 			}
 			else { //default case
@@ -368,7 +362,7 @@ public class SparkExecutionContext extends ExecutionContext
 				mo.release(); //unpin matrix
 				_parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
 			}
-			
+
 			//keep rdd handle for future operations on it
 			RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
 			rddhandle.setHDFSFile(fromFile);
@@ -396,43 +390,43 @@ public class SparkExecutionContext extends ExecutionContext
 			else {
 				throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable");
 			}
-			
+
 			//keep rdd handle for future operations on it
 			RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
 			rddhandle.setHDFSFile(true);
 			mo.setRDDHandle(rddhandle);
 		}
-		
+
 		return rdd;
 	}
-	
+
 	/**
 	 * FIXME: currently this implementation assumes matrix representations but frame signature
 	 * in order to support the old transform implementation.
-	 * 
+	 *
 	 * @param fo frame object
 	 * @param inputInfo input info
 	 * @return JavaPairRDD handle for a frame object
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
 	@SuppressWarnings("unchecked")
-	public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, InputInfo inputInfo ) 
+	public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, InputInfo inputInfo )
 		throws DMLRuntimeException
-	{	
+	{
 		//NOTE: MB this logic should be integrated into FrameObject
-		//However, for now we cannot assume that spark libraries are 
-		//always available and hence only store generic references in 
+		//However, for now we cannot assume that spark libraries are
+		//always available and hence only store generic references in
 		//matrix object while all the logic is in the SparkExecContext
-		
-		InputInfo inputInfo2 = (inputInfo==InputInfo.BinaryBlockInputInfo) ? 
+
+		InputInfo inputInfo2 = (inputInfo==InputInfo.BinaryBlockInputInfo) ?
 				InputInfo.BinaryBlockFrameInputInfo : inputInfo;
-		
+
 		JavaSparkContext sc = getSparkContext();
 		JavaPairRDD<?,?> rdd = null;
 		//CASE 1: rdd already existing (reuse if checkpoint or trigger
-		//pending rdd operations if not yet cached but prevent to re-evaluate 
+		//pending rdd operations if not yet cached but prevent to re-evaluate
 		//rdd operations if already executed and cached
-		if(    fo.getRDDHandle()!=null 
+		if(    fo.getRDDHandle()!=null
 			&& (fo.getRDDHandle().isCheckpointRDD() || !fo.isCached(false)) )
 		{
 			//return existing rdd handling (w/o input format change)
@@ -451,7 +445,7 @@ public class SparkExecutionContext extends ExecutionContext
 					fo.exportData();
 				}
 				rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
-				rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug			
+				rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug
 				fromFile = true;
 			}
 			else { //default case
@@ -460,7 +454,7 @@ public class SparkExecutionContext extends ExecutionContext
 				fo.release(); //unpin frame
 				_parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
 			}
-			
+
 			//keep rdd handle for future operations on it
 			RDDObject rddhandle = new RDDObject(rdd, fo.getVarName());
 			rddhandle.setHDFSFile(fromFile);
@@ -487,64 +481,64 @@ public class SparkExecutionContext extends ExecutionContext
 			else {
 				throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable");
 			}
-			
+
 			//keep rdd handle for future operations on it
 			RDDObject rddhandle = new RDDObject(rdd, fo.getVarName());
 			rddhandle.setHDFSFile(true);
 			fo.setRDDHandle(rddhandle);
 		}
-		
+
 		return rdd;
 	}
-	
+
 	/**
 	 * TODO So far we only create broadcast variables but never destroy
 	 * them. This is a memory leak which might lead to executor out-of-memory.
-	 * However, in order to handle this, we need to keep track when broadcast 
+	 * However, in order to handle this, we need to keep track when broadcast
 	 * variables are no longer required.
-	 * 
+	 *
 	 * @param varname variable name
 	 * @return wrapper for broadcast variables
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
 	@SuppressWarnings("unchecked")
-	public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( String varname ) 
+	public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( String varname )
 		throws DMLRuntimeException
-	{		
+	{
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
 		MatrixObject mo = getMatrixObject(varname);
-		
+
 		PartitionedBroadcast<MatrixBlock> bret = null;
-		
+
 		//reuse existing broadcast handle
-		if( mo.getBroadcastHandle()!=null 
-			&& mo.getBroadcastHandle().isValid() ) 
+		if( mo.getBroadcastHandle()!=null
+			&& mo.getBroadcastHandle().isValid() )
 		{
 			bret = mo.getBroadcastHandle().getBroadcast();
 		}
-		
+
 		//create new broadcast handle (never created, evicted)
-		if( bret == null ) 
+		if( bret == null )
 		{
 			//account for overwritten invalid broadcast (e.g., evicted)
 			if( mo.getBroadcastHandle()!=null )
 				CacheableData.addBroadcastSize(-mo.getBroadcastHandle().getSize());
-			
-			//obtain meta data for matrix 
+
+			//obtain meta data for matrix
 			int brlen = (int) mo.getNumRowsPerBlock();
 			int bclen = (int) mo.getNumColumnsPerBlock();
-			
+
 			//create partitioned matrix block and release memory consumed by input
 			MatrixBlock mb = mo.acquireRead();
 			PartitionedBlock<MatrixBlock> pmb = new PartitionedBlock<MatrixBlock>(mb, brlen, bclen);
 			mo.release();
-			
+
 			//determine coarse-grained partitioning
 			int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen);
-			int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); 
+			int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
 			Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new Broadcast[numParts];
-					
+
 			//create coarse-grained partitioned broadcasts
 			if( numParts > 1 ) {
 				for( int i=0; i<numParts; i++ ) {
@@ -557,60 +551,60 @@ public class SparkExecutionContext extends ExecutionContext
 			else { //single partition
 				ret[0] = getSparkContext().broadcast(pmb);
 			}
-		
+
 			bret = new PartitionedBroadcast<MatrixBlock>(ret);
-			BroadcastObject<MatrixBlock> bchandle = new BroadcastObject<MatrixBlock>(bret, varname, 
+			BroadcastObject<MatrixBlock> bchandle = new BroadcastObject<MatrixBlock>(bret, varname,
 					OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getMatrixCharacteristics()));
 			mo.setBroadcastHandle(bchandle);
 			CacheableData.addBroadcastSize(bchandle.getSize());
 		}
-		
+
 		if (DMLScript.STATISTICS) {
 			Statistics.accSparkBroadCastTime(System.nanoTime() - t0);
 			Statistics.incSparkBroadcastCount(1);
 		}
-		
+
 		return bret;
 	}
-	
+
 	@SuppressWarnings("unchecked")
-	public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( String varname) 
+	public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( String varname)
 		throws DMLRuntimeException
-	{		
+	{
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
 		FrameObject fo = getFrameObject(varname);
-		
+
 		PartitionedBroadcast<FrameBlock> bret = null;
-		
+
 		//reuse existing broadcast handle
-		if( fo.getBroadcastHandle()!=null 
-			&& fo.getBroadcastHandle().isValid() ) 
+		if( fo.getBroadcastHandle()!=null
+			&& fo.getBroadcastHandle().isValid() )
 		{
 			bret = fo.getBroadcastHandle().getBroadcast();
 		}
-		
+
 		//create new broadcast handle (never created, evicted)
-		if( bret == null ) 
+		if( bret == null )
 		{
 			//account for overwritten invalid broadcast (e.g., evicted)
 			if( fo.getBroadcastHandle()!=null )
 				CacheableData.addBroadcastSize(-fo.getBroadcastHandle().getSize());
-			
-			//obtain meta data for frame 
+
+			//obtain meta data for frame
 			int bclen = (int) fo.getNumColumns();
 			int brlen = OptimizerUtils.getDefaultFrameSize();
-			
+
 			//create partitioned frame block and release memory consumed by input
 			FrameBlock mb = fo.acquireRead();
 			PartitionedBlock<FrameBlock> pmb = new PartitionedBlock<FrameBlock>(mb, brlen, bclen);
 			fo.release();
-			
+
 			//determine coarse-grained partitioning
 			int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(fo.getNumRows(), fo.getNumColumns(), brlen, bclen);
-			int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); 
+			int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
 			Broadcast<PartitionedBlock<FrameBlock>>[] ret = new Broadcast[numParts];
-					
+
 			//create coarse-grained partitioned broadcasts
 			if( numParts > 1 ) {
 				for( int i=0; i<numParts; i++ ) {
@@ -623,41 +617,41 @@ public class SparkExecutionContext extends ExecutionContext
 			else { //single partition
 				ret[0] = getSparkContext().broadcast(pmb);
 			}
-		
+
 			bret = new PartitionedBroadcast<FrameBlock>(ret);
-			BroadcastObject<FrameBlock> bchandle = new BroadcastObject<FrameBlock>(bret, varname,  
+			BroadcastObject<FrameBlock> bchandle = new BroadcastObject<FrameBlock>(bret, varname,
 					OptimizerUtils.estimatePartitionedSizeExactSparsity(fo.getMatrixCharacteristics()));
 			fo.setBroadcastHandle(bchandle);
 			CacheableData.addBroadcastSize(bchandle.getSize());
 		}
-		
+
 		if (DMLScript.STATISTICS) {
 			Statistics.accSparkBroadCastTime(System.nanoTime() - t0);
 			Statistics.incSparkBroadcastCount(1);
 		}
-		
+
 		return bret;
 	}
 
 	/**
-	 * Keep the output rdd of spark rdd operations as meta data of matrix/frame 
+	 * Keep the output rdd of spark rdd operations as meta data of matrix/frame
 	 * objects in the symbol table.
-	 * 
+	 *
 	 * @param varname variable name
 	 * @param rdd JavaPairRDD handle for variable
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd) 
+	public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd)
 		throws DMLRuntimeException
 	{
 		CacheableData<?> obj = getCacheableData(varname);
 		RDDObject rddhandle = new RDDObject(rdd, varname);
 		obj.setRDDHandle( rddhandle );
 	}
-	
+
 	/**
 	 * Utility method for creating an RDD out of an in-memory matrix block.
-	 * 
+	 *
 	 * @param sc java spark context
 	 * @param src matrix block
 	 * @param brlen block row length
@@ -665,13 +659,13 @@ public class SparkExecutionContext extends ExecutionContext
 	 * @return JavaPairRDD handle to matrix block
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) 
+	public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen)
 		throws DMLRuntimeException
-	{	
+	{
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new LinkedList<Tuple2<MatrixIndexes,MatrixBlock>>();
-		
-		if(    src.getNumRows() <= brlen 
+
+		if(    src.getNumRows() <= brlen
 		    && src.getNumColumns() <= bclen )
 		{
 			list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(1,1), src));
@@ -679,44 +673,44 @@ public class SparkExecutionContext extends ExecutionContext
 		else
 		{
 			boolean sparse = src.isInSparseFormat();
-			
+
 			//create and write subblocks of matrix
 			for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++)
 				for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++)
 				{
 					int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen;
 					int maxCol = (blockCol*bclen + bclen < src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen;
-					
+
 					MatrixBlock block = new MatrixBlock(maxRow, maxCol, sparse);
-						
+
 					int row_offset = blockRow*brlen;
 					int col_offset = blockCol*bclen;
-	
+
 					//copy submatrix to block
-					src.sliceOperations( row_offset, row_offset+maxRow-1, 
-							             col_offset, col_offset+maxCol-1, block );							
-					
+					src.sliceOperations( row_offset, row_offset+maxRow-1,
+							             col_offset, col_offset+maxCol-1, block );
+
 					//append block to sequence file
 					MatrixIndexes indexes = new MatrixIndexes(blockRow+1, blockCol+1);
 					list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(indexes, block));
 				}
 		}
-		
+
 		JavaPairRDD<MatrixIndexes,MatrixBlock> result = sc.parallelizePairs(list);
 		if (DMLScript.STATISTICS) {
 			Statistics.accSparkParallelizeTime(System.nanoTime() - t0);
 			Statistics.incSparkParallelizeCount(1);
 		}
-		
+
 		return result;
 	}
 
-	public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src) 
+	public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src)
 		throws DMLRuntimeException
-	{	
+	{
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		LinkedList<Tuple2<Long,FrameBlock>> list = new LinkedList<Tuple2<Long,FrameBlock>>();
-			
+
 		//create and write subblocks of matrix
 		int blksize = ConfigurationManager.getBlocksize();
 		for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)blksize); blockRow++)
@@ -725,28 +719,28 @@ public class SparkExecutionContext extends ExecutionContext
 			int roffset = blockRow*blksize;
 
 			FrameBlock block = new FrameBlock(src.getSchema());
-			
+
 			//copy sub frame to block, incl meta data on first
-			src.sliceOperations( roffset, roffset+maxRow-1, 0, src.getNumColumns()-1, block );		
+			src.sliceOperations( roffset, roffset+maxRow-1, 0, src.getNumColumns()-1, block );
 			if( roffset == 0 )
 				block.setColumnMetadata(src.getColumnMetadata());
-			
+
 			//append block to sequence file
 			list.addLast(new Tuple2<Long,FrameBlock>((long)roffset+1, block));
 		}
-		
+
 		JavaPairRDD<Long,FrameBlock> result = sc.parallelizePairs(list);
 		if (DMLScript.STATISTICS) {
 			Statistics.accSparkParallelizeTime(System.nanoTime() - t0);
 			Statistics.incSparkParallelizeCount(1);
 		}
-		
+
 		return result;
 	}
-	
+
 	/**
 	 * This method is a generic abstraction for calls from the buffer pool.
-	 * 
+	 *
 	 * @param rdd rdd object
 	 * @param rlen number of rows
 	 * @param clen number of columns
@@ -757,21 +751,21 @@ public class SparkExecutionContext extends ExecutionContext
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
 	@SuppressWarnings("unchecked")
-	public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz) 
+	public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz)
 		throws DMLRuntimeException
-	{			
+	{
 		return toMatrixBlock(
-				(JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(), 
+				(JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(),
 				rlen, clen, brlen, bclen, nnz);
 	}
-	
+
 	/**
-	 * Utility method for creating a single matrix block out of a binary block RDD. 
-	 * Note that this collect call might trigger execution of any pending transformations. 
-	 * 
+	 * Utility method for creating a single matrix block out of a binary block RDD.
+	 * Note that this collect call might trigger execution of any pending transformations.
+	 *
 	 * NOTE: This is an unguarded utility function, which requires memory for both the output matrix
 	 * and its collected, blocked representation.
-	 * 
+	 *
 	 * @param rdd JavaPairRDD for matrix block
 	 * @param rlen number of rows
 	 * @param clen number of columns
@@ -781,19 +775,19 @@ public class SparkExecutionContext extends ExecutionContext
 	 * @return matrix block
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) 
+	public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz)
 		throws DMLRuntimeException
 	{
-		
+
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
 		MatrixBlock out = null;
-		
+
 		if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK
 		{
 			//special case without copy and nnz maintenance
 			List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-			
+
 			if( list.size()>1 )
 				throw new DMLRuntimeException("Expecting no more than one result block.");
 			else if( list.size()==1 )
@@ -806,70 +800,70 @@ public class SparkExecutionContext extends ExecutionContext
 			//determine target sparse/dense representation
 			long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
 			boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
-			
+
 			//create output matrix block (w/ lazy allocation)
 			out = new MatrixBlock(rlen, clen, sparse, lnnz);
-			
+
 			List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-			
+
 			//copy blocks one-at-a-time into output matrix block
-			long aNnz = 0; 
+			long aNnz = 0;
 			for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
 			{
 				//unpack index-block pair
 				MatrixIndexes ix = keyval._1();
 				MatrixBlock block = keyval._2();
-				
+
 				//compute row/column block offsets
 				int row_offset = (int)(ix.getRowIndex()-1)*brlen;
 				int col_offset = (int)(ix.getColumnIndex()-1)*bclen;
 				int rows = block.getNumRows();
 				int cols = block.getNumColumns();
-				
+
 				//append block
 				if( sparse ) { //SPARSE OUTPUT
 					//append block to sparse target in order to avoid shifting, where
 					//we use a shallow row copy in case of MCSR and single column blocks
-					//note: this append requires, for multiple column blocks, a final sort 
+					//note: this append requires, for multiple column blocks, a final sort
 					out.appendToSparse(block, row_offset, col_offset, clen>bclen);
 				}
 				else { //DENSE OUTPUT
-					out.copy( row_offset, row_offset+rows-1, 
-							  col_offset, col_offset+cols-1, block, false );	
+					out.copy( row_offset, row_offset+rows-1,
+							  col_offset, col_offset+cols-1, block, false );
 				}
-				
+
 				//incremental maintenance nnz
 				aNnz += block.getNonZeros();
 			}
-			
+
 			//post-processing output matrix
 			if( sparse && clen>bclen )
 				out.sortSparseRows();
 			out.setNonZeros(aNnz);
 			out.examSparsity();
 		}
-		
+
 		if (DMLScript.STATISTICS) {
 			Statistics.accSparkCollectTime(System.nanoTime() - t0);
 			Statistics.incSparkCollectCount(1);
 		}
-		
+
 		return out;
 	}
-	
+
 	@SuppressWarnings("unchecked")
-	public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz) 
+	public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz)
 		throws DMLRuntimeException
-	{			
+	{
 		return toMatrixBlock(
-				(JavaPairRDD<MatrixIndexes, MatrixCell>) rdd.getRDD(), 
+				(JavaPairRDD<MatrixIndexes, MatrixCell>) rdd.getRDD(),
 				rlen, clen, nnz);
 	}
-	
+
 	/**
-	 * Utility method for creating a single matrix block out of a binary cell RDD. 
-	 * Note that this collect call might trigger execution of any pending transformations. 
-	 * 
+	 * Utility method for creating a single matrix block out of a binary cell RDD.
+	 * Note that this collect call might trigger execution of any pending transformations.
+	 *
 	 * @param rdd JavaPairRDD for matrix block
 	 * @param rlen number of rows
 	 * @param clen number of columns
@@ -877,57 +871,57 @@ public class SparkExecutionContext extends ExecutionContext
 	 * @return matrix block
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz) 
+	public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz)
 		throws DMLRuntimeException
-	{	
+	{
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
 		MatrixBlock out = null;
-		
+
 		//determine target sparse/dense representation
 		long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
 		boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
-				
+
 		//create output matrix block (w/ lazy allocation)
 		out = new MatrixBlock(rlen, clen, sparse);
-		
+
 		List<Tuple2<MatrixIndexes,MatrixCell>> list = rdd.collect();
-		
+
 		//copy blocks one-at-a-time into output matrix block
 		for( Tuple2<MatrixIndexes,MatrixCell> keyval : list )
 		{
 			//unpack index-block pair
 			MatrixIndexes ix = keyval._1();
 			MatrixCell cell = keyval._2();
-			
+
 			//append cell to dense/sparse target in order to avoid shifting for sparse
 			//note: this append requires a final sort of sparse rows
 			out.appendValue((int)ix.getRowIndex()-1, (int)ix.getColumnIndex()-1, cell.getValue());
 		}
-		
+
 		//post-processing output matrix
 		if( sparse )
 			out.sortSparseRows();
 		out.recomputeNonZeros();
 		out.examSparsity();
-		
+
 		if (DMLScript.STATISTICS) {
 			Statistics.accSparkCollectTime(System.nanoTime() - t0);
 			Statistics.incSparkCollectCount(1);
 		}
-		
+
 		return out;
 	}
 
-	public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) 
+	public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz)
 		throws DMLRuntimeException
 	{
-		
+
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
 		PartitionedBlock<MatrixBlock> out = new PartitionedBlock<MatrixBlock>(rlen, clen, brlen, bclen);
 		List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-		
+
 		//copy blocks one-at-a-time into output matrix block
 		for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
 		{
@@ -936,24 +930,24 @@ public class SparkExecutionContext extends ExecutionContext
 			MatrixBlock block = keyval._2();
 			out.setBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block);
 		}
-		
+
 		if (DMLScript.STATISTICS) {
 			Statistics.accSparkCollectTime(System.nanoTime() - t0);
 			Statistics.incSparkCollectCount(1);
 		}
-				
+
 		return out;
 	}
 
 	@SuppressWarnings("unchecked")
-	public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] schema, int rlen, int clen) 
-		throws DMLRuntimeException 
+	public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] schema, int rlen, int clen)
+		throws DMLRuntimeException
 	{
 		JavaPairRDD<Long,FrameBlock> lrdd = (JavaPairRDD<Long,FrameBlock>) rdd.getRDD();
 		return toFrameBlock(lrdd, schema, rlen, clen);
 	}
 
-	public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, ValueType[] schema, int rlen, int clen) 
+	public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, ValueType[] schema, int rlen, int clen)
 		throws DMLRuntimeException
 	{
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -964,16 +958,16 @@ public class SparkExecutionContext extends ExecutionContext
 		//create output frame block (w/ lazy allocation)
 		FrameBlock out = new FrameBlock(schema);
 		out.ensureAllocatedColumns(rlen);
-		
+
 		List<Tuple2<Long,FrameBlock>> list = rdd.collect();
-		
+
 		//copy blocks one-at-a-time into output matrix block
 		for( Tuple2<Long,FrameBlock> keyval : list )
 		{
 			//unpack index-block pair
 			int ix = (int)(keyval._1() - 1);
 			FrameBlock block = keyval._2();
-		
+
 			//copy into output frame
 			out.copy( ix, ix+block.getNumRows()-1, 0, block.getNumColumns()-1, block );
 			if( ix == 0 ) {
@@ -981,12 +975,12 @@ public class SparkExecutionContext extends ExecutionContext
 				out.setColumnMetadata(block.getColumnMetadata());
 			}
 		}
-		
+
 		if (DMLScript.STATISTICS) {
 			Statistics.accSparkCollectTime(System.nanoTime() - t0);
 			Statistics.incSparkCollectCount(1);
 		}
-		
+
 		return out;
 	}
 
@@ -994,17 +988,17 @@ public class SparkExecutionContext extends ExecutionContext
 	public static long writeRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo )
 	{
 		JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
-		
+
 		//piggyback nnz maintenance on write
 		LongAccumulator aNnz = getSparkContextStatic().sc().longAccumulator("nnz");
 		lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
-		
+
 		//save file is an action which also triggers nnz maintenance
-		lrdd.saveAsHadoopFile(path, 
-				oinfo.outputKeyClass, 
-				oinfo.outputValueClass, 
+		lrdd.saveAsHadoopFile(path,
+				oinfo.outputKeyClass,
+				oinfo.outputValueClass,
 				oinfo.outputFormatClass);
-		
+
 		//return nnz aggregate of all blocks
 		return aNnz.value();
 	}
@@ -1013,58 +1007,58 @@ public class SparkExecutionContext extends ExecutionContext
 	public static void writeFrameRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo )
 	{
 		JavaPairRDD<?, FrameBlock> lrdd = (JavaPairRDD<Long, FrameBlock>) rdd.getRDD();
-		
+
 		//convert keys to writables if necessary
 		if( oinfo == OutputInfo.BinaryBlockOutputInfo ) {
 			lrdd = ((JavaPairRDD<Long, FrameBlock>)lrdd).mapToPair(
 					new LongFrameToLongWritableFrameFunction());
 			oinfo = OutputInfo.BinaryBlockFrameOutputInfo;
 		}
-	
+
 		//save file is an action which also triggers nnz maintenance
-		lrdd.saveAsHadoopFile(path, 
-				oinfo.outputKeyClass, 
-				oinfo.outputValueClass, 
+		lrdd.saveAsHadoopFile(path,
+				oinfo.outputKeyClass,
+				oinfo.outputValueClass,
 				oinfo.outputFormatClass);
 	}
-	
+
 	///////////////////////////////////////////
 	// Cleanup of RDDs and Broadcast variables
 	///////
-	
+
 	/**
 	 * Adds a child rdd object to the lineage of a parent rdd.
-	 * 
+	 *
 	 * @param varParent parent variable
 	 * @param varChild child variable
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	public void addLineageRDD(String varParent, String varChild) 
-		throws DMLRuntimeException 
+	public void addLineageRDD(String varParent, String varChild)
+		throws DMLRuntimeException
 	{
 		RDDObject parent = getCacheableData(varParent).getRDDHandle();
 		RDDObject child = getCacheableData(varChild).getRDDHandle();
-		
+
 		parent.addLineageChild( child );
 	}
-	
+
 	/**
 	 * Adds a child broadcast object to the lineage of a parent rdd.
-	 * 
+	 *
 	 * @param varParent parent variable
 	 * @param varChild child variable
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	public void addLineageBroadcast(String varParent, String varChild) 
-		throws DMLRuntimeException 
+	public void addLineageBroadcast(String varParent, String varChild)
+		throws DMLRuntimeException
 	{
 		RDDObject parent = getCacheableData(varParent).getRDDHandle();
 		BroadcastObject<?> child = getCacheableData(varChild).getBroadcastHandle();
-		
+
 		parent.addLineageChild( child );
 	}
 
-	public void addLineage(String varParent, String varChild, boolean broadcast) 
+	public void addLineage(String varParent, String varChild, boolean broadcast)
 		throws DMLRuntimeException
 	{
 		if( broadcast )
@@ -1072,25 +1066,25 @@ public class SparkExecutionContext extends ExecutionContext
 		else
 			addLineageRDD(varParent, varChild);
 	}
-	
+
 	@Override
-	public void cleanupMatrixObject( MatrixObject mo ) 
+	public void cleanupMatrixObject( MatrixObject mo )
 		throws DMLRuntimeException
 	{
 		//NOTE: this method overwrites the default behavior of cleanupMatrixObject
 		//and hence is transparently used by rmvar instructions and other users. The
 		//core difference is the lineage-based cleanup of RDD and broadcast variables.
-		
+
 		try
 		{
-			if ( mo.isCleanupEnabled() ) 
+			if ( mo.isCleanupEnabled() )
 			{
 				//compute ref count only if matrix cleanup actually necessary
-				if ( !getVariables().hasReferences(mo) ) 
+				if ( !getVariables().hasReferences(mo) )
 				{
-					//clean cached data	
-					mo.clearData(); 
-					
+					//clean cached data
+					mo.clearData();
+
 					//clean hdfs data if no pending rdd operations on it
 					if( mo.isHDFSFileExists() && mo.getFileName()!=null ) {
 						if( mo.getRDDHandle()==null ) {
@@ -1101,12 +1095,12 @@ public class SparkExecutionContext extends ExecutionContext
 							rdd.setHDFSFilename(mo.getFileName());
 						}
 					}
-					
+
 					//cleanup RDD and broadcast variables (recursive)
 					//note: requires that mo.clearData already removed back references
-					if( mo.getRDDHandle()!=null ) { 
+					if( mo.getRDDHandle()!=null ) {
  						rCleanupLineageObject(mo.getRDDHandle());
-					}	
+					}
 					if( mo.getBroadcastHandle()!=null ) {
 						rCleanupLineageObject(mo.getBroadcastHandle());
 					}
@@ -1120,18 +1114,18 @@ public class SparkExecutionContext extends ExecutionContext
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	private void rCleanupLineageObject(LineageObject lob) 
+	private void rCleanupLineageObject(LineageObject lob)
 		throws IOException
-	{		
+	{
 		//abort recursive cleanup if still consumers
 		if( lob.getNumReferences() > 0 )
 			return;
-			
-		//abort if still reachable through matrix object (via back references for 
+
+		//abort if still reachable through matrix object (via back references for
 		//robustness in function calls and to prevent repeated scans of the symbol table)
 		if( lob.hasBackReference() )
 			return;
-		
+
 		//cleanup current lineage object (from driver/executors)
 		//incl deferred hdfs file removal (only if metadata set by cleanup call)
 		if( lob instanceof RDDObject ) {
@@ -1151,38 +1145,38 @@ public class SparkExecutionContext extends ExecutionContext
 					cleanupBroadcastVariable(bc);
 			CacheableData.addBroadcastSize(-((BroadcastObject)lob).getSize());
 		}
-	
+
 		//recursively process lineage children
 		for( LineageObject c : lob.getLineageChilds() ){
 			c.decrementNumReferences();
 			rCleanupLineageObject(c);
 		}
 	}
-	
+
 	/**
 	 * This call destroys a broadcast variable at all executors and the driver.
 	 * Hence, it is intended to be used on rmvar only. Depending on the
 	 * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
-	 * 
+	 *
 	 * @param bvar broadcast variable
 	 */
-	public static void cleanupBroadcastVariable(Broadcast<?> bvar) 
+	public static void cleanupBroadcastVariable(Broadcast<?> bvar)
 	{
-		//In comparison to 'unpersist' (which would only delete the broadcast 
+		//In comparison to 'unpersist' (which would only delete the broadcast
 		//from the executors), this call also deletes related data from the driver.
 		if( bvar.isValid() ) {
 			bvar.destroy( !ASYNCHRONOUS_VAR_DESTROY );
 		}
 	}
-	
+
 	/**
 	 * This call removes an rdd variable from executor memory and disk if required.
 	 * Hence, it is intended to be used on rmvar only. Depending on the
 	 * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
-	 * 
+	 *
 	 * @param rvar rdd variable to remove
 	 */
-	public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar) 
+	public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar)
 	{
 		if( rvar.getStorageLevel()!=StorageLevel.NONE() ) {
 			rvar.unpersist( !ASYNCHRONOUS_VAR_DESTROY );
@@ -1190,72 +1184,72 @@ public class SparkExecutionContext extends ExecutionContext
 	}
 
 	@SuppressWarnings("unchecked")
-	public void repartitionAndCacheMatrixObject( String var ) 
+	public void repartitionAndCacheMatrixObject( String var )
 		throws DMLRuntimeException
 	{
 		MatrixObject mo = getMatrixObject(var);
 		MatrixCharacteristics mcIn = mo.getMatrixCharacteristics();
-		
+
 		//double check size to avoid unnecessary spark context creation
 		if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), (double)
 				OptimizerUtils.estimateSizeExactSparsity(mcIn)) )
-			return;	
-		
+			return;
+
 		//get input rdd and default storage level
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
 				getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
-		
+
 		//avoid unnecessary caching of input in order to reduce memory pressure
 		if( mo.getRDDHandle().allowsShortCircuitRead()
 			&& isRDDMarkedForCaching(in.id()) && !isRDDCached(in.id()) ) {
 			in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)
 					((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
-			
+
 			//investigate issue of unnecessarily large number of partitions
 			int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
 			if( numPartitions < in.getNumPartitions() )
 				in = in.coalesce( numPartitions );
 		}
-		
-		//repartition rdd (force creation of shuffled rdd via merge), note: without deep copy albeit 
+
+		//repartition rdd (force creation of shuffled rdd via merge), note: without deep copy albeit
 		//executed on the original data, because there will be no merge, i.e., no key duplicates
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in, false);
-		
+
 		//convert mcsr into memory-efficient csr if potentially sparse
-		if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {				
+		if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {
 			out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
 		}
-		
-		//persist rdd in default storage level 
+
+		//persist rdd in default storage level
 		out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL )
 		   .count(); //trigger caching to prevent contention
-		
+
 		//create new rdd handle, in-place of current matrix object
 		RDDObject inro =  mo.getRDDHandle();       //guaranteed to exist (see above)
 		RDDObject outro = new RDDObject(out, var); //create new rdd object
 		outro.setCheckpointRDD(true);              //mark as checkpointed
 		outro.addLineageChild(inro);               //keep lineage to prevent cycles on cleanup
-		mo.setRDDHandle(outro);				       
+		mo.setRDDHandle(outro);
 	}
 
 	@SuppressWarnings("unchecked")
-	public void cacheMatrixObject( String var ) 
+	public void cacheMatrixObject( String var )
 		throws DMLRuntimeException
 	{
 		//get input rdd and default storage level
 		MatrixObject mo = getMatrixObject(var);
-		
+
 		//double check size to avoid unnecessary spark context creation
 		if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), (double)
 				OptimizerUtils.estimateSizeExactSparsity(mo.getMatrixCharacteristics())) )
-			return;	
-		
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
+			return;
+
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
 				getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
-		
+
 		//persist rdd (force rdd caching, if not already cached)
 		if( !isRDDCached(in.id()) )
-			in.count(); //trigger caching to prevent contention			       
+			in.count(); //trigger caching to prevent contention
 	}
 
 	public void setThreadLocalSchedulerPool(String poolName) {
@@ -1283,7 +1277,7 @@ public class SparkExecutionContext extends ExecutionContext
 		if( !jsc.sc().getPersistentRDDs().contains(rddID) ) {
 			return false;
 		}
-		
+
 		//check that rdd is actually already cached
 		for( RDDInfo info : jsc.sc().getRDDStorageInfo() ) {
 			if( info.id() == rddID )
@@ -1293,35 +1287,35 @@ public class SparkExecutionContext extends ExecutionContext
 	}
 
 	///////////////////////////////////////////
-	// Spark configuration handling 
+	// Spark configuration handling
 	///////
 
 	/**
-	 * Obtains the lazily analyzed spark cluster configuration. 
-	 * 
+	 * Obtains the lazily analyzed spark cluster configuration.
+	 *
 	 * @return spark cluster configuration
 	 */
 	public static SparkClusterConfig getSparkClusterConfig() {
-		//lazy creation of spark cluster config		
+		//lazy creation of spark cluster config
 		if( _sconf == null )
 			_sconf = new SparkClusterConfig();
 		return _sconf;
 	}
-	
+
 	/**
 	 * Obtains the available memory budget for broadcast variables in bytes.
-	 * 
+	 *
 	 * @return broadcast memory budget
 	 */
 	public static double getBroadcastMemoryBudget() {
 		return getSparkClusterConfig()
 			.getBroadcastMemoryBudget();
 	}
-	
+
 	/**
 	 * Obtain the available memory budget for data storage in bytes.
-	 * 
-	 * @param min      flag for minimum data budget 
+	 *
+	 * @param min      flag for minimum data budget
 	 * @param refresh  flag for refresh with spark context
 	 * @return data memory budget
 	 */
@@ -1329,21 +1323,21 @@ public class SparkExecutionContext extends ExecutionContext
 		return getSparkClusterConfig()
 			.getDataMemoryBudget(min, refresh);
 	}
-	
+
 	/**
 	 * Obtain the number of executors in the cluster (excluding the driver).
-	 * 
+	 *
 	 * @return number of executors
 	 */
 	public static int getNumExecutors() {
 		return getSparkClusterConfig()
 			.getNumExecutors();
 	}
-	
+
 	/**
-	 * Obtain the default degree of parallelism (cores in the cluster). 
-	 * 
-	 * @param refresh  flag for refresh with spark context 
+	 * Obtain the default degree of parallelism (cores in the cluster).
+	 *
+	 * @param refresh  flag for refresh with spark context
 	 * @return default degree of parallelism
 	 */
 	public static int getDefaultParallelism(boolean refresh) {
@@ -1360,13 +1354,13 @@ public class SparkExecutionContext extends ExecutionContext
 		int numExecutors = getNumExecutors();
 		int numCores = getDefaultParallelism(false);
 		boolean multiThreaded = (numCores > numExecutors);
-		
+
 		//check for jdk version less than 8 (and raise warning if multi-threaded)
-		if( isLtJDK8 && multiThreaded) 
+		if( isLtJDK8 && multiThreaded)
 		{
-			//get the jre version 
+			//get the jre version
 			String version = System.getProperty("java.version");
-			
+
 			LOG.warn("########################################################################################");
 			LOG.warn("### WARNING: Multi-threaded text reblock may lead to thread contention on JRE < 1.8 ####");
 			LOG.warn("### java.version = " + version);
@@ -1377,51 +1371,51 @@ public class SparkExecutionContext extends ExecutionContext
 			LOG.warn("########################################################################################");
 		}
 	}
-	
+
 	/**
-	 * Captures relevant spark cluster configuration properties, e.g., memory budgets and 
+	 * Captures relevant spark cluster configuration properties, e.g., memory budgets and
 	 * degree of parallelism. This configuration abstracts legacy (< Spark 1.6) and current
-	 * configurations and provides a unified view. 
+	 * configurations and provides a unified view.
 	 */
-	private static class SparkClusterConfig 
+	private static class SparkClusterConfig
 	{
 		//broadcasts are stored in mem-and-disk in data space, this config
 		//defines the fraction of data space to be used as broadcast budget
 		private static final double BROADCAST_DATA_FRACTION = 0.3;
-		
+
 		//forward private config from Spark's UnifiedMemoryManager.scala (>1.6)
 		private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024;
-		
+
 		//meta configurations
 		private boolean _legacyVersion = false; //spark version <1.6
 		private boolean _confOnly = false; //infrastructure info based on config
-		
+
 		//memory management configurations
 		private long _memExecutor = -1; //mem per executor
 		private double _memDataMinFrac = -1; //minimum data fraction
 		private double _memDataMaxFrac = -1; //maximum data fraction
 		private double _memBroadcastFrac = -1; //broadcast fraction
-		
+
 		//degree of parallelism configurations
 		private int _numExecutors = -1; //total executors
-		private int _defaultPar = -1; //total vcores  
-	
-		public SparkClusterConfig() 
+		private int _defaultPar = -1; //total vcores
+
+		public SparkClusterConfig()
 		{
 			SparkConf sconf = createSystemMLSparkConf();
 			_confOnly = true;
-			
+
 			//parse version and config
 			String sparkVersion = getSparkVersionString();
 			_legacyVersion = (UtilFunctions.compareVersion(sparkVersion, "1.6.0") < 0
 					|| sconf.getBoolean("spark.memory.useLegacyMode", false) );
-	
+
 			//obtain basic spark configurations
 			if( _legacyVersion )
 				analyzeSparkConfiguationLegacy(sconf);
 			else
 				analyzeSparkConfiguation(sconf);
-	
+
 			//log debug of created spark cluster config
 			if( LOG.isDebugEnabled() )
 				LOG.debug( this.toString() );
@@ -1432,30 +1426,30 @@ public class SparkExecutionContext extends ExecutionContext
 		}
 
 		public long getDataMemoryBudget(boolean min, boolean refresh) {
-			//always get the current num executors on refresh because this might 
+			//always get the current num executors on refresh because this might
 			//change if not all executors are initially allocated and it is plan-relevant
 			int numExec = _numExecutors;
 			if( refresh && !_confOnly ) {
 				JavaSparkContext jsc = getSparkContextStatic();
 				numExec = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
 			}
-			
+
 			//compute data memory budget
 			return (long) ( numExec * _memExecutor *
-				(min ? _memDataMinFrac : _memDataMaxFrac) );	
+				(min ? _memDataMinFrac : _memDataMaxFrac) );
 		}
 
 		public int getNumExecutors() {
 			if( _numExecutors < 0 )
-				analyzeSparkParallelismConfiguation(null);			
+				analyzeSparkParallelismConfiguation(null);
 			return _numExecutors;
 		}
 
 		public int getDefaultParallelism(boolean refresh) {
 			if( _defaultPar < 0 && !refresh )
 				analyzeSparkParallelismConfiguation(null);
-			
-			//always get the current default parallelism on refresh because this might 
+
+			//always get the current default parallelism on refresh because this might
 			//change if not all executors are initially allocated and it is plan-relevant
 			return ( refresh && !_confOnly ) ?
 				getSparkContextStatic().defaultParallelism() : _defaultPar;
@@ -1464,36 +1458,36 @@ public class SparkExecutionContext extends ExecutionContext
 		public void analyzeSparkConfiguationLegacy(SparkConf conf)  {
 			//ensure allocated spark conf
 			SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf;
-			
+
 			//parse absolute executor memory
 			_memExecutor = UtilFunctions.parseMemorySize(
 					sconf.get("spark.executor.memory", "1g"));
-			
+
 			//get data and shuffle memory ratios (defaults not specified in job conf)
 			double dataFrac = sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60%
 			_memDataMinFrac = dataFrac;
 			_memDataMaxFrac = dataFrac;
 			_memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION; //default 18%
-			
-			//analyze spark degree of parallelism 
-			analyzeSparkParallelismConfiguation(sconf);	
+
+			//analyze spark degree of parallelism
+			analyzeSparkParallelismConfiguation(sconf);
 		}
 
 		public void analyzeSparkConfiguation(SparkConf conf) {
 			//ensure allocated spark conf
 			SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf;
-			
+
 			//parse absolute executor memory, incl fixed cut off
 			_memExecutor = UtilFunctions.parseMemorySize(
-					sconf.get("spark.executor.memory", "1g")) 
+					sconf.get("spark.executor.memory", "1g"))
 					- RESERVED_SYSTEM_MEMORY_BYTES;
-			
+
 			//get data and shuffle memory ratios (defaults not specified in job conf)
 			_memDataMinFrac = sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50%
 			_memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.75); //default 75%
 			_memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 22.5%
-			
-			//analyze spark degree of parallelism 
+
+			//analyze spark degree of parallelism
 			analyzeSparkParallelismConfiguation(sconf);
 		}
 
@@ -1501,7 +1495,7 @@ public class SparkExecutionContext extends ExecutionContext
 			int numExecutors = sconf.getInt("spark.executor.instances", -1);
 			int numCoresPerExec = sconf.getInt("spark.executor.cores", -1);
 			int defaultPar = sconf.getInt("spark.default.parallelism", -1);
-			
+
 			if( numExecutors > 1 && (defaultPar > 1 || numCoresPerExec > 1) ) {
 				_numExecutors = numExecutors;
 				_defaultPar = (defaultPar>1) ? defaultPar : numExecutors * numCoresPerExec;
@@ -1512,28 +1506,28 @@ public class SparkExecutionContext extends ExecutionContext
 				//note: spark context provides this information while conf does not
 				//(for num executors we need to correct for driver and local mode)
 				JavaSparkContext jsc = getSparkContextStatic();
-				_numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);  
+				_numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
 				_defaultPar = jsc.defaultParallelism();
-				_confOnly &= false; //implies env info refresh w/ spark context 
+				_confOnly &= false; //implies env info refresh w/ spark context
 			}
 		}
-		
+
 		/**
 		 * Obtains the spark version string. If the spark context has been created,
-		 * we simply get it from the context; otherwise, we use Spark internal 
-		 * constants to avoid creating the spark context just for the version. 
-		 * 
+		 * we simply get it from the context; otherwise, we use Spark internal
+		 * constants to avoid creating the spark context just for the version.
+		 *
 		 * @return spark version string
 		 */
 		private String getSparkVersionString() {
 			//check for existing spark context
-			if( isSparkContextCreated() ) 
+			if( isSparkContextCreated() )
 				return getSparkContextStatic().version();
-			
+
 			//use spark internal constant to avoid context creation
 			return org.apache.spark.package$.MODULE$.SPARK_VERSION();
 		}
-		
+
 		@Override
 		public String toString() {
 			StringBuilder sb = new StringBuilder("SparkClusterConfig: \n");
@@ -1544,17 +1538,17 @@ public class SparkExecutionContext extends ExecutionContext
 			sb.append("-- memDataMaxFrac   = " + _memDataMaxFrac + "\n");
 			sb.append("-- memBroadcastFrac = " + _memBroadcastFrac + "\n");
 			sb.append("-- numExecutors     = " + _numExecutors + "\n");
-			sb.append("-- defaultPar       = " + _defaultPar + "\n");		
+			sb.append("-- defaultPar       = " + _defaultPar + "\n");
 			return sb.toString();
 		}
 	}
-	
-	private static class MemoryManagerParRDDs 
+
+	private static class MemoryManagerParRDDs
 	{
 		private final long _limit;
 		private long _size;
 		private HashMap<Integer, Long> _rdds;
-		
+
 		public MemoryManagerParRDDs(double fractionMem) {
 			_limit = (long)(fractionMem * InfrastructureAnalyzer.getLocalMaxMemory());
 			_size = 0;
@@ -1566,7 +1560,7 @@ public class SparkExecutionContext extends ExecutionContext
 			_size += ret ? rddSize : 0;
 			return ret;
 		}
-		
+
 		public synchronized void registerRDD(int rddID, long rddSize, boolean reserved) {
 			if( !reserved ) {
 				throw new RuntimeException("Unsupported rdd registration "
@@ -1574,7 +1568,7 @@ public class SparkExecutionContext extends ExecutionContext
 			}
 			_rdds.put(rddID, rddSize);
 		}
-		
+
 		public synchronized void deregisterRDD(int rddID) {
 			long rddSize = _rdds.remove(rddID);
 			_size -= rddSize;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java
deleted file mode 100644
index a1173bd..0000000
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.functions;
-
-import java.io.Serializable;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.Row;
-
-import scala.Tuple2;
-
-import org.apache.sysml.api.MLBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-
-@SuppressWarnings("deprecation")
-public class GetMLBlock implements Function<Tuple2<MatrixIndexes,MatrixBlock>, Row>, Serializable {
-
-	private static final long serialVersionUID = 8829736765002126985L;
-
-	@Override
-	public Row call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception {
-		return new MLBlock(kv._1, kv._2);
-	}
-	
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index f206fbd..377ca2e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -63,14 +63,14 @@ import scala.Tuple2;
  * can be moved to RDDConverterUtils.
  */
 @SuppressWarnings("unused")
-public class RDDConverterUtilsExt 
+public class RDDConverterUtilsExt
 {
 	public enum RDDConverterTypes {
 		TEXT_TO_MATRIX_CELL,
 		MATRIXENTRY_TO_MATRIXCELL
 	}
-	
-	
+
+
 	/**
 	 * Example usage:
 	 * <pre><code>
@@ -88,7 +88,7 @@ public class RDDConverterUtilsExt
 	 * val mc = new MatrixCharacteristics(numRows, numCols, 1000, 1000, nnz)
 	 * val binBlocks = RDDConverterUtilsExt.coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), coordinateMatrix, mc, true)
 	 * </code></pre>
-	 * 
+	 *
 	 * @param sc java spark context
 	 * @param input coordinate matrix
 	 * @param mcIn matrix characteristics
@@ -97,26 +97,26 @@ public class RDDConverterUtilsExt
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
 	public static JavaPairRDD<MatrixIndexes, MatrixBlock> coordinateMatrixToBinaryBlock(JavaSparkContext sc,
-			CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException 
+			CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException
 	{
 		//convert matrix entry rdd to binary block rdd (w/ partial blocks)
 		JavaPairRDD<MatrixIndexes, MatrixBlock> out = input.entries().toJavaRDD()
 				.mapPartitionsToPair(new MatrixEntryToBinaryBlockFunction(mcIn));
-		
-		//inject empty blocks (if necessary) 
+
+		//inject empty blocks (if necessary)
 		if( outputEmptyBlocks && mcIn.mightHaveEmptyBlocks() ) {
-			out = out.union( 
+			out = out.union(
 				SparkUtils.getEmptyBlockRDD(sc, mcIn) );
 		}
-		
+
 		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey(out, false); 
-		
+		out = RDDAggregateUtils.mergeByKey(out, false);
+
 		return out;
 	}
-	
+
 	public static JavaPairRDD<MatrixIndexes, MatrixBlock> coordinateMatrixToBinaryBlock(SparkContext sc,
-			CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException 
+			CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException
 	{
 		return coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), input, mcIn, true);
 	}
@@ -128,19 +128,19 @@ public class RDDConverterUtilsExt
 		}
 		return df.select(columns.get(0), scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
 	}
-	
+
 	public static MatrixBlock convertPy4JArrayToMB(byte [] data, long rlen, long clen) throws DMLRuntimeException {
 		return convertPy4JArrayToMB(data, (int)rlen, (int)clen, false);
 	}
-	
+
 	public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen) throws DMLRuntimeException {
 		return convertPy4JArrayToMB(data, rlen, clen, false);
 	}
-	
+
 	public static MatrixBlock convertSciPyCOOToMB(byte [] data, byte [] row, byte [] col, long rlen, long clen, long nnz) throws DMLRuntimeException {
 		return convertSciPyCOOToMB(data, row, col, (int)rlen, (int)clen, (int)nnz);
 	}
-	
+
 	public static MatrixBlock convertSciPyCOOToMB(byte [] data, byte [] row, byte [] col, int rlen, int clen, int nnz) throws DMLRuntimeException {
 		MatrixBlock mb = new MatrixBlock(rlen, clen, true);
 		mb.allocateSparseRowsBlock(false);
@@ -154,17 +154,17 @@ public class RDDConverterUtilsExt
 			double val = buf1.getDouble();
 			int rowIndex = buf2.getInt();
 			int colIndex = buf3.getInt();
-			mb.setValue(rowIndex, colIndex, val); 
+			mb.setValue(rowIndex, colIndex, val);
 		}
 		mb.recomputeNonZeros();
 		mb.examSparsity();
 		return mb;
 	}
-	
+
 	public static MatrixBlock convertPy4JArrayToMB(byte [] data, long rlen, long clen, boolean isSparse) throws DMLRuntimeException {
 		return convertPy4JArrayToMB(data, (int) rlen, (int) clen, isSparse);
 	}
-	
+
 	public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse) {
 		MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse);
 		ret.allocateDenseOrSparseBlock();
@@ -176,7 +176,7 @@ public class RDDConverterUtilsExt
 		}
 		return allocateDenseOrSparse(rlen, clen, isSparse);
 	}
-	
+
 	public static void copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) throws DMLRuntimeException {
 		copyRowBlocks(mb, (long)rowIndex, ret, (long)numRowsPerBlock, (long)rlen, (long)clen);
 	}
@@ -192,12 +192,12 @@ public class RDDConverterUtilsExt
 			ret.copy((int)(rowIndex*numRowsPerBlock), (int)Math.min((rowIndex+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb, false);
 		// }
 	}
-	
+
 	public static void postProcessAfterCopying(MatrixBlock ret) throws DMLRuntimeException {
 		ret.recomputeNonZeros();
 		ret.examSparsity();
 	}
-	
+
 	public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen, boolean isSparse) throws DMLRuntimeException {
 		MatrixBlock mb = new MatrixBlock(rlen, clen, isSparse, -1);
 		if(isSparse) {
@@ -219,19 +219,19 @@ public class RDDConverterUtilsExt
 		mb.examSparsity();
 		return mb;
 	}
-	
+
 	public static byte [] convertMBtoPy4JDenseArr(MatrixBlock mb) throws DMLRuntimeException {
 		byte [] ret = null;
 		if(mb.isInSparseFormat()) {
 			mb.sparseToDense();
 		}
-		
+
 		long limit = mb.getNumRows()*mb.getNumColumns();
 		int times = Double.SIZE / Byte.SIZE;
 		if( limit > Integer.MAX_VALUE / times )
 			throw new DMLRuntimeException("MatrixBlock of size " + limit + " cannot be converted to dense numpy array");
 		ret = new byte[(int) (limit * times)];
-		
+
 		double [] denseBlock = mb.getDenseBlock();
 		if(mb.isEmptyBlock()) {
 			for(int i=0;i < limit;i++){
@@ -246,10 +246,10 @@ public class RDDConverterUtilsExt
 		        ByteBuffer.wrap(ret, i*times, times).order(ByteOrder.nativeOrder()).putDouble(denseBlock[i]);
 			}
 		}
-		
+
 		return ret;
 	}
-	
+
 	public static class AddRowID implements Function<Tuple2<Row,Long>, Row> {
 		private static final long serialVersionUID = -3733816995375745659L;
 
@@ -263,12 +263,12 @@ public class RDDConverterUtilsExt
 			fields[oldNumCols] = new Double(arg0._2 + 1);
 			return RowFactory.create(fields);
 		}
-		
+
 	}
 
 	/**
 	 * Add element indices as new column to DataFrame
-	 * 
+	 *
 	 * @param df input data frame
 	 * @param sparkSession the Spark Session
 	 * @param nameOfCol name of index column
@@ -286,27 +286,10 @@ public class RDDConverterUtilsExt
 		return sparkSession.createDataFrame(newRows, new StructType(newSchema));
 	}
 
-	/**
-	 * Add element indices as new column to DataFrame
-	 * 
-	 * @param df input data frame
-	 * @param sqlContext the SQL Context
-	 * @param nameOfCol name of index column
-	 * @return new data frame
-	 * 
-	 * @deprecated This will be removed in SystemML 1.0.
-	 */
-	@Deprecated
-	public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext sqlContext, String nameOfCol) {
-		SparkSession sparkSession = sqlContext.sparkSession();
-		return addIDToDataFrame(df, sparkSession, nameOfCol);
-	}
-	
-	
-	private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock> 
+	private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock>
 	{
 		private static final long serialVersionUID = 4907483236186747224L;
-		
+
 		private IJVToBinaryBlockFunctionHelper helper = null;
 		public MatrixEntryToBinaryBlockFunction(MatrixCharacteristics mc) throws DMLRuntimeException {
 			helper = new IJVToBinaryBlockFunctionHelper(mc);
@@ -318,18 +301,18 @@ public class RDDConverterUtilsExt
 		}
 
 	}
-	
+
 	private static class IJVToBinaryBlockFunctionHelper implements Serializable {
 		private static final long serialVersionUID = -7952801318564745821L;
 		//internal buffer size (aligned w/ default matrix block size)
 		private static final int BUFFER_SIZE = 4 * 1000 * 1000; //4M elements (32MB)
 		private int _bufflen = -1;
-		
+
 		private long _rlen = -1;
 		private long _clen = -1;
 		private int _brlen = -1;
 		private int _bclen = -1;
-		
+
 		public IJVToBinaryBlockFunctionHelper(MatrixCharacteristics mc) throws DMLRuntimeException
 		{
 			if(!mc.dimsKnown()) {
@@ -339,21 +322,21 @@ public class RDDConverterUtilsExt
 			_clen = mc.getCols();
 			_brlen = mc.getRowsPerBlock();
 			_bclen = mc.getColsPerBlock();
-			
+
 			//determine upper bounded buffer len
 			_bufflen = (int) Math.min(_rlen*_clen, BUFFER_SIZE);
-			
+
 		}
-		
+
 		// ----------------------------------------------------
 		// Can extend this by having type hierarchy
 		public Tuple2<MatrixIndexes, MatrixCell> textToMatrixCell(Text txt) {
 			FastStringTokenizer st = new FastStringTokenizer(' ');
 			//get input string (ignore matrix market comments)
 			String strVal = txt.toString();
-			if( strVal.startsWith("%") ) 
+			if( strVal.startsWith("%") )
 				return null;
-			
+
 			//parse input ijv triple
 			st.reset( strVal );
 			long row = st.nextLong();
@@ -363,19 +346,19 @@ public class RDDConverterUtilsExt
 			MatrixCell cell = new MatrixCell(val);
 			return new Tuple2<MatrixIndexes, MatrixCell>(indx, cell);
 		}
-		
+
 		public Tuple2<MatrixIndexes, MatrixCell> matrixEntryToMatrixCell(MatrixEntry entry) {
 			MatrixIndexes indx = new MatrixIndexes(entry.i(), entry.j());
 			MatrixCell cell = new MatrixCell(entry.value());
 			return new Tuple2<MatrixIndexes, MatrixCell>(indx, cell);
 		}
-		
+
 		// ----------------------------------------------------
-		
+
 		Iterable<Tuple2<MatrixIndexes, MatrixBlock>> convertToBinaryBlock(Object arg0, RDDConverterTypes converter)  throws Exception {
 			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
 			ReblockBuffer rbuff = new ReblockBuffer(_bufflen, _rlen, _clen, _brlen, _bclen);
-		
+
 			Iterator<?> iter = (Iterator<?>) arg0;
 			while( iter.hasNext() ) {
 				Tuple2<MatrixIndexes, MatrixCell> cell = null;
@@ -383,38 +366,38 @@ public class RDDConverterUtilsExt
 					case MATRIXENTRY_TO_MATRIXCELL:
 						cell = matrixEntryToMatrixCell((MatrixEntry) iter.next());
 						break;
-						
+
 					case TEXT_TO_MATRIX_CELL:
 						cell = textToMatrixCell((Text) iter.next());
 						break;
-					
+
 					default:
 						throw new Exception("Invalid converter for IJV data:" + converter.toString());
 				}
-				
+
 				if(cell == null) {
 					continue;
 				}
-				
+
 				//flush buffer if necessary
 				if( rbuff.getSize() >= rbuff.getCapacity() )
 					flushBufferToList(rbuff, ret);
-				
+
 				//add value to reblock buffer
 				rbuff.appendCell(cell._1.getRowIndex(), cell._1.getColumnIndex(), cell._2.getValue());
 			}
-			
+
 			//final flush buffer
 			flushBufferToList(rbuff, ret);
-		
+
 			return ret;
 		}
 
-		private void flushBufferToList( ReblockBuffer rbuff,  ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
+		private void flushBufferToList( ReblockBuffer rbuff,  ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret )
 			throws IOException, DMLRuntimeException
 		{
 			//temporary list of indexed matrix values to prevent library dependencies
-			ArrayList<IndexedMatrixValue> rettmp = new ArrayList<IndexedMatrixValue>();					
+			ArrayList<IndexedMatrixValue> rettmp = new ArrayList<IndexedMatrixValue>();
 			rbuff.flushBufferToBinaryBlocks(rettmp);
 			ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
 		}
@@ -423,50 +406,17 @@ public class RDDConverterUtilsExt
 	/**
 	 * Convert a dataframe of comma-separated string rows to a dataframe of
 	 * ml.linalg.Vector rows.
-	 * 
-	 * <p>
-	 * Example input rows:<br>
-	 * 
-	 * <code>
-	 * ((1.2, 4.3, 3.4))<br>
-	 * (1.2, 3.4, 2.2)<br>
-	 * [[1.2, 34.3, 1.2, 1.25]]<br>
-	 * [1.2, 3.4]<br>
-	 * </code>
-	 * 
-	 * @param sqlContext
-	 *            Spark SQL Context
-	 * @param inputDF
-	 *            dataframe of comma-separated row strings to convert to
-	 *            dataframe of ml.linalg.Vector rows
-	 * @return dataframe of ml.linalg.Vector rows
-	 * @throws DMLRuntimeException
-	 *             if DMLRuntimeException occurs
-	 *             
-	 * @deprecated This will be removed in SystemML 1.0. Please migrate to {@code
-	 * RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(SparkSession, Dataset<Row>) }
-	 */
-	@Deprecated
-	public static Dataset<Row> stringDataFrameToVectorDataFrame(SQLContext sqlContext, Dataset<Row> inputDF)
-			throws DMLRuntimeException {
-		SparkSession sparkSession = sqlContext.sparkSession();
-		return stringDataFrameToVectorDataFrame(sparkSession, inputDF);
-	}
-	
-	/**
-	 * Convert a dataframe of comma-separated string rows to a dataframe of
-	 * ml.linalg.Vector rows.
-	 * 
+	 *
 	 * <p>
 	 * Example input rows:<br>
-	 * 
+	 *
 	 * <code>
 	 * ((1.2, 4.3, 3.4))<br>
 	 * (1.2, 3.4, 2.2)<br>
 	 * [[1.2, 34.3, 1.2, 1.25]]<br>
 	 * [1.2, 3.4]<br>
 	 * </code>
-	 * 
+	 *
 	 * @param sparkSession
 	 *            Spark Session
 	 * @param inputDF