Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/15 07:43:01 UTC

[1/3] incubator-systemml git commit: [SYSTEMML-913] Cache-conscious sparse matrix-vector multiplication

Repository: incubator-systemml
Updated Branches:
  refs/heads/master bcf431331 -> 085009a36


[SYSTEMML-913] Cache-conscious sparse matrix-vector multiplication

This patch makes the sparse-dense matrix-vector multiplication
cache-conscious in order to handle cases with many features (and hence
large rhs vectors) more efficiently. In single-node experiments with an
80GB max heap and 100 iterations, example improvements (given as
rows x columns, sparsity) were as follows:
(1) 100x10M, 0.1: 14.5s -> 10.1s
(2) 1Kx10M, 0.01: 29.5s -> 11.5s
(3) 10Kx10M, 0.001: 31.5s -> 21.5s
(4) 1Kx100M, 0.001: 33.8s -> 25.5s 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b304e605
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b304e605
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b304e605

Branch: refs/heads/master
Commit: b304e6058001ba195a0c9ddca0ebde24dfcdbdaa
Parents: bcf4313
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 14 23:30:53 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Sep 15 09:41:25 2016 +0200

----------------------------------------------------------------------
 .../runtime/matrix/data/LibMatrixMult.java      | 36 ++++++++++++++++----
 1 file changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b304e605/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
index 9d0dd9b..48054ef 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
@@ -1321,24 +1321,48 @@ public class LibMatrixMult
 		final int m = m1.rlen;
 		final int n = m2.clen;
 		final int cd = m2.rlen;
+		final long xsp = (long)m*cd/m1.nonZeros;
 
 		if( LOW_LEVEL_OPTIMIZATION )
 		{
 			SparseBlock a = m1.sparseBlock;
 			
-			if( m==1 && n==1 )         //DOT PRODUCT
+			if( m==1 && n==1 )            //DOT PRODUCT
 			{
 				if( !a.isEmpty(0) ) {
 					c[0] = dotProduct(a.values(0), b, a.indexes(0), a.pos(0), 0, a.size(0));
 				}
 			}
-			else if( n==1 )            //MATRIX-VECTOR
+			else if( n==1 && cd<=2*1024 ) //MATRIX-VECTOR (short rhs)
 			{
 				for( int i=rl; i<ru; i++ )
 					if( !a.isEmpty(i) )
 						c[i] = dotProduct(a.values(i), b, a.indexes(i), a.pos(i), 0, a.size(i));							
 			}
-			else if( pm2 && m==1 )     //VECTOR-MATRIX
+			else if( n==1 )               //MATRIX-VECTOR (tall rhs)
+			{
+				final int blocksizeI = 32;
+				final int blocksizeK = (int)Math.max(2*1024,2*1024*xsp/32); //~ 16KB L1  
+				int[] curk = new int[blocksizeI];
+				
+				for( int bi = rl; bi < ru; bi+=blocksizeI ) {
+					Arrays.fill(curk, 0); //reset positions
+					for( int bk=0, bimin = Math.min(ru, bi+blocksizeI); bk<cd; bk+=blocksizeK ) {
+						for( int i=bi, bkmin = Math.min(bk+blocksizeK, cd); i<bimin; i++) {
+							if( a.isEmpty(i) ) continue;
+							int apos = a.pos(i);
+							int alen = a.size(i);
+							int[] aix = a.indexes(i);
+							double[] avals = a.values(i);					
+							int k = curk[i-bi] + apos;									
+							for( ; k<apos+alen && aix[k]<bkmin; k++ )
+								c[i] += avals[k] * b[aix[k]];
+							curk[i-bi] = k - apos;
+						}
+					}	
+				}
+			}
+			else if( pm2 && m==1 )        //VECTOR-MATRIX
 			{
 				//parallelization over rows in rhs matrix
 				if( !a.isEmpty(0) ) 
@@ -1357,7 +1381,7 @@ public class LibMatrixMult
 					}
 				}
 			}
-			else if( pm2 && m<=16 )    //MATRIX-MATRIX (short lhs) 
+			else if( pm2 && m<=16 )       //MATRIX-MATRIX (short lhs) 
 			{
 				int arlen = a.numRows();
 				for( int i=0, cix=0; i<arlen; i++, cix+=n )
@@ -1388,7 +1412,7 @@ public class LibMatrixMult
 		    			}
 					}
 			}
-			else if( n<=64 )           //MATRIX-MATRIX (skinny rhs)
+			else if( n<=64 )              //MATRIX-MATRIX (skinny rhs)
 			{
 				//no blocking since b and c fit into cache anyway
 				for( int i=rl, cix=rl*n; i<ru; i++, cix+=n ) {
@@ -1408,7 +1432,7 @@ public class LibMatrixMult
 	    					aix[k]*n, aix[k+1]*n, aix[k+2]*n, aix[k+3]*n, cix, n );
 				}	
 			}
-			else                       //MATRIX-MATRIX
+			else                          //MATRIX-MATRIX
 			{							
 				//blocksizes to fit blocks of B (dense) and several rows of A/C in common L2 cache size, 
 				//while blocking A/C for L1/L2 yet allowing long scans (2 pages) in the inner loop over j
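
A note for readers skimming the above hunk: instead of scanning each
sparse row end to end, which streams the entire (large) rhs vector b
through the cache once per row, the tall-rhs case now processes rows in
groups of 32 and lets each group advance through b in chunks small
enough to stay in L1 cache; the curk array remembers per row how far
its nonzeros have been consumed, so each pass over the next chunk of b
resumes where the previous one stopped. Below is a minimal standalone
sketch of this blocking scheme over a plain CSR representation; the
class and array names (rowPtr/colIdx/vals) are illustrative, not
SystemML's SparseBlock API, and the sparsity-based scaling of the rhs
block size is omitted for brevity.

import java.util.Arrays;

public class CacheConsciousSpMV {
	//computes c = A %*% b for a sparse matrix A (m rows) in CSR format
	public static void spmv(int[] rowPtr, int[] colIdx, double[] vals,
			double[] b, double[] c, int m) {
		final int blocksizeI = 32;        //rows per row block
		final int blocksizeK = 2 * 1024;  //rhs entries per block (~16KB, fits L1)
		int[] curk = new int[blocksizeI]; //per-row positions into colIdx/vals

		for (int bi = 0; bi < m; bi += blocksizeI) {
			Arrays.fill(curk, 0); //reset row positions for this row block
			int bimin = Math.min(m, bi + blocksizeI);
			for (int bk = 0; bk < b.length; bk += blocksizeK) {
				int bkmin = Math.min(bk + blocksizeK, b.length);
				//each row consumes only nonzeros with column index in
				//[bk, bkmin), so accesses to b stay within one L1-sized chunk
				for (int i = bi; i < bimin; i++) {
					int apos = rowPtr[i];
					int alen = rowPtr[i + 1] - apos;
					int k = curk[i - bi] + apos;
					for (; k < apos + alen && colIdx[k] < bkmin; k++)
						c[i] += vals[k] * b[colIdx[k]];
					curk[i - bi] = k - apos; //remember where this row stopped
				}
			}
		}
	}
}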


[2/3] incubator-systemml git commit: [SYSTEMML-921] Fix function call error handling with missing inputs

Posted by mb...@apache.org.
[SYSTEMML-921] Fix function call error handling with missing inputs

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/00b06d47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/00b06d47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/00b06d47

Branch: refs/heads/master
Commit: 00b06d4723c037a9effdecec526d9b039db40c2d
Parents: b304e60
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Sep 15 01:34:21 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Sep 15 09:41:29 2016 +0200

----------------------------------------------------------------------
 .../context/ExecutionContext.java               | 14 ++++----
 .../cp/FunctionCallCPInstruction.java           | 37 ++++++++++----------
 2 files changed, 26 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/00b06d47/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
index dd20fd0..14b9c44 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
@@ -126,19 +126,19 @@ public class ExecutionContext
 	 * -------------------------------------------------------
 	 */
 	
-	public Data getVariable(String name) 
-	{
+	public Data getVariable(String name) {
 		return _variables.get(name);
 	}
 	
-	public void setVariable(String name, Data val) 
-		throws DMLRuntimeException
-	{
+	public void setVariable(String name, Data val) {
 		_variables.put(name, val);
 	}
+	
+	public boolean containsVariable(String name) {
+		return _variables.keySet().contains(name);
+	}
 
-	public Data removeVariable(String name) 
-	{
+	public Data removeVariable(String name) {
 		return _variables.remove(name);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/00b06d47/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java
index 348eef2..529bd25 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.lops.Lop;
+import org.apache.sysml.parser.DMLProgram;
 import org.apache.sysml.parser.DataIdentifier;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -75,14 +76,12 @@ public class FunctionCallCPInstruction extends CPInstruction
 	}
 		
 	/**
-	 * Instruction format extFunct:::[FUNCTION NAME]:::[num input params]:::[num output params]:::[list of delimited input params ]:::[list of delimited ouput params]
-	 * These are the "bound names" for the inputs / outputs.  For example, out1 = foo(in1, in2) yields
-	 * extFunct:::foo:::2:::1:::in1:::in2:::out1
 	 * 
 	 */
 	public static FunctionCallCPInstruction parseInstruction(String str) 
 		throws DMLRuntimeException 
 	{	
+		//schema: extfunct, fname, num inputs, num outputs, inputs, outputs
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType ( str );
 		String namespace = parts[1];
 		String functionName = parts[2];
@@ -91,22 +90,20 @@ public class FunctionCallCPInstruction extends CPInstruction
 		ArrayList<CPOperand> boundInParamOperands = new ArrayList<CPOperand>();
 		ArrayList<String> boundInParamNames = new ArrayList<String>();
 		ArrayList<String> boundOutParamNames = new ArrayList<String>();
-		
-		int FIRST_PARAM_INDEX = 5;
 		for (int i = 0; i < numInputs; i++) {
-			CPOperand operand = new CPOperand(parts[FIRST_PARAM_INDEX + i]);
+			CPOperand operand = new CPOperand(parts[5 + i]);
 			boundInParamOperands.add(operand);
 			boundInParamNames.add(operand.getName());
 		}
 		for (int i = 0; i < numOutputs; i++) {
-			boundOutParamNames.add(parts[FIRST_PARAM_INDEX + numInputs + i]);
+			boundOutParamNames.add(parts[5 + numInputs + i]);
 		}
 		
-		return new FunctionCallCPInstruction ( namespace,functionName, boundInParamOperands, boundInParamNames, boundOutParamNames, str );
+		return new FunctionCallCPInstruction ( namespace,functionName, 
+				boundInParamOperands, boundInParamNames, boundOutParamNames, str );
 	}
 
-	
-	
+		
 	@Override
 	public Instruction preprocessInstruction(ExecutionContext ec)
 		throws DMLRuntimeException 
@@ -144,8 +141,7 @@ public class FunctionCallCPInstruction extends CPInstruction
 			ValueType valType = fpb.getInputParams().get(i).getValueType();
 				
 			// CASE (a): default values, if call w/ less params than signature (scalars only)
-			if (   i > _boundInputParamNames.size() 
-				|| (!_boundInputParamOperands.get(i).isLiteral() && ec.getVariable(_boundInputParamNames.get(i)) == null))
+			if( i > _boundInputParamNames.size() )
 			{	
 				String defaultVal = fpb.getInputParams().get(i).getDefaultValue();
 				currFormalParamValue = ec.getScalarInput(defaultVal, valType, false);
@@ -153,12 +149,17 @@ public class FunctionCallCPInstruction extends CPInstruction
 			// CASE (b) literals or symbol table entries
 			else {
 				CPOperand operand = _boundInputParamOperands.get(i);
-				if( operand.getDataType()==DataType.SCALAR )
-					currFormalParamValue = ec.getScalarInput(operand.getName(), operand.getValueType(), operand.isLiteral());
-				else
-					currFormalParamValue = ec.getVariable(operand.getName());					
+				String varname = operand.getName();
+				//error handling non-existing variables
+				if( !operand.isLiteral() && !ec.containsVariable(varname) ) {
+					throw new DMLRuntimeException("Input variable '"+varname+"' not existing on call of " + 
+							DMLProgram.constructFunctionKey(_namespace, _functionName) + " (line "+getLineNum()+").");
+				}
+				//get input matrix/frame/scalar
+				currFormalParamValue = (operand.getDataType()!=DataType.SCALAR) ? ec.getVariable(varname) : 
+					ec.getScalarInput(varname, operand.getValueType(), operand.isLiteral());
 			}
-				
+			
 			functionVariables.put(currFormalParamName,currFormalParamValue);						
 		}
 		
@@ -179,7 +180,7 @@ public class FunctionCallCPInstruction extends CPInstruction
 			throw e;
 		}
 		catch (Exception e){
-			String fname = this._namespace + "::" + this._functionName;
+			String fname = DMLProgram.constructFunctionKey(_namespace, _functionName);
 			throw new DMLRuntimeException("error executing function " + fname, e);
 		}
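
The behavioral change above: a function call whose non-literal input is
missing from the symbol table now fails fast with a descriptive message
(function key and line number) instead of passing a null value into the
function body and failing later in an obscure place. A minimal sketch
of the guard pattern, using a plain Map as a stand-in for the execution
context's symbol table (class, method, and exception names here are
illustrative, not SystemML's API):

import java.util.Map;

public class MissingInputCheck {
	public static Object resolveInput(Map<String, Object> symbolTable,
			String varname, boolean isLiteral, String functionKey, int lineNum) {
		//fail fast on non-literal inputs that were never bound
		if (!isLiteral && !symbolTable.containsKey(varname))
			throw new RuntimeException("Input variable '" + varname
				+ "' not existing on call of " + functionKey
				+ " (line " + lineNum + ").");
		//literals would be parsed from the operand instead of looked up
		return symbolTable.get(varname);
	}
}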
 		


[3/3] incubator-systemml git commit: [SYSTEMML-914][SYSTEMML-919] Rework dataframe-matrix converters, tests

Posted by mb...@apache.org.
[SYSTEMML-914][SYSTEMML-919] Rework dataframe-matrix converters, tests

The existing dataframe-matrix and matrix-dataframe converters had a
variety of issues. This patch completely overhauls these converters and
moves them into the well-tested RDDConverterUtils. In detail, this
includes the following fixes and improvements:

(1) Consolidation of redundant dataframe converters from
RDDConverterUtilsExt, MLContextConversionUtil, and MLOutput (see the
usage sketch after this list).

(2) Missing block sizes and update of matrix characteristics after
dataframe conversion.

(3) Wrong matrix characteristics (clen) in the case of row datasets
with a contained ID column and unknown dimensions. This was critical as
it led to incorrect results on write and other operations.

(4) Various performance features: (a) no sorting by row ID but reuse
of the given IDs, which also avoids zipWithIndex, (b) significantly
less object creation and fewer redundant data copies, (c) row shuffles
as dense/sparse blocks instead of bloated object arrays, (d) no double
parsing via exceptions per cell value, (e) dedicated handling of
sparse/dense blocks, and (f) no unnecessary double parsing during nnz
analysis.

(5) General code quality (e.g., collapsed bloated code for simple
conjunctive predicates, removed commented experimental code, etc.).
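
For orientation, here is a minimal round-trip sketch of the
consolidated converter API, assuming the dataFrameToBinaryBlock and
binaryBlockToDataFrame signatures introduced in the diffs below
(Spark 1.x DataFrame, as used by SystemML at the time); error handling
is omitted:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;

public class ConverterRoundTrip {
	public static DataFrame roundTrip(JavaSparkContext sc, SQLContext sqlctx, DataFrame df) {
		int blksz = ConfigurationManager.getBlocksize();
		//unknown dims (-1) are determined during conversion and written back into mc
		MatrixCharacteristics mc = new MatrixCharacteristics(-1, -1, blksz, blksz);
		//containsID=false (no __INDEX column), isVector=false (plain double columns)
		JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = RDDConverterUtils
				.dataFrameToBinaryBlock(sc, df, mc, false, false);
		//back to a DataFrame with an __INDEX column plus double columns C1..Cn
		return RDDConverterUtils.binaryBlockToDataFrame(sqlctx, blocks, mc, false);
	}
}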

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/085009a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/085009a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/085009a3

Branch: refs/heads/master
Commit: 085009a367519d437558b2004ac93d8b6ea60195
Parents: 00b06d4
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Sep 15 09:40:46 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Sep 15 09:41:34 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLContext.java    |   5 +-
 .../java/org/apache/sysml/api/MLOutput.java     | 277 +---------
 .../functions/ConvertSingleColumnToString.java  |  34 --
 .../api/ml/functions/ConvertVectorToDouble.java |  35 --
 .../api/mlcontext/MLContextConversionUtil.java  | 193 ++-----
 .../sysml/api/mlcontext/MLContextUtil.java      |  31 +-
 .../instructions/spark/WriteSPInstruction.java  |   3 +-
 .../spark/utils/RDDConverterUtils.java          | 313 +++++++++++
 .../spark/utils/RDDConverterUtilsExt.java       | 523 +------------------
 .../runtime/matrix/MatrixCharacteristics.java   |  10 +-
 .../sysml/runtime/util/DataConverter.java       |  27 +
 .../sysml/runtime/util/UtilFunctions.java       |  28 +-
 .../mlcontext/DataFrameConversionTest.java      | 196 +++++++
 .../functions/mlcontext/GNMFTest.java           |   3 +-
 14 files changed, 643 insertions(+), 1035 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java
index c5588e2..1e415f1 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -76,7 +76,7 @@ import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFuncti
 import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.SparkListener;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -294,7 +294,8 @@ public class MLContext {
 	public void registerInput(String varName, DataFrame df, boolean containsID) throws DMLRuntimeException {
 		int blksz = ConfigurationManager.getBlocksize();
 		MatrixCharacteristics mcOut = new MatrixCharacteristics(-1, -1, blksz, blksz);
-		JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = RDDConverterUtilsExt.dataFrameToBinaryBlock(new JavaSparkContext(_sc), df, mcOut, containsID);
+		JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = RDDConverterUtils
+				.dataFrameToBinaryBlock(new JavaSparkContext(_sc), df, mcOut, containsID, false);
 		registerInput(varName, rdd, mcOut);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
index f41c479..ac1b3f3 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -19,39 +19,24 @@
 
 package org.apache.sysml.api;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.mllib.linalg.DenseVector;
-import org.apache.spark.mllib.linalg.VectorUDT;
 import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.spark.functions.GetMLBlock;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-import scala.Tuple2;
 
 /**
  * This is a simple container object that returns the output of execute from MLContext 
@@ -114,7 +99,7 @@ public class MLOutput {
 		JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName);
 		if(rdd != null) {
 			MatrixCharacteristics mc = _outMetadata.get(varName);
-			return RDDConverterUtilsExt.binaryBlockToDataFrame(rdd, mc, sqlContext);
+			return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, rdd, mc, false);
 		}
 		throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
 	}
@@ -135,7 +120,7 @@ public class MLOutput {
 			JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName);
 			if(rdd != null) {
 				MatrixCharacteristics mc = _outMetadata.get(varName);
-				return RDDConverterUtilsExt.binaryBlockToVectorDataFrame(rdd, mc, sqlContext);
+				return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, rdd, mc, true);
 			}
 			throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
 		}
@@ -153,62 +138,25 @@ public class MLOutput {
 	 * @return
 	 * @throws DMLRuntimeException
 	 */
-	public DataFrame getDF(SQLContext sqlContext, String varName, Map<String, Tuple2<Long, Long>> range) throws DMLRuntimeException {
-		if(sqlContext == null) {
+	public DataFrame getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc) 
+		throws DMLRuntimeException 
+	{
+		if(sqlContext == null)
 			throw new DMLRuntimeException("SQLContext is not created.");
-		}
+			
 		JavaPairRDD<MatrixIndexes,MatrixBlock> binaryBlockRDD = getBinaryBlockedRDD(varName);
-		if(binaryBlockRDD == null) {
-			throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
-		}
-		MatrixCharacteristics mc = _outMetadata.get(varName);
-		long rlen = mc.getRows(); long clen = mc.getCols();
-		int brlen = mc.getRowsPerBlock(); int bclen = mc.getColsPerBlock();
-		
-		ArrayList<Tuple2<String, Tuple2<Long, Long>>> alRange = new ArrayList<Tuple2<String, Tuple2<Long, Long>>>();
-		for(Entry<String, Tuple2<Long, Long>> e : range.entrySet()) {
-			alRange.add(new Tuple2<String, Tuple2<Long,Long>>(e.getKey(), e.getValue()));
-		}
-		
-		// Very expensive operation here: groupByKey (where number of keys might be too large)
-		JavaRDD<Row> rowsRDD = binaryBlockRDD.flatMapToPair(new ProjectRows(rlen, clen, brlen, bclen))
-				.groupByKey().map(new ConvertDoubleArrayToRangeRows(clen, bclen, alRange));
-
-		int numColumns = (int) clen;
-		if(numColumns <= 0) {
-			throw new DMLRuntimeException("Output dimensions unknown after executing the script and hence cannot create the dataframe");
-		}
-		
-		List<StructField> fields = new ArrayList<StructField>();
-		// LongTypes throw an error: java.lang.Double incompatible with java.lang.Long
-		fields.add(DataTypes.createStructField("__INDEX", DataTypes.DoubleType, false));
-		for(int k = 0; k < alRange.size(); k++) {
-			String colName = alRange.get(k)._1;
-			long low = alRange.get(k)._2._1;
-			long high = alRange.get(k)._2._2;
-			if(low != high)
-				fields.add(DataTypes.createStructField(colName, new VectorUDT(), false));
-			else
-				fields.add(DataTypes.createStructField(colName, DataTypes.DoubleType, false));
-		}
-		
-		// This will cause infinite recursion due to bug in Spark
-		// https://issues.apache.org/jira/browse/SPARK-6999
-		// return sqlContext.createDataFrame(rowsRDD, colNames); // where ArrayList<String> colNames
-		return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
-		
+		return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, binaryBlockRDD, mc, true);
 	}
 	
 	public JavaRDD<String> getStringRDD(String varName, String format) throws DMLRuntimeException {
 		if(format.equals("text")) {
 			JavaPairRDD<MatrixIndexes, MatrixBlock> binaryRDD = getBinaryBlockedRDD(varName);
 			MatrixCharacteristics mcIn = getMatrixCharacteristics(varName); 
-			return RDDConverterUtilsExt.binaryBlockToStringRDD(binaryRDD, mcIn, format);
+			return RDDConverterUtils.binaryBlockToTextCell(binaryRDD, mcIn);
 		}
 		else {
 			throw new DMLRuntimeException("The output format:" + format + " is not implemented yet.");
 		}
-		
 	}
 	
 	public JavaRDD<String> getStringFrameRDD(String varName, String format, CSVFileFormatProperties fprop ) throws DMLRuntimeException {
@@ -247,209 +195,4 @@ public class MLOutput {
 		}
 		throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
 	}
-	
-//	/**
-//	 * Experimental: Please use this with caution as it will fail in many corner cases.
-//	 * @return org.apache.spark.mllib.linalg.distributed.BlockMatrix
-//	 * @throws DMLRuntimeException 
-//	 */
-//	public BlockMatrix getMLLibBlockedMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException {
-//		return getMLMatrix(ml, sqlContext, varName).toBlockedMatrix();
-//	}
-	
-	public static class ProjectRows implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, Long, Tuple2<Long, Double[]>> {
-		private static final long serialVersionUID = -4792573268900472749L;
-		long rlen; long clen;
-		int brlen; int bclen;
-		public ProjectRows(long rlen, long clen, int brlen, int bclen) {
-			this.rlen = rlen;
-			this.clen = clen;
-			this.brlen = brlen;
-			this.bclen = bclen;
-		}
-
-		@Override
-		public Iterable<Tuple2<Long, Tuple2<Long, Double[]>>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception {
-			// ------------------------------------------------------------------
-    		//	Compute local block size: 
-    		// Example: For matrix: 1500 X 1100 with block length 1000 X 1000
-    		// We will have four local block sizes (1000X1000, 1000X100, 500X1000 and 500X1000)
-    		long blockRowIndex = kv._1.getRowIndex();
-    		long blockColIndex = kv._1.getColumnIndex();
-    		int lrlen = UtilFunctions.computeBlockSize(rlen, blockRowIndex, brlen);
-    		int lclen = UtilFunctions.computeBlockSize(clen, blockColIndex, bclen);
-    		// ------------------------------------------------------------------
-			
-			long startRowIndex = (kv._1.getRowIndex()-1) * bclen + 1;
-			MatrixBlock blk = kv._2;
-			ArrayList<Tuple2<Long, Tuple2<Long, Double[]>>> retVal = new ArrayList<Tuple2<Long,Tuple2<Long,Double[]>>>();
-			for(int i = 0; i < lrlen; i++) {
-				Double[] partialRow = new Double[lclen];
-				for(int j = 0; j < lclen; j++) {
-					partialRow[j] = blk.getValue(i, j);
-				}
-				retVal.add(new Tuple2<Long, Tuple2<Long,Double[]>>(startRowIndex + i, new Tuple2<Long,Double[]>(kv._1.getColumnIndex(), partialRow)));
-			}
-			return retVal;
-		}
-	}
-	
-	public static class ConvertDoubleArrayToRows implements Function<Tuple2<Long, Iterable<Tuple2<Long, Double[]>>>, Row> {
-		private static final long serialVersionUID = 4441184411670316972L;
-		
-		int bclen; long clen;
-		boolean outputVector;
-		public ConvertDoubleArrayToRows(long clen, int bclen, boolean outputVector) {
-			this.bclen = bclen;
-			this.clen = clen;
-			this.outputVector = outputVector;
-		}
-
-		@Override
-		public Row call(Tuple2<Long, Iterable<Tuple2<Long, Double[]>>> arg0)
-				throws Exception {
-			
-			HashMap<Long, Double[]> partialRows = new HashMap<Long, Double[]>();
-			int sizeOfPartialRows = 0;
-			for(Tuple2<Long, Double[]> kv : arg0._2) {
-				partialRows.put(kv._1, kv._2);
-				sizeOfPartialRows += kv._2.length;
-			}
-			
-			// Insert first row as row index
-			Object[] row = null;
-			if(outputVector) {
-				row = new Object[2];
-				double [] vecVals = new double[sizeOfPartialRows];
-				
-				for(long columnBlockIndex = 1; columnBlockIndex <= partialRows.size(); columnBlockIndex++) {
-					if(partialRows.containsKey(columnBlockIndex)) {
-						Double [] array = partialRows.get(columnBlockIndex);
-						// ------------------------------------------------------------------
-						//	Compute local block size: 
-						int lclen = UtilFunctions.computeBlockSize(clen, columnBlockIndex, bclen);
-						// ------------------------------------------------------------------
-						if(array.length != lclen) {
-							throw new Exception("Incorrect double array provided by ProjectRows");
-						}
-						for(int i = 0; i < lclen; i++) {
-							vecVals[(int) ((columnBlockIndex-1)*bclen + i)] = array[i];
-						}
-					}
-					else {
-						throw new Exception("The block for column index " + columnBlockIndex + " is missing. Make sure the last instruction is not returning empty blocks");
-					}
-				}
-				
-				long rowIndex = arg0._1;
-				row[0] = (double) rowIndex;
-				row[1] = new DenseVector(vecVals); // breeze.util.JavaArrayOps.arrayDToDv(vecVals);
-			}
-			else {
-				row = new Double[sizeOfPartialRows + 1];
-				long rowIndex = arg0._1;
-				row[0] = (double) rowIndex;
-				for(long columnBlockIndex = 1; columnBlockIndex <= partialRows.size(); columnBlockIndex++) {
-					if(partialRows.containsKey(columnBlockIndex)) {
-						Double [] array = partialRows.get(columnBlockIndex);
-						// ------------------------------------------------------------------
-						//	Compute local block size: 
-						int lclen = UtilFunctions.computeBlockSize(clen, columnBlockIndex, bclen);
-						// ------------------------------------------------------------------
-						if(array.length != lclen) {
-							throw new Exception("Incorrect double array provided by ProjectRows");
-						}
-						for(int i = 0; i < lclen; i++) {
-							row[(int) ((columnBlockIndex-1)*bclen + i) + 1] = array[i];
-						}
-					}
-					else {
-						throw new Exception("The block for column index " + columnBlockIndex + " is missing. Make sure the last instruction is not returning empty blocks");
-					}
-				}
-			}
-			Object[] row_fields = row;
-			return RowFactory.create(row_fields);
-		}
-	}
-	
-	
-	public static class ConvertDoubleArrayToRangeRows implements Function<Tuple2<Long, Iterable<Tuple2<Long, Double[]>>>, Row> {
-		private static final long serialVersionUID = 4441184411670316972L;
-		
-		int bclen; long clen;
-		ArrayList<Tuple2<String, Tuple2<Long, Long>>> range;
-		public ConvertDoubleArrayToRangeRows(long clen, int bclen, ArrayList<Tuple2<String, Tuple2<Long, Long>>> range) {
-			this.bclen = bclen;
-			this.clen = clen;
-			this.range = range;
-		}
-
-		@Override
-		public Row call(Tuple2<Long, Iterable<Tuple2<Long, Double[]>>> arg0)
-				throws Exception {
-			
-			HashMap<Long, Double[]> partialRows = new HashMap<Long, Double[]>();
-			int sizeOfPartialRows = 0;
-			for(Tuple2<Long, Double[]> kv : arg0._2) {
-				partialRows.put(kv._1, kv._2);
-				sizeOfPartialRows += kv._2.length;
-			}
-			
-			// Insert first row as row index
-			Object[] row = new Object[range.size() + 1];
-			
-			double [] vecVals = new double[sizeOfPartialRows];
-			
-			for(long columnBlockIndex = 1; columnBlockIndex <= partialRows.size(); columnBlockIndex++) {
-				if(partialRows.containsKey(columnBlockIndex)) {
-					Double [] array = partialRows.get(columnBlockIndex);
-					// ------------------------------------------------------------------
-					//	Compute local block size: 
-					int lclen = UtilFunctions.computeBlockSize(clen, columnBlockIndex, bclen);
-					// ------------------------------------------------------------------
-					if(array.length != lclen) {
-						throw new Exception("Incorrect double array provided by ProjectRows");
-					}
-					for(int i = 0; i < lclen; i++) {
-						vecVals[(int) ((columnBlockIndex-1)*bclen + i)] = array[i];
-					}
-				}
-				else {
-					throw new Exception("The block for column index " + columnBlockIndex + " is missing. Make sure the last instruction is not returning empty blocks");
-				}
-			}
-			
-			long rowIndex = arg0._1;
-			row[0] = (double) rowIndex;
-			
-			int i = 1;
-			
-			//for(Entry<String, Tuple2<Long, Long>> e : range.entrySet()) {
-			for(int k = 0; k < range.size(); k++) {
-				long low = range.get(k)._2._1;
-				long high = range.get(k)._2._2;
-				
-				if(high < low) {
-					throw new Exception("Incorrect range:" + high + "<" + low);
-				}
-				
-				if(low == high) {
-					row[i] = vecVals[(int) (low - 1)];
-				}
-				else {
-					int lengthOfVector = (int) (high - low + 1);
-					double [] tempVector = new double[lengthOfVector];
-					for(int j = 0; j < lengthOfVector; j++) {
-						tempVector[j] = vecVals[(int) (low + j - 1)];
-					}
-					row[i] = new DenseVector(tempVector);
-				}
-				
-				i++;
-			}
-
-			return RowFactory.create(row);
-		}
-	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/ml/functions/ConvertSingleColumnToString.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/ml/functions/ConvertSingleColumnToString.java b/src/main/java/org/apache/sysml/api/ml/functions/ConvertSingleColumnToString.java
deleted file mode 100644
index 1572e41..0000000
--- a/src/main/java/org/apache/sysml/api/ml/functions/ConvertSingleColumnToString.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.api.ml.functions;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.Row;
-
-public class ConvertSingleColumnToString implements Function<Row, String> {
-
-	private static final long serialVersionUID = -499763403738768970L;
-
-	@Override
-	public String call(Row row) throws Exception {
-		return row.apply(0).toString();
-	}
-}
-	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/ml/functions/ConvertVectorToDouble.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/ml/functions/ConvertVectorToDouble.java b/src/main/java/org/apache/sysml/api/ml/functions/ConvertVectorToDouble.java
deleted file mode 100644
index f24f171..0000000
--- a/src/main/java/org/apache/sysml/api/ml/functions/ConvertVectorToDouble.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.api.ml.functions;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.Row;
-
-public class ConvertVectorToDouble implements Function<Row, Double> {
-
-	private static final long serialVersionUID = -6612447783777073929L;
-
-	@Override
-	public Double call(Row row) throws Exception {
-		
-		return row.getDouble(0);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index 15aa15e..0661a93 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -29,7 +29,6 @@ import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.spark.Accumulator;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -54,10 +53,7 @@ import org.apache.sysml.runtime.instructions.spark.functions.ConvertStringToLong
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.DataFrameAnalysisFunction;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.DataFrameToBinaryBlockFunction;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
@@ -67,7 +63,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.util.DataConverter;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
 import scala.collection.JavaConversions;
 import scala.reflect.ClassTag;
@@ -196,14 +191,10 @@ public class MLContextConversionUtil {
 	public static FrameObject frameBlockToFrameObject(String variableName, FrameBlock frameBlock,
 			FrameMetadata frameMetadata) {
 		try {
-			MatrixCharacteristics matrixCharacteristics;
-			if (frameMetadata != null) {
-				matrixCharacteristics = frameMetadata.asMatrixCharacteristics();
-			} else {
-				matrixCharacteristics = new MatrixCharacteristics();
-			}
-			MatrixFormatMetaData mtd = new MatrixFormatMetaData(matrixCharacteristics, OutputInfo.BinaryBlockOutputInfo,
-					InputInfo.BinaryBlockInputInfo);
+			MatrixCharacteristics mc = (frameMetadata != null) ? 
+					frameMetadata.asMatrixCharacteristics() : new MatrixCharacteristics();
+			MatrixFormatMetaData mtd = new MatrixFormatMetaData(mc, 
+					OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo);
 			FrameObject frameObject = new FrameObject(OptimizerUtils.getUniqueTempFileName(), mtd);
 			frameObject.acquireModify(frameBlock);
 			frameObject.release();
@@ -325,15 +316,11 @@ public class MLContextConversionUtil {
 	 *         {@code MatrixObject}
 	 */
 	public static MatrixObject dataFrameToMatrixObject(String variableName, DataFrame dataFrame,
-			MatrixMetadata matrixMetadata) {
-		if (matrixMetadata == null) {
-			matrixMetadata = new MatrixMetadata();
-		}
-		JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlock = MLContextConversionUtil
-				.dataFrameToMatrixBinaryBlocks(dataFrame, matrixMetadata);
-		MatrixObject matrixObject = MLContextConversionUtil.binaryBlocksToMatrixObject(variableName, binaryBlock,
-				matrixMetadata);
-		return matrixObject;
+		MatrixMetadata matrixMetadata) 
+	{
+		matrixMetadata = (matrixMetadata!=null) ? matrixMetadata : new MatrixMetadata();
+		JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlock = dataFrameToMatrixBinaryBlocks(dataFrame, matrixMetadata);
+		return binaryBlocksToMatrixObject(variableName, binaryBlock, matrixMetadata);
 	}
 
 	/**
@@ -417,38 +404,27 @@ public class MLContextConversionUtil {
 	 *         {@code JavaPairRDD<MatrixIndexes,
 	 *         MatrixBlock>} binary-block matrix
 	 */
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToMatrixBinaryBlocks(DataFrame dataFrame,
-			MatrixMetadata matrixMetadata) {
-
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToMatrixBinaryBlocks(
+			DataFrame dataFrame, MatrixMetadata matrixMetadata) 
+	{
+		//handle meta data
 		determineMatrixFormatIfNeeded(dataFrame, matrixMetadata);
-
-		MatrixCharacteristics matrixCharacteristics;
-		if (matrixMetadata != null) {
-			matrixCharacteristics = matrixMetadata.asMatrixCharacteristics();
-			if (matrixCharacteristics == null) {
-				matrixCharacteristics = new MatrixCharacteristics();
-			}
-		} else {
-			matrixCharacteristics = new MatrixCharacteristics();
-		}
-
-		if (isDataFrameWithIDColumn(matrixMetadata)) {
-			dataFrame = dataFrame.sort("__INDEX").drop("__INDEX");
-		}
-
-		boolean isVectorBasedDataFrame = isVectorBasedDataFrame(matrixMetadata);
-
-		determineDataFrameDimensionsIfNeeded(dataFrame, matrixCharacteristics, isVectorBasedDataFrame);
-		if (matrixMetadata != null) {
-			// so external reference can be updated with the metadata
-			matrixMetadata.setMatrixCharacteristics(matrixCharacteristics);
-		}
-
-		JavaRDD<Row> javaRDD = dataFrame.javaRDD();
-		JavaPairRDD<Row, Long> prepinput = javaRDD.zipWithIndex();
-		JavaPairRDD<MatrixIndexes, MatrixBlock> out = prepinput
-				.mapPartitionsToPair(new DataFrameToBinaryBlockFunction(matrixCharacteristics, isVectorBasedDataFrame));
-		out = RDDAggregateUtils.mergeByKey(out);
+		MatrixCharacteristics mc = (matrixMetadata != null && matrixMetadata.asMatrixCharacteristics()!=null) ?
+				matrixMetadata.asMatrixCharacteristics() : new MatrixCharacteristics();
+		boolean containsID = isDataFrameWithIDColumn(matrixMetadata);
+		boolean isVector = isVectorBasedDataFrame(matrixMetadata);
+	
+		//get spark context
+		JavaSparkContext sc = MLContextUtil.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());
+
+		//convert data frame to binary block matrix
+		JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils
+				.dataFrameToBinaryBlock(sc, dataFrame, mc, containsID, isVector);
+		
+		//update determined matrix characteristics
+		if( matrixMetadata != null )
+			matrixMetadata.setMatrixCharacteristics(mc);
+		
 		return out;
 	}
 
@@ -486,7 +462,7 @@ public class MLContextConversionUtil {
 		StructType schema = dataFrame.schema();
 		boolean hasID = false;
 		try {
-			schema.fieldIndex("__INDEX");
+			schema.fieldIndex(RDDConverterUtils.DF_ID_COLUMN);
 			hasID = true;
 		} catch (IllegalArgumentException iae) {
 		}
@@ -522,14 +498,9 @@ public class MLContextConversionUtil {
 	 *         otherwise.
 	 */
 	public static boolean isDataFrameWithIDColumn(MatrixMetadata matrixMetadata) {
-		if (matrixMetadata == null) {
-			return false;
-		}
-		MatrixFormat matrixFormat = matrixMetadata.getMatrixFormat();
-		if (matrixFormat == null) {
-			return false;
-		}
-		return matrixFormat.hasIDColumn();
+		return (matrixMetadata != null 
+			&& matrixMetadata.getMatrixFormat() != null
+			&& matrixMetadata.getMatrixFormat().hasIDColumn());
 	}
 
 	/**
@@ -541,14 +512,9 @@ public class MLContextConversionUtil {
 	 *         otherwise.
 	 */
 	public static boolean isDataFrameWithIDColumn(FrameMetadata frameMetadata) {
-		if (frameMetadata == null) {
-			return false;
-		}
-		FrameFormat frameFormat = frameMetadata.getFrameFormat();
-		if (frameFormat == null) {
-			return false;
-		}
-		return frameFormat.hasIDColumn();
+		return (frameMetadata != null 
+			&& frameMetadata.getFrameFormat() != null
+			&& frameMetadata.getFrameFormat().hasIDColumn());
 	}
 
 	/**
@@ -560,51 +526,9 @@ public class MLContextConversionUtil {
 	 *         otherwise.
 	 */
 	public static boolean isVectorBasedDataFrame(MatrixMetadata matrixMetadata) {
-		if (matrixMetadata == null) {
-			return false;
-		}
-		MatrixFormat matrixFormat = matrixMetadata.getMatrixFormat();
-		if (matrixFormat == null) {
-			return false;
-		}
-		return matrixFormat.isVectorBased();
-	}
-
-	/**
-	 * If the {@code DataFrame} dimensions aren't present in the
-	 * {@code MatrixCharacteristics} metadata, determine the dimensions and
-	 * place them in the {@code MatrixCharacteristics} metadata.
-	 * 
-	 * @param dataFrame
-	 *            the Spark {@code DataFrame}
-	 * @param matrixCharacteristics
-	 *            the matrix metadata
-	 * @param vectorBased
-	 *            is the DataFrame vector-based
-	 */
-	public static void determineDataFrameDimensionsIfNeeded(DataFrame dataFrame,
-			MatrixCharacteristics matrixCharacteristics, boolean vectorBased) {
-		if (!matrixCharacteristics.dimsKnown(true)) {
-			MLContext activeMLContext = (MLContext) MLContextProxy.getActiveMLContextForAPI();
-			SparkContext sparkContext = activeMLContext.getSparkContext();
-			@SuppressWarnings("resource")
-			JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
-
-			Accumulator<Double> aNnz = javaSparkContext.accumulator(0L);
-			JavaRDD<Row> javaRDD = dataFrame.javaRDD().map(new DataFrameAnalysisFunction(aNnz, vectorBased));
-			long numRows = javaRDD.count();
-			long numColumns;
-			if (vectorBased) {
-				Vector v = (Vector) javaRDD.first().get(0);
-				numColumns = v.size();
-			} else {
-				numColumns = dataFrame.columns().length;
-			}
-
-			long numNonZeros = UtilFunctions.toLong(aNnz.value());
-			matrixCharacteristics.set(numRows, numColumns, matrixCharacteristics.getRowsPerBlock(),
-					matrixCharacteristics.getColsPerBlock(), numNonZeros);
-		}
+		return (matrixMetadata != null 
+			&& matrixMetadata.getMatrixFormat() != null
+			&& matrixMetadata.getMatrixFormat().isVectorBased());
 	}
 
 	/**
@@ -864,14 +788,8 @@ public class MLContextConversionUtil {
 	 */
 	public static JavaRDD<String> binaryBlockMatrixToJavaRDDStringIJV(BinaryBlockMatrix binaryBlockMatrix) {
 		JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlock = binaryBlockMatrix.getBinaryBlocks();
-		MatrixCharacteristics matrixCharacteristics = binaryBlockMatrix.getMatrixCharacteristics();
-		try {
-			JavaRDD<String> javaRDDString = RDDConverterUtilsExt.binaryBlockToStringRDD(binaryBlock,
-					matrixCharacteristics, "text");
-			return javaRDDString;
-		} catch (DMLRuntimeException e) {
-			throw new MLContextException("Exception converting BinaryBlockMatrix to JavaRDD<String> (ijv)", e);
-		}
+		MatrixCharacteristics mc = binaryBlockMatrix.getMatrixCharacteristics();
+		return RDDConverterUtils.binaryBlockToTextCell(binaryBlock, mc);
 	}
 
 	/**
@@ -1285,21 +1203,14 @@ public class MLContextConversionUtil {
 			@SuppressWarnings("unchecked")
 			JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockMatrix = (JavaPairRDD<MatrixIndexes, MatrixBlock>) sparkExecutionContext
 					.getRDDHandleForMatrixObject(matrixObject, InputInfo.BinaryBlockInputInfo);
-			MatrixCharacteristics matrixCharacteristics = matrixObject.getMatrixCharacteristics();
+			MatrixCharacteristics mc = matrixObject.getMatrixCharacteristics();
 
-			MLContext activeMLContext = (MLContext) MLContextProxy.getActiveMLContextForAPI();
-			SparkContext sc = activeMLContext.getSparkContext();
-			SQLContext sqlContext = new SQLContext(sc);
-			DataFrame df = null;
-			if (isVectorDF) {
-				df = RDDConverterUtilsExt.binaryBlockToVectorDataFrame(binaryBlockMatrix, matrixCharacteristics,
-						sqlContext);
-			} else {
-				df = RDDConverterUtilsExt.binaryBlockToDataFrame(binaryBlockMatrix, matrixCharacteristics, sqlContext);
-			}
-
-			return df;
-		} catch (DMLRuntimeException e) {
+			SparkContext sc = ((MLContext) MLContextProxy.getActiveMLContextForAPI()).getSparkContext();
+			SQLContext sqlctx = new SQLContext(sc);
+			
+			return RDDConverterUtils.binaryBlockToDataFrame(sqlctx, binaryBlockMatrix, mc, isVectorDF);			
+		} 
+		catch (DMLRuntimeException e) {
 			throw new MLContextException("DMLRuntimeException while converting matrix object to DataFrame", e);
 		}
 	}
@@ -1321,11 +1232,11 @@ public class MLContextConversionUtil {
 					.getRDDHandleForFrameObject(frameObject, InputInfo.BinaryBlockInputInfo);
 			MatrixCharacteristics matrixCharacteristics = frameObject.getMatrixCharacteristics();
 
-			JavaSparkContext jsc = MLContextUtil
-					.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());
+			JavaSparkContext jsc = MLContextUtil.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());
 
 			return FrameRDDConverterUtils.binaryBlockToDataFrame(binaryBlockFrame, matrixCharacteristics, jsc);
-		} catch (DMLRuntimeException e) {
+		} 
+		catch (DMLRuntimeException e) {
 			throw new MLContextException("DMLRuntimeException while converting frame object to DataFrame", e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index 5b4e736..6c75048 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -976,18 +976,9 @@ public final class MLContextUtil {
 	 *         FrameObject, {@code false} otherwise.
 	 */
 	public static boolean doesSymbolTableContainFrameObject(LocalVariableMap symbolTable, String variableName) {
-		if (symbolTable == null) {
-			return false;
-		}
-		Data data = symbolTable.get(variableName);
-		if (data == null) {
-			return false;
-		}
-		if (data instanceof FrameObject) {
-			return true;
-		} else {
-			return false;
-		}
+		return (symbolTable != null
+			&& symbolTable.keySet().contains(variableName)
+			&& symbolTable.get(variableName) instanceof FrameObject);
 	}
 
 	/**
@@ -1002,18 +993,8 @@ public final class MLContextUtil {
 	 *         MatrixObject, {@code false} otherwise.
 	 */
 	public static boolean doesSymbolTableContainMatrixObject(LocalVariableMap symbolTable, String variableName) {
-		if (symbolTable == null) {
-			return false;
-		}
-		Data data = symbolTable.get(variableName);
-		if (data == null) {
-			return false;
-		}
-		if (data instanceof MatrixObject) {
-			return true;
-		} else {
-			return false;
-		}
+		return (symbolTable != null
+			&& symbolTable.keySet().contains(variableName)
+			&& symbolTable.get(variableName) instanceof MatrixObject);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 0cfec66..e4e2606 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -36,7 +36,6 @@ import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction;
-import org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
@@ -192,7 +191,7 @@ public class WriteSPInstruction extends SPInstruction
 				header = sec.getSparkContext().parallelize(headerContainer);
 			}
 			
-			JavaRDD<String> ijv = in1.flatMap(new ConvertMatrixBlockToIJVLines(mc.getRowsPerBlock(), mc.getColsPerBlock()));
+			JavaRDD<String> ijv = RDDConverterUtils.binaryBlockToTextCell(in1, mc);
 			if(header != null)
 				customSaveTextFile(header.union(ijv), fname, true);
 			else

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 3998a3f..17bdea8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -34,14 +35,25 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.mllib.linalg.DenseVector;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.VectorUDT;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
 
 import scala.Tuple2;
 
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable;
 import org.apache.sysml.runtime.instructions.spark.data.SerText;
+import org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -57,6 +69,8 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class RDDConverterUtils 
 {
+	public static final String DF_ID_COLUMN = "__INDEX";
+	
 	/**
 	 * 
 	 * @param sc
@@ -135,6 +149,17 @@ public class RDDConverterUtils
 	/**
 	 * 
 	 * @param in
+	 * @param mc
+	 * @return
+	 */
+	public static JavaRDD<String> binaryBlockToTextCell(JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc) {
+		return in.flatMap(new ConvertMatrixBlockToIJVLines(
+				mc.getRowsPerBlock(), mc.getColsPerBlock()));
+	}
+	
+	/**
+	 * 
+	 * @param in
 	 * @param mcIn
 	 * @param props
 	 * @param strict
@@ -244,6 +269,85 @@ public class RDDConverterUtils
 	
 	/**
 	 * 
+	 * @param sc
+	 * @param df
+	 * @param mcOut
+	 * @param containsID
+	 * @param isVector
+	 * @param columns
+	 * @return
+	 * @throws DMLRuntimeException
+	 */
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
+			DataFrame df, MatrixCharacteristics mc, boolean containsID, boolean isVector) 
+	{
+		//determine unknown dimensions and sparsity if required
+		if( !mc.dimsKnown(true) ) {
+			Accumulator<Double> aNnz = sc.accumulator(0L);
+			JavaRDD<Row> tmp = df.javaRDD().map(new DataFrameAnalysisFunction(aNnz, containsID, isVector));
+			long rlen = tmp.count();
+			long clen = !isVector ? df.columns().length - (containsID?1:0) : 
+					((Vector) tmp.first().get(containsID?1:0)).size();
+			long nnz = UtilFunctions.toLong(aNnz.value());
+			mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), nnz);
+		}
+		
+		//ensure valid blocksizes
+		if( mc.getRowsPerBlock()<=1 || mc.getColsPerBlock()<=1 ) {
+			mc.setBlockSize(ConfigurationManager.getBlocksize());
+		}
+		
+		JavaPairRDD<Row, Long> prepinput = containsID ?
+				df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) :
+				df.javaRDD().zipWithIndex(); //zip row index
+		
+		//convert data frame rows to binary block rdd (w/ partial blocks)
+		JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
+				prepinput.mapPartitionsToPair(
+					new DataFrameToBinaryBlockFunction(mc, containsID, isVector));
+		
+		//aggregate partial matrix blocks
+		out = RDDAggregateUtils.mergeByKey( out ); 
+		
+		return out;
+	}
+	
+	/**
+	 * 
+	 * @param sqlContext
+	 * @param in
+	 * @param mc
+	 * @param toVector
+	 * @return
+	 * @throws DMLRuntimeException
+	 */
+	public static DataFrame binaryBlockToDataFrame(SQLContext sqlContext, 
+			JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector)  
+	{
+		if( !mc.colsKnown() )
+			throw new RuntimeException("Number of columns needed to convert binary block to data frame.");
+		
+		//slice blocks into rows, align and convert into data frame rows
+		JavaRDD<Row> rowsRDD = in
+			.flatMapToPair(new SliceBinaryBlockToRowsFunction(mc.getRowsPerBlock()))
+			.groupByKey().map(new ConvertRowBlocksToRows((int)mc.getCols(), mc.getColsPerBlock(), toVector));
+		
+		//create data frame schema
+		List<StructField> fields = new ArrayList<StructField>();
+		fields.add(DataTypes.createStructField(DF_ID_COLUMN, DataTypes.DoubleType, false));
+		if( toVector )
+			fields.add(DataTypes.createStructField("C1", new VectorUDT(), false));
+		else { // row
+			for(int i = 1; i <= mc.getCols(); i++)
+				fields.add(DataTypes.createStructField("C"+i, DataTypes.DoubleType, false));
+		}
+		
+		//rdd to data frame conversion
+		return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
+	}
+	
+	/**
+	 * 
 	 * @param in
 	 * @return
 	 */
@@ -751,4 +855,213 @@ public class RDDConverterUtils
 			return new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(rowIndex, 1),out);
 		}		
 	}
+
+	/////////////////////////////////
+	// DATAFRAME-SPECIFIC FUNCTIONS
+
+	/**
+	 * Maps partitions of (Row, 0-based row index) pairs to partial binary matrix blocks.
+	 */
+	private static class DataFrameToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Row,Long>>,MatrixIndexes,MatrixBlock> 
+	{
+		private static final long serialVersionUID = 653447740362447236L;
+		
+		private long _rlen = -1;
+		private long _clen = -1;
+		private int _brlen = -1;
+		private int _bclen = -1;
+		private boolean _sparse = false;
+		private boolean _containsID;
+		private boolean _isVector;
+		
+		public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, boolean containsID, boolean isVector) {
+			_rlen = mc.getRows();
+			_clen = mc.getCols();
+			_brlen = mc.getRowsPerBlock();
+			_bclen = mc.getColsPerBlock();
+			_sparse = mc.nnzKnown() && MatrixBlock.evalSparseFormatInMemory(
+					mc.getRows(), mc.getCols(), mc.getNonZeros());
+			_containsID = containsID;
+			_isVector = isVector;
+		}
+		
+		@Override
+		public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<Row, Long>> arg0) 
+			throws Exception 
+		{
+			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
+			
+			int ncblks = (int)Math.ceil((double)_clen/_bclen);
+			MatrixIndexes[] ix = new MatrixIndexes[ncblks];
+			MatrixBlock[] mb = new MatrixBlock[ncblks];
+			
+			while( arg0.hasNext() )
+			{
+				Tuple2<Row,Long> tmp = arg0.next();
+				long rowix = tmp._2() + 1;
+				
+				long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
+				int pos = UtilFunctions.computeCellInBlock(rowix, _brlen);
+			
+				//create new blocks for entire row
+				if( ix[0] == null || ix[0].getRowIndex() != rix ) {
+					if( ix[0] !=null )
+						flushBlocksToList(ix, mb, ret);
+					long len = UtilFunctions.computeBlockSize(_rlen, rix, _brlen);
+					createBlocks(rowix, (int)len, ix, mb);
+				}
+				
+				//process row data
+				int off = _containsID ? 1: 0;
+				if( _isVector ) {
+					Vector vect = (Vector) tmp._1().get(off);
+					for( int cix=1, pix=0; cix<=ncblks; cix++ ) {
+						int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
+						for( int j=0; j<lclen; j++ )
+							mb[cix-1].appendValue(pos, j, vect.apply(pix++));
+					}	
+				}
+				else { //row
+					Row row = tmp._1();
+					for( int cix=1, pix=off; cix<=ncblks; cix++ ) {
+						int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
+						for( int j=0; j<lclen; j++ )
+							mb[cix-1].appendValue(pos, j, UtilFunctions.getDouble(row.get(pix++)));
+					}
+				}
+			}
+		
+			//flush last blocks
+			flushBlocksToList(ix, mb, ret);
+		
+			return ret;		
+		}
+		
+		// Creates new state of empty column blocks for current global row index.
+		private void createBlocks(long rowix, int lrlen, MatrixIndexes[] ix, MatrixBlock[] mb)
+		{
+			//compute row block index and number of column blocks
+			long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
+			int ncblks = (int)Math.ceil((double)_clen/_bclen);
+			
+			//create all column blocks (sparse or dense according to the nnz-based estimate)
+			for( int cix=1; cix<=ncblks; cix++ ) {
+				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
+				ix[cix-1] = new MatrixIndexes(rix, cix);
+				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse);		
+			}
+		}
+		
+		// Flushes current state of filled column blocks to output list.
+		private void flushBlocksToList( MatrixIndexes[] ix, MatrixBlock[] mb, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
+			throws DMLRuntimeException
+		{
+			int len = ix.length;			
+			for( int i=0; i<len; i++ )
+				if( mb[i] != null ) {
+					ret.add(new Tuple2<MatrixIndexes,MatrixBlock>(ix[i],mb[i]));
+					mb[i].examSparsity(); //ensure right representation
+				}	
+		}
+	}
+	
+	/**
+	 * Counts non-zero values per row via an accumulator while passing rows through unchanged.
+	 */
+	private static class DataFrameAnalysisFunction implements Function<Row,Row>  
+	{	
+		private static final long serialVersionUID = 5705371332119770215L;
+		
+		private Accumulator<Double> _aNnz = null;
+		private boolean _containsID;
+		private boolean _isVector;
+		
+		public DataFrameAnalysisFunction( Accumulator<Double> aNnz, boolean containsID, boolean isVector) {
+			_aNnz = aNnz;
+			_containsID = containsID;
+			_isVector = isVector;
+		}
+
+		@Override
+		public Row call(Row arg0) throws Exception {
+			//determine number of non-zeros of row
+			int off = _containsID ? 1 : 0;
+			long lnnz = 0;
+			if( _isVector ) {
+				//note: numNonzeros scans entries but handles sparse/dense
+				Vector vec = (Vector) arg0.get(off);
+				lnnz += vec.numNonzeros();
+			}
+			else { //row
+				for(int i=off; i<arg0.length(); i++)
+					lnnz += UtilFunctions.isNonZero(arg0.get(i)) ? 1 : 0;
+			}
+		
+			//update counters
+			_aNnz.add( (double)lnnz );
+			return arg0;
+		}
+	}
+
+	/**
+	 * Extracts the 1-based '__INDEX' row id and converts it to a 0-based position.
+	 */
+	private static class DataFrameExtractIDFunction implements PairFunction<Row, Row,Long> 
+	{
+		private static final long serialVersionUID = 7438855241666363433L;
+
+		@Override
+		public Tuple2<Row, Long> call(Row arg0) throws Exception {
+			//extract 1-based IDs and convert to 0-based positions
+			long id = UtilFunctions.toLong(UtilFunctions.getDouble(arg0.get(0)));
+			return new Tuple2<Row,Long>(arg0, id-1);
+		}
+	}
+	
+	/**
+	 * Assembles the aligned column blocks of a single matrix row into one data frame row.
+	 */
+	private static class ConvertRowBlocksToRows implements Function<Tuple2<Long, Iterable<Tuple2<Long, MatrixBlock>>>, Row> {
+		
+		private static final long serialVersionUID = 4441184411670316972L;
+		
+		private int _clen;
+		private int _bclen;
+		private boolean _toVector;
+		
+		public ConvertRowBlocksToRows(int clen, int bclen, boolean toVector) {
+			_clen = clen;
+			_bclen = bclen;
+			_toVector = toVector;
+		}
+
+		@Override
+		public Row call(Tuple2<Long, Iterable<Tuple2<Long, MatrixBlock>>> arg0)
+			throws Exception 
+		{
+			Object[] row = new Object[_toVector ? 2 : _clen+1];
+			row[0] = (double) arg0._1(); //row index
+			
+			//copy block data into target row
+			if( _toVector ) {
+				double[] tmp = new double[_clen];
+				for(Tuple2<Long, MatrixBlock> kv : arg0._2()) {
+					int cl = (kv._1().intValue()-1)*_bclen;
+					MatrixBlock mb = kv._2();
+					DataConverter.copyToDoubleVector(mb, tmp, cl);
+				}
+				row[1] = new DenseVector(tmp);
+			}
+			else {
+				for(Tuple2<Long, MatrixBlock> kv : arg0._2()) {
+					int cl = (kv._1().intValue()-1)*_bclen;
+					MatrixBlock mb = kv._2();
+					for( int j=0; j<mb.getNumColumns(); j++ )
+						row[cl+j+1] = mb.quickGetValue(0, j);
+				}
+			}
+			
+			return RowFactory.create(row);
+		}
+	}
 }

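For context, the new converter API composes into a simple round trip; the following is a minimal usage sketch, assuming an existing JavaSparkContext sc, a binary-block input RDD in, and placeholder values for rows, cols, blksz, and nnz:

	//matrix to data frame (one double column per matrix column, plus '__INDEX')
	SQLContext sqlctx = new SQLContext(sc);
	MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, blksz, blksz, nnz);
	DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc, false);
	
	//data frame back to binary blocks (dimensions and nnz determined if unknown)
	MatrixCharacteristics mcOut = new MatrixCharacteristics();
	JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
		RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mcOut, true, false);
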
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index 34e5a91..641285f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -52,8 +52,6 @@ import java.nio.ByteOrder;
 
 import scala.Tuple2;
 
-import org.apache.sysml.api.MLOutput.ConvertDoubleArrayToRows;
-import org.apache.sysml.api.MLOutput.ProjectRows;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
@@ -131,19 +129,6 @@ public class RDDConverterUtilsExt
 	{
 		return coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), input, mcIn, true);
 	}
-	
-	// Useful for printing, testing binary blocked RDD and also for external use.
-	public static JavaRDD<String> binaryBlockToStringRDD(JavaPairRDD<MatrixIndexes, MatrixBlock> input, MatrixCharacteristics mcIn, String format) throws DMLRuntimeException {
-		if(format.equals("text")) {
-			JavaRDD<String> ijv = input.flatMap(new ConvertMatrixBlockToIJVLines(mcIn.getRowsPerBlock(), mcIn.getColsPerBlock()));
-			return ijv;
-		}
-		else {
-			throw new DMLRuntimeException("The output format:" + format + " is not implemented yet.");
-		}
-	}
-
-
 
 	public static DataFrame stringDataFrameToVectorDataFrame(SQLContext sqlContext, DataFrame inputDF)
 			throws DMLRuntimeException {
@@ -207,78 +192,6 @@ public class RDDConverterUtilsExt
 		return outDF;
 	}
 
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> vectorDataFrameToBinaryBlock(SparkContext sc,
-			DataFrame inputDF, MatrixCharacteristics mcOut, boolean containsID, String vectorColumnName) throws DMLRuntimeException {
-		return vectorDataFrameToBinaryBlock(new JavaSparkContext(sc), inputDF, mcOut, containsID, vectorColumnName);
-	}
-	
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> vectorDataFrameToBinaryBlock(JavaSparkContext sc,
-			DataFrame inputDF, MatrixCharacteristics mcOut, boolean containsID, String vectorColumnName)
-			throws DMLRuntimeException {
-		
-		if(containsID) {
-			inputDF = dropColumn(inputDF.sort("__INDEX"), "__INDEX");
-		}
-		
-		DataFrame df = inputDF.select(vectorColumnName);
-			
-		//determine unknown dimensions and sparsity if required
-		if( !mcOut.dimsKnown(true) ) {
-			Accumulator<Double> aNnz = sc.accumulator(0L);
-			JavaRDD<Row> tmp = df.javaRDD().map(new DataFrameAnalysisFunction(aNnz, true));
-			long rlen = tmp.count();
-			long clen = ((Vector) tmp.first().get(0)).size();
-			long nnz = UtilFunctions.toLong(aNnz.value());
-			mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), nnz);
-		}
-		
-		JavaPairRDD<Row, Long> prepinput = df.javaRDD()
-				.zipWithIndex(); //zip row index
-		
-		//convert csv rdd to binary block rdd (w/ partial blocks)
-		JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
-				prepinput.mapPartitionsToPair(
-					new DataFrameToBinaryBlockFunction(mcOut, true));
-		
-		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out ); 
-		
-		return out;
-	}
-	
-	/**
-	 * Adding utility to support for dropping columns for older Spark versions.
-	 * @param df
-	 * @param column
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	public static DataFrame dropColumn(DataFrame df, String column) throws DMLRuntimeException {
-		ArrayList<String> columnToSelect = new ArrayList<String>();
-		String firstCol = null;
-		boolean colPresent = false;
-		for(String col : df.columns()) {
-			if(col.equals(column)) {
-				colPresent = true;
-			}
-			else if(firstCol == null) {
-				firstCol = col;
-			}
-			else {
-				columnToSelect.add(col);
-			}
-		}
-		
-		if(!colPresent) {
-			throw new DMLRuntimeException("The column \"" + column + "\" is not present in the dataframe.");
-		}
-		else if(firstCol == null) {
-			throw new DMLRuntimeException("No column other than \"" + column + "\" present in the dataframe.");
-		}
-		
-		// Round about way to do in Java (not exposed in Spark 1.3.0): df = df.drop("__INDEX");
-		return df.select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
-	}
 	
 	public static DataFrame projectColumns(DataFrame df, ArrayList<String> columns) throws DMLRuntimeException {
 		ArrayList<String> columnToSelect = new ArrayList<String>();
@@ -288,44 +201,6 @@ public class RDDConverterUtilsExt
 		return df.select(columns.get(0), scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
 	}
 	
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(SparkContext sc,
-			DataFrame df, MatrixCharacteristics mcOut, boolean containsID) throws DMLRuntimeException {
-		return dataFrameToBinaryBlock(new JavaSparkContext(sc), df, mcOut, containsID, null);
-	}
-	
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(SparkContext sc,
-			DataFrame df, MatrixCharacteristics mcOut, String [] columns) throws DMLRuntimeException {
-		ArrayList<String> columns1 = new ArrayList<String>(Arrays.asList(columns));
-		return dataFrameToBinaryBlock(new JavaSparkContext(sc), df, mcOut, false, columns1);
-	}
-	
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(SparkContext sc,
-			DataFrame df, MatrixCharacteristics mcOut, ArrayList<String> columns) throws DMLRuntimeException {
-		return dataFrameToBinaryBlock(new JavaSparkContext(sc), df, mcOut, false, columns);
-	}
-	
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(SparkContext sc,
-			DataFrame df, MatrixCharacteristics mcOut, boolean containsID, String [] columns) 
-			throws DMLRuntimeException {
-		ArrayList<String> columns1 = new ArrayList<String>(Arrays.asList(columns));
-		return dataFrameToBinaryBlock(new JavaSparkContext(sc), df, mcOut, containsID, columns1);
-	}
-	
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(SparkContext sc,
-			DataFrame df, MatrixCharacteristics mcOut, boolean containsID, ArrayList<String> columns) 
-			throws DMLRuntimeException {
-		return dataFrameToBinaryBlock(new JavaSparkContext(sc), df, mcOut, containsID, columns);
-	}
-	
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-			DataFrame df, MatrixCharacteristics mcOut, boolean containsID) throws DMLRuntimeException {
-		return dataFrameToBinaryBlock(sc, df, mcOut, containsID, null);
-	}
-	
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-			DataFrame df, MatrixCharacteristics mcOut, ArrayList<String> columns) throws DMLRuntimeException {
-		return dataFrameToBinaryBlock(sc, df, mcOut, false, columns);
-	}
 	
 	public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen) throws DMLRuntimeException {
 		return convertPy4JArrayToMB(data, rlen, clen, false);
@@ -386,77 +261,6 @@ public class RDDConverterUtilsExt
 		return ret;
 	}
 	
-	/**
-	 * Converts DataFrame into binary blocked RDD. 
-	 * Note: mcOut will be set if you don't know the dimensions.
-	 * @param sc
-	 * @param df
-	 * @param mcOut
-	 * @param containsID
-	 * @param columns
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-			DataFrame df, MatrixCharacteristics mcOut, boolean containsID, ArrayList<String> columns) 
-			throws DMLRuntimeException {
-		if(columns != null) {
-			df = projectColumns(df, columns);
-		}
-		
-		if(containsID) {
-			df = dropColumn(df.sort("__INDEX"), "__INDEX");
-		}
-			
-		//determine unknown dimensions and sparsity if required
-		if( !mcOut.dimsKnown(true) ) {
-			Accumulator<Double> aNnz = sc.accumulator(0L);
-			JavaRDD<Row> tmp = df.javaRDD().map(new DataFrameAnalysisFunction(aNnz, false));
-			long rlen = tmp.count();
-			long clen = containsID ? (df.columns().length - 1) : df.columns().length;
-			long nnz = UtilFunctions.toLong(aNnz.value());
-			mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), nnz);
-		}
-		
-		JavaPairRDD<Row, Long> prepinput = df.javaRDD()
-				.zipWithIndex(); //zip row index
-		
-		//convert csv rdd to binary block rdd (w/ partial blocks)
-		JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
-				prepinput.mapPartitionsToPair(
-					new DataFrameToBinaryBlockFunction(mcOut, false));
-		
-		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out ); 
-		
-		return out;
-	}
-	
-	public static DataFrame binaryBlockToVectorDataFrame(JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockRDD, 
-			MatrixCharacteristics mc, SQLContext sqlContext) throws DMLRuntimeException {
-		long rlen = mc.getRows(); long clen = mc.getCols();
-		int brlen = mc.getRowsPerBlock(); int bclen = mc.getColsPerBlock();
-		// Very expensive operation here: groupByKey (where number of keys might be too large)
-		JavaRDD<Row> rowsRDD = binaryBlockRDD.flatMapToPair(new ProjectRows(rlen, clen, brlen, bclen))
-				.groupByKey().map(new ConvertDoubleArrayToRows(clen, bclen, true));
-		
-		int numColumns = (int) clen;
-		if(numColumns <= 0) {
-			throw new DMLRuntimeException("Output dimensions unknown after executing the script and hence cannot create the dataframe");
-		}
-		
-		List<StructField> fields = new ArrayList<StructField>();
-		// LongTypes throw an error: java.lang.Double incompatible with java.lang.Long
-		fields.add(DataTypes.createStructField("__INDEX", DataTypes.DoubleType, false));
-		fields.add(DataTypes.createStructField("C1", new VectorUDT(), false));
-		// fields.add(DataTypes.createStructField("C1", DataTypes.createArrayType(DataTypes.DoubleType), false));
-		
-		// This will cause infinite recursion due to bug in Spark
-		// https://issues.apache.org/jira/browse/SPARK-6999
-		// return sqlContext.createDataFrame(rowsRDD, colNames); // where ArrayList<String> colNames
-		return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
-	}
-	
 	public static class AddRowID implements Function<Tuple2<Row,Long>, Row> {
 		private static final long serialVersionUID = -3733816995375745659L;
 
@@ -492,33 +296,7 @@ public class RDDConverterUtilsExt
 		return sqlContext.createDataFrame(newRows, new StructType(newSchema));
 	}
 	
-	public static DataFrame binaryBlockToDataFrame(JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockRDD, 
-			MatrixCharacteristics mc, SQLContext sqlContext) throws DMLRuntimeException {
-		long rlen = mc.getRows(); long clen = mc.getCols();
-		int brlen = mc.getRowsPerBlock(); int bclen = mc.getColsPerBlock();
-		
-		// Very expensive operation here: groupByKey (where number of keys might be too large)
-		JavaRDD<Row> rowsRDD = binaryBlockRDD.flatMapToPair(new ProjectRows(rlen, clen, brlen, bclen))
-				.groupByKey().map(new ConvertDoubleArrayToRows(clen, bclen, false));
-		
-		int numColumns = (int) clen;
-		if(numColumns <= 0) {
-			// numColumns = rowsRDD.first().length() - 1; // Ugly, so instead prefer to throw
-			throw new DMLRuntimeException("Output dimensions unknown after executing the script and hence cannot create the dataframe");
-		}
-		
-		List<StructField> fields = new ArrayList<StructField>();
-		// LongTypes throw an error: java.lang.Double incompatible with java.lang.Long
-		fields.add(DataTypes.createStructField("__INDEX", DataTypes.DoubleType, false));
-		for(int i = 1; i <= numColumns; i++) {
-			fields.add(DataTypes.createStructField("C" + i, DataTypes.DoubleType, false));
-		}
-		
-		// This will cause infinite recursion due to bug in Spark
-		// https://issues.apache.org/jira/browse/SPARK-6999
-		// return sqlContext.createDataFrame(rowsRDD, colNames); // where ArrayList<String> colNames
-		return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
-	}
+	
 	
 	private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock> 
 	{
@@ -535,131 +313,6 @@ public class RDDConverterUtilsExt
 		}
 
 	}
-
-	public static class DataFrameAnalysisFunction implements Function<Row,Row>  {
-		private static final long serialVersionUID = 5705371332119770215L;
-		private RowAnalysisFunctionHelper helper = null;
-		boolean isVectorBasedRDD;
-		public DataFrameAnalysisFunction( Accumulator<Double> aNnz, boolean isVectorBasedRDD) {
-			helper = new RowAnalysisFunctionHelper(aNnz);
-			this.isVectorBasedRDD = isVectorBasedRDD;
-		}
-
-		@Override
-		public Row call(Row arg0) throws Exception {
-			if(isVectorBasedRDD)
-				return helper.analyzeVector(arg0);
-			else
-				return helper.analyzeRow(arg0);
-		}
-		
-	}
-	
-	private static class CSVToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Text,Long>>,MatrixIndexes,MatrixBlock> {
-		private static final long serialVersionUID = 1501589201971233542L;
-		
-		private RowToBinaryBlockFunctionHelper helper = null; 
-		
-		public CSVToBinaryBlockFunction(MatrixCharacteristics mc, String delim, boolean fill, double fillValue) {
-			helper = new RowToBinaryBlockFunctionHelper(mc, delim, fill, fillValue);
-		}
-		
-		@Override
-		public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<Text, Long>> arg0) throws Exception {
-			return helper.convertToBinaryBlock(arg0, RDDConverterTypes.TEXT_TO_DOUBLEARR);
-		}
-		
-	}
-	
-	public static class DataFrameToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Row,Long>>,MatrixIndexes,MatrixBlock> {
-		private static final long serialVersionUID = 653447740362447236L;
-		private RowToBinaryBlockFunctionHelper helper = null; 
-		boolean isVectorBasedDF;
-		
-		public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, boolean isVectorBasedDF) {
-			helper = new RowToBinaryBlockFunctionHelper(mc);
-			this.isVectorBasedDF = isVectorBasedDF;
-		}
-		
-		@Override
-		public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<Row, Long>> arg0) throws Exception {
-			if(isVectorBasedDF)
-				return helper.convertToBinaryBlock(arg0, RDDConverterTypes.VECTOR_TO_DOUBLEARR);
-			else
-				return helper.convertToBinaryBlock(arg0, RDDConverterTypes.ROW_TO_DOUBLEARR);
-		}
-		
-	}
-	
-	private static class RowAnalysisFunctionHelper implements Serializable 
-	{
-		private static final long serialVersionUID = 2310303223289674477L;
-
-		private Accumulator<Double> _aNnz = null;
-		private String _delim = null;
-		
-		public RowAnalysisFunctionHelper( Accumulator<Double> aNnz ) {
-			_aNnz = aNnz;
-		}
-		
-		public RowAnalysisFunctionHelper( Accumulator<Double> aNnz, String delim ) {
-			_aNnz = aNnz;
-			_delim = delim;
-		}
-		
-		public String analyzeText(Text v1) throws Exception {
-			//parse input line
-			String line = v1.toString();
-			String[] cols = IOUtilFunctions.split(line, _delim);
-			
-			//determine number of non-zeros of row (w/o string parsing)
-			long lnnz = 0;
-			for( String col : cols ) {
-				if( !col.isEmpty() && !col.equals("0") && !col.equals("0.0") ) {
-					lnnz++;
-				}
-			}
-			
-			//update counters
-			_aNnz.add( (double)lnnz );
-			
-			return line;
-		}
-		
-		public Row analyzeRow(Row arg0) throws Exception {
-			//determine number of non-zeros of row
-			long lnnz = 0;
-			if(arg0 != null) {
-				for(int i = 0; i < arg0.length(); i++) {
-					if(RowToBinaryBlockFunctionHelper.getDoubleValue(arg0, i) != 0) {
-						lnnz++;
-					}
-				}
-			}
-			else {
-				throw new Exception("Error while analyzing row");
-			}
-			
-			//update counters
-			_aNnz.add( (double)lnnz );
-			
-			return arg0;
-		}
-		
-		public Row analyzeVector(Row row)  {
-			Vector vec = (Vector) row.get(0); // assumption: 1 column DF
-			long lnnz = 0;
-			for(int i = 0; i < vec.size(); i++) {
-				if(vec.apply(i) != 0) { 
-					lnnz++;
-				}
-			}
-			
-			//update counters
-			_aNnz.add( (double)lnnz );
-			return row;
-		}
-	}
 	
 	private static class IJVToBinaryBlockFunctionHelper implements Serializable {
 		private static final long serialVersionUID = -7952801318564745821L;
@@ -768,178 +421,4 @@ public class RDDConverterUtilsExt
 			ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
 		}
 	}
-	
-	/**
-	 * This functions allows to map rdd partitions of csv rows into a set of partial binary blocks.
-	 * 
-	 * NOTE: For this csv to binary block function, we need to hold all output blocks per partition 
-	 * in-memory. Hence, we keep state of all column blocks and aggregate row segments into these blocks. 
-	 * In terms of memory consumption this is better than creating partial blocks of row segments.
-	 * 
-	 */
-	private static class RowToBinaryBlockFunctionHelper implements Serializable 
-	{
-		private static final long serialVersionUID = -4948430402942717043L;
-		
-		private long _rlen = -1;
-		private long _clen = -1;
-		private int _brlen = -1;
-		private int _bclen = -1;
-		private String _delim = null;
-		private boolean _fill = false;
-		private double _fillValue = 0;
-		
-		public RowToBinaryBlockFunctionHelper(MatrixCharacteristics mc)
-		{
-			_rlen = mc.getRows();
-			_clen = mc.getCols();
-			_brlen = mc.getRowsPerBlock();
-			_bclen = mc.getColsPerBlock();
-		}
-		
-		public RowToBinaryBlockFunctionHelper(MatrixCharacteristics mc, String delim, boolean fill, double fillValue)
-		{
-			_rlen = mc.getRows();
-			_clen = mc.getCols();
-			_brlen = mc.getRowsPerBlock();
-			_bclen = mc.getColsPerBlock();
-			_delim = delim;
-			_fill = fill;
-			_fillValue = fillValue;
-		}
-		
-		boolean emptyFound = false;
-		
-		// ----------------------------------------------------
-		public double[] textToDoubleArray(Text row) {
-			String[] parts = IOUtilFunctions.split(row.toString(), _delim);
-			double[] ret = new double[parts.length];
-			int ix = 0;
-			for(String part : parts) {
-				emptyFound |= part.isEmpty() && !_fill;
-				double val = (part.isEmpty() && _fill) ?
-						_fillValue : Double.parseDouble(part);
-				ret[ix++] = val;
-			}
-			return ret;
-		}
-		public double[] rowToDoubleArray(Row row) throws Exception {
-			double[] ret = new double[row.length()];
-			for(int i = 0; i < row.length(); i++) {
-				ret[i] = getDoubleValue(row, i);
-			}
-			return ret;
-		}
-		
-		public double[] vectorToDoubleArray(Vector arg) throws Exception {
-			return arg.toDense().values();
-		}
-		// ----------------------------------------------------
-
-		public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> convertToBinaryBlock(Object arg0, RDDConverterTypes converter) 
-			throws Exception 
-		{
-			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
-
-			int ncblks = (int)Math.ceil((double)_clen/_bclen);
-			MatrixIndexes[] ix = new MatrixIndexes[ncblks];
-			MatrixBlock[] mb = new MatrixBlock[ncblks];
-			
-			@SuppressWarnings("unchecked")
-			Iterator<Tuple2<?,Long>> iter = (Iterator<Tuple2<?, Long>>) arg0;
-			while( iter.hasNext() )
-			{
-				Tuple2<?,Long> tmp = iter.next();
-				// String row = tmp._1();
-				long rowix = tmp._2() + 1;
-				
-				long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
-				int pos = UtilFunctions.computeCellInBlock(rowix, _brlen);
-			
-				//create new blocks for entire row
-				if( ix[0] == null || ix[0].getRowIndex() != rix ) {
-					if( ix[0] !=null )
-						flushBlocksToList(ix, mb, ret);
-					long len = UtilFunctions.computeBlockSize(_rlen, rix, _brlen);
-					createBlocks(rowix, (int)len, ix, mb);
-				}
-				
-				//process row data
-				emptyFound = false;
-				double[] parts = null;
-				switch(converter) {
-					case TEXT_TO_DOUBLEARR:
-						parts = textToDoubleArray((Text) tmp._1());
-						break;
-					case ROW_TO_DOUBLEARR:
-						parts = rowToDoubleArray((Row) tmp._1());
-						break;
-					case VECTOR_TO_DOUBLEARR:
-						parts = vectorToDoubleArray((Vector) ((Row) tmp._1()).get(0));
-						break;
-					default:
-						throw new Exception("Invalid converter for row-based data:" + converter.toString());
-				}
-				
-				for( int cix=1, pix=0; cix<=ncblks; cix++ ) 
-				{
-					int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
-					for( int j=0; j<lclen; j++ ) {
-						double val = parts[pix++];
-						mb[cix-1].appendValue(pos, j, val);
-					}	
-				}
-		
-				//sanity check empty cells filled w/ values
-				if(converter == RDDConverterTypes.TEXT_TO_DOUBLEARR)
-					IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(((Text) tmp._1()).toString(), _fill, emptyFound);
-			}
-		
-			//flush last blocks
-			flushBlocksToList(ix, mb, ret);
-		
-			return ret;
-		}
-			
-		// Creates new state of empty column blocks for current global row index.
-		private void createBlocks(long rowix, int lrlen, MatrixIndexes[] ix, MatrixBlock[] mb)
-		{
-			//compute row block index and number of column blocks
-			long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
-			int ncblks = (int)Math.ceil((double)_clen/_bclen);
-			
-			//create all column blocks (assume dense since csv is dense text format)
-			for( int cix=1; cix<=ncblks; cix++ ) {
-				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
-				ix[cix-1] = new MatrixIndexes(rix, cix);
-				mb[cix-1] = new MatrixBlock(lrlen, lclen, false);		
-			}
-		}
-		
-		// Flushes current state of filled column blocks to output list.
-		private void flushBlocksToList( MatrixIndexes[] ix, MatrixBlock[] mb, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
-			throws DMLRuntimeException
-		{
-			int len = ix.length;			
-			for( int i=0; i<len; i++ )
-				if( mb[i] != null ) {
-					ret.add(new Tuple2<MatrixIndexes,MatrixBlock>(ix[i],mb[i]));
-					mb[i].examSparsity(); //ensure right representation
-				}	
-		}
-		
-		public static double getDoubleValue(Row row, int index) throws Exception {
-			try {
-				return row.getDouble(index);
-			} catch(Exception e) {
-				try {
-					// Causes lock-contention for Java 7
-					return Double.parseDouble(row.get(index).toString());
-				}
-				catch(Exception e1) {
-					throw new Exception("Only double types are supported as input to SystemML. The input argument is \'" + row.get(index) + "\'");
-				}
-			}
-		}
-	}
 }

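Callers of the removed converters can switch to the consolidated API in RDDConverterUtils; a sketch of the typical replacements, assuming the same sc, df, in, and mc as in the sketch above (see also the GNMFTest change below):

	//before: RDDConverterUtilsExt.binaryBlockToStringRDD(in, mc, "text")
	JavaRDD<String> ijv = RDDConverterUtils.binaryBlockToTextCell(in, mc);
	
	//before: RDDConverterUtilsExt.vectorDataFrameToBinaryBlock(sc, df, mc, containsID, "C1")
	JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
		RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, containsID, true);
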
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
index 65f3b1a..ae0fca7 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
@@ -155,14 +155,16 @@ public class MatrixCharacteristics implements Serializable
 		+", blocks ("+numRowsPerBlock+" x "+numColumnsPerBlock+")]";
 	}
 	
-	public void setDimension(long nr, long nc)
-	{
+	public void setDimension(long nr, long nc) {
 		numRows = nr;
 		numColumns = nc;
 	}
 	
-	public void setBlockSize(int bnr, int bnc)
-	{
+	public void setBlockSize(int blen) {
+		setBlockSize(blen, blen);
+	}
+	
+	public void setBlockSize(int bnr, int bnc) {
 		numRowsPerBlock = bnr;
 		numColumnsPerBlock = bnc;
 	}

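The new single-argument overload is a convenience for the common case of square blocks; a minimal sketch:

	MatrixCharacteristics mc = new MatrixCharacteristics();
	mc.setBlockSize(ConfigurationManager.getBlocksize()); //same as setBlockSize(blen, blen)
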
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index c790ae9..9bb27d9 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -904,6 +904,33 @@ public class DataConverter
 	/**
 	 * 
 	 * @param mb
+	 * @param dest destination array in row-major layout
+	 * @param destPos starting position within the destination array
+	 */
+	public static void copyToDoubleVector( MatrixBlock mb, double[] dest, int destPos )
+	{
+		if( mb.isEmptyBlock(false) )
+			return; //quick path
+			
+		int rows = mb.getNumRows();
+		int cols = mb.getNumColumns();
+		
+		if( mb.isInSparseFormat() ) {
+			Iterator<IJV> iter = mb.getSparseBlockIterator();
+			while( iter.hasNext() ) {
+				IJV cell = iter.next();
+				dest[destPos+cell.getI()*cols+cell.getJ()] = cell.getV();
+			}
+		}
+		else {
+			//memcopy row major representation if at least 1 non-zero
+			System.arraycopy(mb.getDenseBlock(), 0, dest, destPos, rows*cols);
+		}
+	}
+	
+	/**
+	 * 
+	 * @param mb
 	 * @return
 	 */
 	public static String toString(MatrixBlock mb) {

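The new copyToDoubleVector helper is what ConvertRowBlocksToRows uses to stitch the aligned blocks of one matrix row into a dense array; a minimal sketch, with rowBlocks as a placeholder for the grouped (column block index, block) pairs of a single row:

	double[] tmp = new double[(int)mc.getCols()];
	for( Tuple2<Long, MatrixBlock> kv : rowBlocks ) {
		int cl = (kv._1().intValue()-1) * mc.getColsPerBlock(); //column offset of this block
		DataConverter.copyToDoubleVector(kv._2(), tmp, cl);
	}
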
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index fa17fcd..1ac552f 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -601,13 +601,12 @@ public class UtilFunctions
 	}
 	
 	
-	/*
+	/**
 	 * This function returns the data type, i.e., whether it is a Matrix or Frame
 	 * 
 	 *  @param	str
 	 *  		Instruction string to execute
 	 */
-	
 	public static DataType getDataType(String str, int index)
 	{
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
@@ -615,4 +614,29 @@ public class UtilFunctions
 	
 		return in1.getDataType();
 	}
+	
+	/**
+	 * Returns the double value of the given object, avoiding string parsing for boxed doubles.
+	 * 
+	 * @param obj a Double or any object whose string representation parses as a double
+	 * @return the double value of the given object
+	 */
+	public static double getDouble(Object obj) {
+		return (obj instanceof Double) ? (Double)obj :
+			Double.parseDouble(obj.toString());
+	}
+	
+	/**
+	 * Checks the given object for non-zero, avoiding expensive double parsing for strings.
+	 * 
+	 * @param obj a Double or any object with a numeric string representation
+	 * @return true iff the represented value is non-zero
+	 */
+	public static boolean isNonZero(Object obj) {
+		if( obj instanceof Double ) 
+			return ((Double) obj) != 0;
+		else {
+			//avoid expensive double parsing
+			String sobj = obj.toString();
+			return (!sobj.equals("0") && !sobj.equals("0.0"));
+		}
+	}
 }

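Both helpers keep the data frame conversion hot path cheap by avoiding string parsing where possible; a minimal sketch of their semantics:

	double d1 = UtilFunctions.getDouble(Double.valueOf(7.0)); //7.0, no parsing
	double d2 = UtilFunctions.getDouble("7.0");               //7.0, via Double.parseDouble
	boolean b1 = UtilFunctions.isNonZero(-1d);                //true, direct comparison
	boolean b2 = UtilFunctions.isNonZero("0.0");              //false, via string comparison
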
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
new file mode 100644
index 0000000..c19865c
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameConversionTest extends AutomatedTestBase 
+{
+	private final static String TEST_DIR = "functions/mlcontext/";
+	private final static String TEST_NAME = "DataFrameConversion";
+	private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameConversionTest.class.getSimpleName() + "/";
+
+	private final static int  rows1 = 2245;
+	private final static int  cols1 = 745;
+	private final static int  cols2 = 1264;
+	private final static double sparsity1 = 0.9;
+	private final static double sparsity2 = 0.1;
+	private final static double eps=0.0000000001;
+
+	 
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+	}
+	
+	@Test
+	public void testVectorConversionSingleDense() {
+		testDataFrameConversion(true, true, true, false);
+	}
+	
+	@Test
+	public void testVectorConversionSingleDenseUnknown() {
+		testDataFrameConversion(true, true, true, true);
+	}
+	
+	@Test
+	public void testVectorConversionSingleSparse() {
+		testDataFrameConversion(true, true, false, false);
+	}
+	
+	@Test
+	public void testVectorConversionSingleSparseUnknown() {
+		testDataFrameConversion(true, true, false, true);
+	}
+	
+	@Test
+	public void testVectorConversionMultiDense() {
+		testDataFrameConversion(true, false, true, false);
+	}
+	
+	@Test
+	public void testVectorConversionMultiDenseUnknown() {
+		testDataFrameConversion(true, false, true, true);
+	}
+	
+	@Test
+	public void testVectorConversionMultiSparse() {
+		testDataFrameConversion(true, false, false, false);
+	}
+	
+	@Test
+	public void testVectorConversionMultiSparseUnknown() {
+		testDataFrameConversion(true, false, false, true);
+	}
+
+	@Test
+	public void testRowConversionSingleDense() {
+		testDataFrameConversion(false, true, true, false);
+	}
+	
+	@Test
+	public void testRowConversionSingleDenseUnknown() {
+		testDataFrameConversion(false, true, true, true);
+	}
+	
+	@Test
+	public void testRowConversionSingleSparse() {
+		testDataFrameConversion(false, true, false, false);
+	}
+	
+	@Test
+	public void testRowConversionSingleSparseUnknown() {
+		testDataFrameConversion(false, true, false, true);
+	}
+	
+	@Test
+	public void testRowConversionMultiDense() {
+		testDataFrameConversion(false, false, true, false);
+	}
+	
+	@Test
+	public void testRowConversionMultiDenseUnknown() {
+		testDataFrameConversion(false, false, true, true);
+	}
+	
+	@Test
+	public void testRowConversionMultiSparse() {
+		testDataFrameConversion(false, false, false, false);
+	}
+	
+	@Test
+	public void testRowConversionMultiSparseUnknown() {
+		testDataFrameConversion(false, false, false, true);
+	}
+	
+	/**
+	 * 
+	 * @param vector true to convert via a single vector column, false for one double column per matrix column
+	 * @param singleColBlock true to use a matrix that fits into a single column block
+	 * @param dense true for dense input data, false for sparse
+	 * @param unknownDims true to run the conversion with initially unknown matrix characteristics
+	 */
+	private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) {
+		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
+		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+		SparkExecutionContext sec = null;
+		
+		try
+		{
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+			
+			//generate input data and setup metadata
+			int cols = singleColBlock ? cols1 : cols2;
+			double sparsity = dense ? sparsity1 : sparsity2; 
+			double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); 
+			MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); 
+			int blksz = ConfigurationManager.getBlocksize();
+			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+			MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+			
+			//setup spark context
+			sec = (SparkExecutionContext) ExecutionContextFactory.createContext();		
+			JavaSparkContext sc = sec.getSparkContext();
+			SQLContext sqlctx = new SQLContext(sc);
+			
+			//get binary block input rdd
+			JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
+			
+			//matrix - dataframe - matrix conversion
+			DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
+			JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
+			
+			//get output matrix block
+			MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
+			
+			//compare matrix blocks
+			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+			TestUtils.compareMatrices(A, B, rows1, cols, eps);
+		}
+		catch( Exception ex ) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			if( sec != null )
+				sec.close();
+			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+			DMLScript.rtplatform = oldPlatform;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/GNMFTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/GNMFTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/GNMFTest.java
index 749f8c1..7b68065 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/GNMFTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/GNMFTest.java
@@ -50,6 +50,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.utils.TestUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt;
 import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
 import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
@@ -201,7 +202,7 @@ public class GNMFTest extends AutomatedTestBase
 				MatrixCharacteristics mcW = out.getMatrixCharacteristics("W");
 				CoordinateMatrix coordinateMatrix = new CoordinateMatrix(matRDD.rdd(), mcW.getRows(), mcW.getCols());
 				JavaPairRDD<MatrixIndexes, MatrixBlock> binaryRDD = RDDConverterUtilsExt.coordinateMatrixToBinaryBlock(sc, coordinateMatrix, mcW, true);
-				JavaRDD<String> wOut = RDDConverterUtilsExt.binaryBlockToStringRDD(binaryRDD, mcW, "text");
+				JavaRDD<String> wOut = RDDConverterUtils.binaryBlockToTextCell(binaryRDD, mcW);
 				
 				String fName = output("w");
 				try {