You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/16 17:37:33 UTC

[1/2] incubator-systemml git commit: [SYSTEMML-923] Performance spark csv reblock of sparse matrices

Repository: incubator-systemml
Updated Branches:
  refs/heads/master b20727de5 -> df090f2b1


[SYSTEMML-923] Performance spark csv reblock of sparse matrices

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/ed072841
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/ed072841
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/ed072841

Branch: refs/heads/master
Commit: ed072841c25a0c527f78d66d087c3d047c2f95d6
Parents: b20727d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 16 01:17:31 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Sep 16 06:33:28 2016 +0200

----------------------------------------------------------------------
 .../instructions/spark/utils/RDDConverterUtils.java    | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ed072841/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 6fe4a50..ba1934a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -50,6 +50,7 @@ import org.apache.spark.sql.types.StructField;
 import scala.Tuple2;
 
 import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable;
 import org.apache.sysml.runtime.instructions.spark.data.SerText;
@@ -597,9 +598,8 @@ public class RDDConverterUtils
 			//determine number of non-zeros of row (w/o string parsing)
 			long lnnz = 0;
 			for( String col : cols ) {
-				if( !col.isEmpty() && !col.equals("0") && !col.equals("0.0") ) {
-					lnnz++;
-				}
+				lnnz += (!col.isEmpty() && !col.equals("0") 
+						&& !col.equals("0.0")) ? 1 : 0;
 			}
 			
 			//update counters
@@ -626,6 +626,8 @@ public class RDDConverterUtils
 		private long _clen = -1;
 		private int _brlen = -1;
 		private int _bclen = -1;
+		private double _sparsity = 1.0;
+		private boolean _sparse = false;
 		private boolean _header = false;
 		private String _delim = null;
 		private boolean _fill = false;
@@ -637,6 +639,9 @@ public class RDDConverterUtils
 			_clen = mc.getCols();
 			_brlen = mc.getRowsPerBlock();
 			_bclen = mc.getColsPerBlock();
+			_sparsity = OptimizerUtils.getSparsity(mc);
+			_sparse = mc.nnzKnown() && MatrixBlock.evalSparseFormatInMemory(mc.getRows(), 
+					mc.getCols(), mc.getNonZeros()) && (!fill || fillValue==0);
 			_header = hasHeader;
 			_delim = delim;
 			_fill = fill;
@@ -710,7 +715,7 @@ public class RDDConverterUtils
 			for( int cix=1; cix<=ncblks; cix++ ) {
 				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
 				ix[cix-1] = new MatrixIndexes(rix, cix);
-				mb[cix-1] = new MatrixBlock(lrlen, lclen, false);		
+				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse, (int)(lrlen*lclen*_sparsity));		
 			}
 		}
 		


[2/2] incubator-systemml git commit: [SYSTEMML-914] Rework dataframe-frame, frame-dataframe converters

Posted by mb...@apache.org.
[SYSTEMML-914] Rework dataframe-frame, frame-dataframe converters

Similar to the rework of dataframe-matrix converters, this patch fixes
various correctness and performance issues of the dataframe-frame
converters. This includes consistent rowID column handling, consistent
APIs, a fix of dimension analysis with existing ID column, exploitation
of existing row IDs, avoided unnecessary dimension analysis on unknown
nnzs, efficient schema handling, and a more efficient parsing of frame
inputs by exploiting matching value types. 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/df090f2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/df090f2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/df090f2b

Branch: refs/heads/master
Commit: df090f2b1f9dd23a6fd48c5a67a95e1eb8e3ba59
Parents: ed07284
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 16 01:35:54 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Sep 16 06:33:32 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLOutput.java     |  19 +-
 .../api/mlcontext/MLContextConversionUtil.java  |   5 +-
 .../spark/utils/FrameRDDConverterUtils.java     | 218 +++++++++-------
 .../spark/utils/RDDConverterUtils.java          |   6 +-
 .../sysml/runtime/util/UtilFunctions.java       |   7 +-
 .../functions/frame/FrameConverterTest.java     |  13 +-
 .../mlcontext/DataFrameConversionTest.java      | 196 ---------------
 .../mlcontext/DataFrameFrameConversionTest.java | 246 +++++++++++++++++++
 .../DataFrameMatrixConversionTest.java          | 196 +++++++++++++++
 .../functions/mlcontext/FrameTest.java          |  12 +-
 .../mlcontext/MLContextFrameTest.java           |  35 +--
 11 files changed, 623 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
index ec2f24d..d011104 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -47,18 +47,17 @@ public class MLOutput {
 	Map<String, JavaPairRDD<?,?>> _outputs;
 	private Map<String, MatrixCharacteristics> _outMetadata = null;
 	
+	public MLOutput(Map<String, JavaPairRDD<?,?>> outputs, Map<String, MatrixCharacteristics> outMetadata) {
+		this._outputs = outputs;
+		this._outMetadata = outMetadata;
+	}
+	
 	public MatrixBlock getMatrixBlock(String varName) throws DMLRuntimeException {
 		MatrixCharacteristics mc = getMatrixCharacteristics(varName);
 		// The matrix block is always pushed to an RDD and then we do collect
 		// We can later avoid this by returning symbol table rather than "Map<String, JavaPairRDD<MatrixIndexes,MatrixBlock>> _outputs"
-		MatrixBlock mb = SparkExecutionContext.toMatrixBlock(getBinaryBlockedRDD(varName), (int) mc.getRows(), (int) mc.getCols(), 
+		return SparkExecutionContext.toMatrixBlock(getBinaryBlockedRDD(varName), (int) mc.getRows(), (int) mc.getCols(), 
 				mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros());
-		return mb;
-	}
-
-	public MLOutput(Map<String, JavaPairRDD<?,?>> outputs, Map<String, MatrixCharacteristics> outMetadata) {
-		this._outputs = outputs;
-		this._outMetadata = outMetadata;
 	}
 	
 	@SuppressWarnings("unchecked")
@@ -160,6 +159,8 @@ public class MLOutput {
 	}
 	
 	public JavaRDD<String> getStringFrameRDD(String varName, String format, CSVFileFormatProperties fprop ) throws DMLRuntimeException {
+		//TODO MB: note that on construction of MLOutput only matrix binary blocks are passed, and 
+		//hence we will never find a frame binary block in the outputs.
 		JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName);
 		MatrixCharacteristics mcIn = getMatrixCharacteristics(varName); 
 		if(format.equals("csv")) {
@@ -175,9 +176,11 @@ public class MLOutput {
 	}
 	
 	public DataFrame getDataFrameRDD(String varName, JavaSparkContext jsc) throws DMLRuntimeException {
+		//TODO MB: note that on construction of MLOutput only matrix binary blocks are passed, and 
+		//hence we will never find a frame binary block in the outputs.
 		JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName);
 		MatrixCharacteristics mcIn = getMatrixCharacteristics(varName);
-		return FrameRDDConverterUtils.binaryBlockToDataFrame(binaryRDD, mcIn, jsc);
+		return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryRDD, mcIn, null);
 	}
 	
 	public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index 5476902..e74dc53 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -1217,11 +1217,10 @@ public class MLContextConversionUtil {
 			@SuppressWarnings("unchecked")
 			JavaPairRDD<Long, FrameBlock> binaryBlockFrame = (JavaPairRDD<Long, FrameBlock>) sparkExecutionContext
 					.getRDDHandleForFrameObject(frameObject, InputInfo.BinaryBlockInputInfo);
-			MatrixCharacteristics matrixCharacteristics = frameObject.getMatrixCharacteristics();
+			MatrixCharacteristics mc = frameObject.getMatrixCharacteristics();
 
 			JavaSparkContext jsc = MLContextUtil.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());
-
-			return FrameRDDConverterUtils.binaryBlockToDataFrame(binaryBlockFrame, matrixCharacteristics, jsc);
+			return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryBlockFrame, mc, frameObject.getSchema());
 		} 
 		catch (DMLRuntimeException e) {
 			throw new MLContextException("DMLRuntimeException while converting frame object to DataFrame", e);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 211f814..faf8ba1 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -42,6 +42,7 @@ import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -54,6 +55,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable;
 import org.apache.sysml.runtime.instructions.spark.data.SerText;
 import org.apache.sysml.runtime.instructions.spark.functions.ConvertFrameBlockToIJVLines;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils.DataFrameExtractIDFunction;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -326,27 +328,29 @@ public class FrameRDDConverterUtils
 	 * @throws DMLRuntimeException
 	 */
 	public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-			DataFrame df, MatrixCharacteristics mcOut, boolean containsID) 
+			DataFrame df, MatrixCharacteristics mc, boolean containsID) 
 		throws DMLRuntimeException 
 	{
-		
-		if(containsID)
-			df = df.drop(RDDConverterUtils.DF_ID_COLUMN);
-		
 		//determine unknown dimensions if required
-		if( !mcOut.dimsKnown(true) ) {
+		if( !mc.dimsKnown() ) { //nnz are irrelevant here
 			JavaRDD<Row> tmp = df.javaRDD();
 			long rlen = tmp.count();
-			long clen = containsID ? (df.columns().length - 1) : df.columns().length;
-			mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+			long clen = df.columns().length - (containsID?1:0);
+			mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), -1);
 		}
 		
-		JavaPairRDD<Row, Long> prepinput = df.javaRDD()
-				.zipWithIndex(); //zip row index
-		
+		JavaPairRDD<Row, Long> prepinput = containsID ?
+				df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) :
+				df.javaRDD().zipWithIndex(); //zip row index
+
+		//convert data frame to frame schema (prepare once)
+		List<String> colnames = new ArrayList<String>();
+		List<ValueType> fschema = new ArrayList<ValueType>();
+		convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID);
+				
 		//convert rdd to binary block rdd
-		JavaPairRDD<Long, FrameBlock> out = prepinput
-				.mapPartitionsToPair(new DataFrameToBinaryBlockFunction(mcOut));
+		JavaPairRDD<Long, FrameBlock> out = prepinput.mapPartitionsToPair(
+				new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID));
 		
 		return out;
 	}
@@ -359,22 +363,27 @@ public class FrameRDDConverterUtils
 	 * @param strict
 	 * @return
 	 */
-	public static DataFrame binaryBlockToDataFrame(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, JavaSparkContext sc)
+	public static DataFrame binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<Long,FrameBlock> in, 
+			MatrixCharacteristics mc, List<ValueType> schema)
 	{
-		List<ValueType> schema = in.first()._2().getSchema();
+		if( !mc.colsKnown() )
+			throw new RuntimeException("Number of columns needed to convert binary block to data frame.");
 		
-		//convert binary block to rows rdd (from blocks/rows)
-		JavaRDD<Row> rowRDD = in.flatMap(new BinaryBlockToDataFrameFunction());
+		//convert binary block to rows rdd 
+		JavaRDD<Row> rowRDD = in.flatMap(
+				new BinaryBlockToDataFrameFunction());
 				
-		SQLContext sqlContext = new SQLContext(sc);
-		StructType dfSchema = convertFrameSchemaToDFSchema(schema);
-		DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
+		//create data frame schema
+		if( schema == null )
+			schema = Collections.nCopies((int)mc.getCols(), ValueType.STRING);
+		StructType dfSchema = convertFrameSchemaToDFSchema(schema, true);
 	
-		return df;
+		//rdd to data frame conversion
+		return sqlctx.createDataFrame(rowRDD, dfSchema);
 	}
 	
 	
-	/*
+	/**
 	 * This function will convert Frame schema into DataFrame schema 
 	 * 
 	 *  @param	schema
@@ -382,32 +391,64 @@ public class FrameRDDConverterUtils
 	 *  @return
 	 *  		Returns the DataFrame schema (StructType)
 	 */
-	public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema)
+	public static StructType convertFrameSchemaToDFSchema(List<ValueType> fschema, boolean containsID)
 	{
-		// Generate the schema based on the string of schema
+		// generate the schema based on the string of schema
 		List<StructField> fields = new ArrayList<StructField>();
 		
-		int i = 1;
-		for (ValueType schema : lschema) {
-			org.apache.spark.sql.types.DataType dataType = DataTypes.StringType;
+		// add id column type
+		if( containsID )
+			fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, 
+					DataTypes.DoubleType, true));
+		
+		// add remaining types
+		int col = 1;
+		for (ValueType schema : fschema) {
+			DataType dt = null;
 			switch(schema) {
-				case STRING:  dataType = DataTypes.StringType; break;
-				case DOUBLE:  dataType = DataTypes.DoubleType; break;
-				case INT:     dataType = DataTypes.LongType; break;
-				case BOOLEAN: dataType = DataTypes.BooleanType; break;
-				default:
+				case STRING:  dt = DataTypes.StringType; break;
+				case DOUBLE:  dt = DataTypes.DoubleType; break;
+				case INT:     dt = DataTypes.LongType; break;
+				case BOOLEAN: dt = DataTypes.BooleanType; break;
+				default:      dt = DataTypes.StringType;
 					LOG.warn("Using default type String for " + schema.toString());
 			}
-			fields.add(DataTypes.createStructField("C"+i++, dataType, true));
+			fields.add(DataTypes.createStructField("C"+col++, dt, true));
 		}
 		
 		return DataTypes.createStructType(fields);
 	}
 	
+	/**
+	 * 
+	 * @param dfschema
+	 * @param containsID
+	 * @return
+	 */
+	public static void convertDFSchemaToFrameSchema(StructType dfschema, List<String> colnames, 
+			List<ValueType> fschema, boolean containsID)
+	{
+		int off = containsID ? 1 : 0;
+		for( int i=off; i<dfschema.fields().length; i++ ) {
+			StructField structType = dfschema.apply(i);
+			colnames.add(structType.name());
+			if(structType.dataType() == DataTypes.DoubleType 
+				|| structType.dataType() == DataTypes.FloatType)
+				fschema.add(ValueType.DOUBLE);
+			else if(structType.dataType() == DataTypes.LongType 
+				|| structType.dataType() == DataTypes.IntegerType)
+				fschema.add(ValueType.INT);
+			else if(structType.dataType() == DataTypes.BooleanType)
+				fschema.add(ValueType.BOOLEAN);
+			else
+				fschema.add(ValueType.STRING);
+		}
+	}
+	
 	/* 
 	 * It will return JavaRDD<Row> based on csv data input file.
 	 */
-	public static JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String delim, List<ValueType> schema)
+	public static JavaRDD<Row> csvToRowRDD(JavaSparkContext sc, String fnameIn, String delim, List<ValueType> schema)
 	{
 		// Load a text file and convert each line to a java rdd.
 		JavaRDD<String> dataRdd = sc.textFile(fnameIn);
@@ -695,20 +736,29 @@ public class FrameRDDConverterUtils
 		private static final long serialVersionUID = 2269315691094111843L;
 
 		private long _clen = -1;
+		private List<String> _colnames = null;
+		private List<ValueType> _schema = null;
+		private boolean _containsID = false;
 		private int _maxRowsPerBlock = -1;
 		
-		public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc) {
+		public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, List<String> colnames, 
+				List<ValueType> schema, boolean containsID) {
 			_clen = mc.getCols();
+			_colnames = colnames;
+			_schema = schema;
+			_containsID = containsID;
 			_maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
 		}
 		
 		@Override
-		public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) throws Exception {
+		public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) 
+			throws Exception 
+		{
 			ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
 
-			Long[] ix = new Long[1];
-			FrameBlock[] mb = new FrameBlock[1];
-			int iRowsInBlock = 0;
+			long ix = -1;
+			FrameBlock fb = null;
+			Object[] tmprow = new Object[(int)_clen];
 			
 			while( arg0.hasNext() )
 			{
@@ -716,55 +766,40 @@ public class FrameRDDConverterUtils
 				Row row = tmp._1();
 				long rowix = tmp._2()+1;
 				
-				if( iRowsInBlock == 0 || iRowsInBlock == _maxRowsPerBlock) {
-					if( iRowsInBlock == _maxRowsPerBlock )
-						flushBlocksToList(ix, mb, ret);
-					createBlocks(rowix, ix, mb, row);
-					iRowsInBlock = 0;
+				if( fb == null || fb.getNumRows() == _maxRowsPerBlock) {
+					if( fb != null )
+						flushBlocksToList(ix, fb, ret);
+					ix = rowix;
+					fb = new FrameBlock(_schema, _colnames);
 				}
 				
 				//process row data
-				Object[] parts = rowToObjectArray(row, (int)_clen, mb[0].getSchema());
-				mb[0].appendRow(parts);
-				iRowsInBlock++;
+				int off = _containsID ? 1 : 0;
+				for(int i=off; i<row.size(); i++) {
+					tmprow[i-off] = UtilFunctions.objectToObject(
+							_schema.get(i-off), row.get(i));
+				}
+				fb.appendRow(tmprow);
 			}
 		
 			//flush last blocks
-			flushBlocksToList(ix, mb, ret);
+			flushBlocksToList(ix, fb, ret);
 		
 			return ret;
 		}
 		
-		public Object[] rowToObjectArray(Row row, int _clen, List<ValueType> schema) throws Exception {
-			Object[] ret = new Object[_clen];
-			for(int i = 0; i < row.length(); i++)
-				ret[i] = UtilFunctions.objectToObject(schema.get(i), row.get(i));
-			for(int i=row.length(); i<_clen; i++)
-				ret[i] = "";
-			return ret;
-		}
-
-		// Creates new state of empty column blocks for current global row index.
-		private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb, Row row)
-		{
-			//compute row block index and number of column blocks
-			ix[0] = new Long(rowix);
-			
-			List<String> columns = new ArrayList<String>();
-			List<ValueType> schema = new ArrayList<ValueType>();
-			for (StructField structType: row.schema().fields()) {
-				columns.add(structType.name());
-				if(structType.dataType() == DataTypes.DoubleType || structType.dataType() == DataTypes.FloatType)
-					schema.add(ValueType.DOUBLE);
-				else if(structType.dataType() == DataTypes.LongType || structType.dataType() == DataTypes.IntegerType)
-					schema.add(ValueType.INT);
-				else if(structType.dataType() == DataTypes.BooleanType)
-					schema.add(ValueType.BOOLEAN);
-				else
-					schema.add(ValueType.STRING);
-			}
-			mb[0] = new FrameBlock(schema);
-			mb[0].setColumnNames(columns);
+		/**
+		 * 
+		 * @param ix
+		 * @param fb
+		 * @param ret
+		 * @throws DMLRuntimeException
+		 */
+		private static void flushBlocksToList( Long ix, FrameBlock fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
+			throws DMLRuntimeException
+		{			
+			if( fb != null && fb.getNumRows()>0 )
+				ret.add(new Tuple2<Long,FrameBlock>(ix, fb));
 		}
 	}
 
@@ -779,14 +814,21 @@ public class FrameRDDConverterUtils
 		public Iterable<Row> call(Tuple2<Long, FrameBlock> arg0)
 			throws Exception 
 		{
+			long rowIndex = arg0._1();
 			FrameBlock blk = arg0._2();
 			ArrayList<Row> ret = new ArrayList<Row>();
 
 			//handle Frame block data
-			Iterator<Object[]> iter = blk.getObjectRowIterator();
-			while( iter.hasNext() )
-				ret.add(RowFactory.create(iter.next().clone()));
-				
+			int rows = blk.getNumRows();
+			int cols = blk.getNumColumns();
+			for( int i=0; i<rows; i++ ) {
+				Object[] row = new Object[cols+1];
+				row[0] = rowIndex++;
+				for( int j=0; j<cols; j++ )
+					row[j+1] = blk.get(i, j);
+				ret.add(RowFactory.create(row));
+			}
+			
 			return ret;
 		}
 	}
@@ -1046,13 +1088,11 @@ public class FrameRDDConverterUtils
 	// Common functions
 	
 	// Flushes current state of filled column blocks to output list.
-	private static void flushBlocksToList( Long[] ix, FrameBlock[] mb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
+	private static void flushBlocksToList( Long[] ix, FrameBlock[] fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
 		throws DMLRuntimeException
-	{
-		int len = ix.length;			
-		for( int i=0; i<len; i++ )
-			if( mb[i] != null ) {
-				ret.add(new Tuple2<Long,FrameBlock>(ix[i],mb[i]));
-			}	
+	{			
+		for( int i=0; i<ix.length; i++ )
+			if( fb[i] != null && fb[0].getNumRows()>0 )
+				ret.add(new Tuple2<Long,FrameBlock>(ix[i], fb[i]));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index ba1934a..3ee1ef8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -322,7 +322,7 @@ public class RDDConverterUtils
 	 * @return
 	 * @throws DMLRuntimeException
 	 */
-	public static DataFrame binaryBlockToDataFrame(SQLContext sqlContext, 
+	public static DataFrame binaryBlockToDataFrame(SQLContext sqlctx, 
 			JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector)  
 	{
 		if( !mc.colsKnown() )
@@ -344,7 +344,7 @@ public class RDDConverterUtils
 		}
 		
 		//rdd to data frame conversion
-		return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
+		return sqlctx.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
 	}
 	
 	/**
@@ -1011,7 +1011,7 @@ public class RDDConverterUtils
 	/**
 	 * 
 	 */
-	private static class DataFrameExtractIDFunction implements PairFunction<Row, Row,Long> 
+	protected static class DataFrameExtractIDFunction implements PairFunction<Row, Row,Long> 
 	{
 		private static final long serialVersionUID = 7438855241666363433L;
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 1ac552f..81a1e8f 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -423,8 +423,11 @@ public class UtilFunctions
 	 * @return
 	 */
 	public static Object objectToObject(ValueType vt, Object in ) {
-		String str = objectToString(in);
-		return stringToObject(vt, str );
+		if( in instanceof Double && vt == ValueType.DOUBLE 
+			|| in instanceof Long && vt == ValueType.INT )
+			return in; //quick path to avoid double parsing
+		else
+			return stringToObject(vt, objectToString(in) );
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index 107dee3..f0c17eb 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -519,8 +519,8 @@ public class FrameConverterTest extends AutomatedTestBase
 
 				//Create DataFrame 
 				SQLContext sqlContext = new SQLContext(sc);
-				StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema);
-				JavaRDD<Row> rowRDD = FrameRDDConverterUtils.getRowRDD(sc, fnameIn, separator, schema);
+				StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema, false);
+				JavaRDD<Row> rowRDD = FrameRDDConverterUtils.csvToRowRDD(sc, fnameIn, separator, schema);
 				DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
 				
 				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
@@ -532,13 +532,14 @@ public class FrameConverterTest extends AutomatedTestBase
 			case BIN2DFRM: {
 				InputInfo iinfo = InputInfo.BinaryBlockInputInfo;
 				OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
-				JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class);
-				JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new LongWritableFrameToLongFrameFunction());
-				DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(rddIn2, mc, sc);
+				JavaPairRDD<Long, FrameBlock> rddIn = sc
+						.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class)
+				 		.mapToPair(new LongWritableFrameToLongFrameFunction());
+				DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(sc), rddIn, mc, schema);
 				
 				//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary 
 				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-						.dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/)
+						.dataFrameToBinaryBlock(sc, df, mc, true)
 						.mapToPair(new LongFrameToLongWritableFrameFunction());
 				rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
 			

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
deleted file mode 100644
index c19865c..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.mlcontext;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-import org.junit.Test;
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.util.DataConverter;
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.apache.sysml.test.utils.TestUtils;
-
-
-public class DataFrameConversionTest extends AutomatedTestBase 
-{
-	private final static String TEST_DIR = "functions/mlcontext/";
-	private final static String TEST_NAME = "DataFrameConversion";
-	private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameConversionTest.class.getSimpleName() + "/";
-
-	private final static int  rows1 = 2245;
-	private final static int  cols1 = 745;
-	private final static int  cols2 = 1264;
-	private final static double sparsity1 = 0.9;
-	private final static double sparsity2 = 0.1;
-	private final static double eps=0.0000000001;
-
-	 
-	@Override
-	public void setUp() {
-		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
-	}
-	
-	@Test
-	public void testVectorConversionSingleDense() {
-		testDataFrameConversion(true, true, true, false);
-	}
-	
-	@Test
-	public void testVectorConversionSingleDenseUnknown() {
-		testDataFrameConversion(true, true, true, true);
-	}
-	
-	@Test
-	public void testVectorConversionSingleSparse() {
-		testDataFrameConversion(true, true, false, false);
-	}
-	
-	@Test
-	public void testVectorConversionSingleSparseUnknown() {
-		testDataFrameConversion(true, true, false, true);
-	}
-	
-	@Test
-	public void testVectorConversionMultiDense() {
-		testDataFrameConversion(true, false, true, false);
-	}
-	
-	@Test
-	public void testVectorConversionMultiDenseUnknown() {
-		testDataFrameConversion(true, false, true, true);
-	}
-	
-	@Test
-	public void testVectorConversionMultiSparse() {
-		testDataFrameConversion(true, false, false, false);
-	}
-	
-	@Test
-	public void testVectorConversionMultiSparseUnknown() {
-		testDataFrameConversion(true, false, false, true);
-	}
-
-	@Test
-	public void testRowConversionSingleDense() {
-		testDataFrameConversion(false, true, true, false);
-	}
-	
-	@Test
-	public void testRowConversionSingleDenseUnknown() {
-		testDataFrameConversion(false, true, true, true);
-	}
-	
-	@Test
-	public void testRowConversionSingleSparse() {
-		testDataFrameConversion(false, true, false, false);
-	}
-	
-	@Test
-	public void testRowConversionSingleSparseUnknown() {
-		testDataFrameConversion(false, true, false, true);
-	}
-	
-	@Test
-	public void testRowConversionMultiDense() {
-		testDataFrameConversion(false, false, true, false);
-	}
-	
-	@Test
-	public void testRowConversionMultiDenseUnknown() {
-		testDataFrameConversion(false, false, true, true);
-	}
-	
-	@Test
-	public void testRowConversionMultiSparse() {
-		testDataFrameConversion(false, false, false, false);
-	}
-	
-	@Test
-	public void testRowConversionMultiSparseUnknown() {
-		testDataFrameConversion(false, false, false, true);
-	}
-	
-	/**
-	 * 
-	 * @param vector
-	 * @param singleColBlock
-	 * @param dense
-	 * @param unknownDims
-	 */
-	private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) {
-		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
-		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
-
-		SparkExecutionContext sec = null;
-		
-		try
-		{
-			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
-			
-			//generate input data and setup metadata
-			int cols = singleColBlock ? cols1 : cols2;
-			double sparsity = dense ? sparsity1 : sparsity2; 
-			double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); 
-			MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); 
-			int blksz = ConfigurationManager.getBlocksize();
-			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
-			MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
-			
-			//setup spark context
-			sec = (SparkExecutionContext) ExecutionContextFactory.createContext();		
-			JavaSparkContext sc = sec.getSparkContext();
-			SQLContext sqlctx = new SQLContext(sc);
-			
-			//get binary block input rdd
-			JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
-			
-			//matrix - dataframe - matrix conversion
-			DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
-			JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
-			
-			//get output matrix block
-			MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
-			
-			//compare matrix blocks
-			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
-			TestUtils.compareMatrices(A, B, rows1, cols, eps);
-		}
-		catch( Exception ex ) {
-			throw new RuntimeException(ex);
-		}
-		finally {
-			sec.close();
-			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
-			DMLScript.rtplatform = oldPlatform;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
new file mode 100644
index 0000000..a26cfe8
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameFrameConversionTest extends AutomatedTestBase 
+{
+	private final static String TEST_DIR = "functions/mlcontext/";
+	private final static String TEST_NAME = "DataFrameConversion";
+	private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameFrameConversionTest.class.getSimpleName() + "/";
+
+	private final static int  rows1 = 2245;
+	private final static int  cols1 = 745;
+	private final static int  cols2 = 1264;
+	private final static double sparsity1 = 0.9;
+	private final static double sparsity2 = 0.1;
+	private final static double eps=0.0000000001;
+
+	 
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+	}
+
+	
+
+	@Test
+	public void testRowDoubleConversionSingleDense() {
+		testDataFrameConversion(ValueType.DOUBLE, true, true, false);
+	}
+	
+	@Test
+	public void testRowDoubleConversionSingleDenseUnknown() {
+		testDataFrameConversion(ValueType.DOUBLE, true, true, true);
+	}
+	
+	@Test
+	public void testRowDoubleConversionSingleSparse() {
+		testDataFrameConversion(ValueType.DOUBLE, true, false, false);
+	}
+	
+	@Test
+	public void testRowDoubleConversionSingleSparseUnknown() {
+		testDataFrameConversion(ValueType.DOUBLE, true, false, true);
+	}
+	
+	@Test
+	public void testRowDoubleConversionMultiDense() {
+		testDataFrameConversion(ValueType.DOUBLE, false, true, false);
+	}
+	
+	@Test
+	public void testRowDoubleConversionMultiDenseUnknown() {
+		testDataFrameConversion(ValueType.DOUBLE, false, true, true);
+	}
+	
+	@Test
+	public void testRowDoubleConversionMultiSparse() {
+		testDataFrameConversion(ValueType.DOUBLE, false, false, false);
+	}
+	
+	@Test
+	public void testRowDoubleConversionMultiSparseUnknown() {
+		testDataFrameConversion(ValueType.DOUBLE, false, false, true);
+	}
+
+	@Test
+	public void testRowStringConversionSingleDense() {
+		testDataFrameConversion(ValueType.STRING, true, true, false);
+	}
+	
+	@Test
+	public void testRowStringConversionSingleDenseUnknown() {
+		testDataFrameConversion(ValueType.STRING, true, true, true);
+	}
+	
+	@Test
+	public void testRowStringConversionSingleSparse() {
+		testDataFrameConversion(ValueType.STRING, true, false, false);
+	}
+	
+	@Test
+	public void testRowStringConversionSingleSparseUnknown() {
+		testDataFrameConversion(ValueType.STRING, true, false, true);
+	}
+	
+	@Test
+	public void testRowStringConversionMultiDense() {
+		testDataFrameConversion(ValueType.STRING, false, true, false);
+	}
+	
+	@Test
+	public void testRowStringConversionMultiDenseUnknown() {
+		testDataFrameConversion(ValueType.STRING, false, true, true);
+	}
+	
+	@Test
+	public void testRowStringConversionMultiSparse() {
+		testDataFrameConversion(ValueType.STRING, false, false, false);
+	}
+	
+	@Test
+	public void testRowStringConversionMultiSparseUnknown() {
+		testDataFrameConversion(ValueType.STRING, false, false, true);
+	}
+
+	@Test
+	public void testRowLongConversionSingleDense() {
+		testDataFrameConversion(ValueType.INT, true, true, false);
+	}
+	
+	@Test
+	public void testRowLongConversionSingleDenseUnknown() {
+		testDataFrameConversion(ValueType.INT, true, true, true);
+	}
+	
+	@Test
+	public void testRowLongConversionSingleSparse() {
+		testDataFrameConversion(ValueType.INT, true, false, false);
+	}
+	
+	@Test
+	public void testRowLongConversionSingleSparseUnknown() {
+		testDataFrameConversion(ValueType.INT, true, false, true);
+	}
+	
+	@Test
+	public void testRowLongConversionMultiDense() {
+		testDataFrameConversion(ValueType.INT, false, true, false);
+	}
+	
+	@Test
+	public void testRowLongConversionMultiDenseUnknown() {
+		testDataFrameConversion(ValueType.INT, false, true, true);
+	}
+	
+	@Test
+	public void testRowLongConversionMultiSparse() {
+		testDataFrameConversion(ValueType.INT, false, false, false);
+	}
+	
+	@Test
+	public void testRowLongConversionMultiSparseUnknown() {
+		testDataFrameConversion(ValueType.INT, false, false, true);
+	}
+	
+	/**
+	 * 
+	 * @param vector
+	 * @param singleColBlock
+	 * @param dense
+	 * @param unknownDims
+	 */
+	private void testDataFrameConversion(ValueType vt, boolean singleColBlock, boolean dense, boolean unknownDims) {
+		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
+		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+		SparkExecutionContext sec = null;
+		
+		try
+		{
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+			
+			//generate input data and setup metadata
+			int cols = singleColBlock ? cols1 : cols2;
+			double sparsity = dense ? sparsity1 : sparsity2; 
+			double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); 
+			A = (vt == ValueType.INT) ? TestUtils.round(A) : A;
+			MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); 
+			FrameBlock fbA = DataConverter.convertToFrameBlock(mbA, vt);
+			int blksz = ConfigurationManager.getBlocksize();
+			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+			MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+			List<ValueType> schema = Collections.nCopies(cols, vt);
+			
+			//setup spark context
+			sec = (SparkExecutionContext) ExecutionContextFactory.createContext();		
+			JavaSparkContext sc = sec.getSparkContext();
+			SQLContext sqlctx = new SQLContext(sc);
+			
+			//get binary block input rdd
+			JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);
+			
+			//frame - dataframe - frame conversion
+			DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema);
+			JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true);
+			
+			//get output frame block
+			FrameBlock fbB = SparkExecutionContext.toFrameBlock(out, schema, rows1, cols);
+			
+			//compare frame blocks
+			MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB); 
+			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+			TestUtils.compareMatrices(A, B, rows1, cols, eps);
+		}
+		catch( Exception ex ) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			sec.close();
+			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+			DMLScript.rtplatform = oldPlatform;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
new file mode 100644
index 0000000..e88a867
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameMatrixConversionTest extends AutomatedTestBase 
+{
+	private final static String TEST_DIR = "functions/mlcontext/";
+	private final static String TEST_NAME = "DataFrameConversion";
+	private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameMatrixConversionTest.class.getSimpleName() + "/";
+
+	private final static int  rows1 = 2245;
+	private final static int  cols1 = 745;
+	private final static int  cols2 = 1264;
+	private final static double sparsity1 = 0.9;
+	private final static double sparsity2 = 0.1;
+	private final static double eps=0.0000000001;
+
+	 
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+	}
+	
+	@Test
+	public void testVectorConversionSingleDense() {
+		testDataFrameConversion(true, true, true, false);
+	}
+	
+	@Test
+	public void testVectorConversionSingleDenseUnknown() {
+		testDataFrameConversion(true, true, true, true);
+	}
+	
+	@Test
+	public void testVectorConversionSingleSparse() {
+		testDataFrameConversion(true, true, false, false);
+	}
+	
+	@Test
+	public void testVectorConversionSingleSparseUnknown() {
+		testDataFrameConversion(true, true, false, true);
+	}
+	
+	@Test
+	public void testVectorConversionMultiDense() {
+		testDataFrameConversion(true, false, true, false);
+	}
+	
+	@Test
+	public void testVectorConversionMultiDenseUnknown() {
+		testDataFrameConversion(true, false, true, true);
+	}
+	
+	@Test
+	public void testVectorConversionMultiSparse() {
+		testDataFrameConversion(true, false, false, false);
+	}
+	
+	@Test
+	public void testVectorConversionMultiSparseUnknown() {
+		testDataFrameConversion(true, false, false, true);
+	}
+
+	@Test
+	public void testRowConversionSingleDense() {
+		testDataFrameConversion(false, true, true, false);
+	}
+	
+	@Test
+	public void testRowConversionSingleDenseUnknown() {
+		testDataFrameConversion(false, true, true, true);
+	}
+	
+	@Test
+	public void testRowConversionSingleSparse() {
+		testDataFrameConversion(false, true, false, false);
+	}
+	
+	@Test
+	public void testRowConversionSingleSparseUnknown() {
+		testDataFrameConversion(false, true, false, true);
+	}
+	
+	@Test
+	public void testRowConversionMultiDense() {
+		testDataFrameConversion(false, false, true, false);
+	}
+	
+	@Test
+	public void testRowConversionMultiDenseUnknown() {
+		testDataFrameConversion(false, false, true, true);
+	}
+	
+	@Test
+	public void testRowConversionMultiSparse() {
+		testDataFrameConversion(false, false, false, false);
+	}
+	
+	@Test
+	public void testRowConversionMultiSparseUnknown() {
+		testDataFrameConversion(false, false, false, true);
+	}
+	
+	/**
+	 * 
+	 * @param vector
+	 * @param singleColBlock
+	 * @param dense
+	 * @param unknownDims
+	 */
+	private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) {
+		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
+		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+		SparkExecutionContext sec = null;
+		
+		try
+		{
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+			
+			//generate input data and setup metadata
+			int cols = singleColBlock ? cols1 : cols2;
+			double sparsity = dense ? sparsity1 : sparsity2; 
+			double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); 
+			MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); 
+			int blksz = ConfigurationManager.getBlocksize();
+			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+			MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+			
+			//setup spark context
+			sec = (SparkExecutionContext) ExecutionContextFactory.createContext();		
+			JavaSparkContext sc = sec.getSparkContext();
+			SQLContext sqlctx = new SQLContext(sc);
+			
+			//get binary block input rdd
+			JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
+			
+			//matrix - dataframe - matrix conversion
+			DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
+			JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
+			
+			//get output matrix block
+			MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
+			
+			//compare matrix blocks
+			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+			TestUtils.compareMatrices(A, B, rows1, cols, eps);
+		}
+		catch( Exception ex ) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			sec.close();
+			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+			DMLScript.rtplatform = oldPlatform;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
index d12f6f2..11f3f02 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
@@ -226,13 +226,13 @@ public class FrameTest extends AutomatedTestBase
 		{
 			//Create DataFrame for input A 
 			SQLContext sqlContext = new SQLContext(sc);
-			StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema);
-			JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.getRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema);
+			StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema, false);
+			JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema);
 			dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA);
 			
 			//Create DataFrame for input B 
-			StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB);
-			JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.getRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB);
+			StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB, false);
+			JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB);
 			dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB);
 		}
 
@@ -285,7 +285,7 @@ public class FrameTest extends AutomatedTestBase
 				//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary 
 				MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, -1, -1, -1);
 				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-						.dataFrameToBinaryBlock(jsc, df, mc, false)
+						.dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame)
 						.mapToPair(new LongFrameToLongWritableFrameFunction());
 				rddOut.saveAsHadoopFile(output("AB"), LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass);
 			}
@@ -306,7 +306,7 @@ public class FrameTest extends AutomatedTestBase
 				//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary 
 				MatrixCharacteristics mc = new MatrixCharacteristics(cRows, cCols, -1, -1, -1);
 				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-						.dataFrameToBinaryBlock(jsc, df, mc, false)
+						.dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame)
 						.mapToPair(new LongFrameToLongWritableFrameFunction());
 				rddOut.saveAsHadoopFile(fName, LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
index 98c8b10..deac382 100644
--- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
@@ -46,6 +46,7 @@ import org.apache.sysml.api.mlcontext.MatrixMetadata;
 import org.apache.sysml.api.mlcontext.Script;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.mlcontext.MLContextTest.CommaSeparatedValueStringToRow;
 import org.junit.After;
@@ -230,9 +231,9 @@ public class MLContextFrameTest extends AutomatedTestBase {
 
 				// Create DataFrame
 				SQLContext sqlContext = new SQLContext(sc);
-				StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaA);
+				StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaA, false);
 				DataFrame dataFrameA = sqlContext.createDataFrame(javaRddRowA, dfSchemaA);
-				StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB);
+				StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB, false);
 				DataFrame dataFrameB = sqlContext.createDataFrame(javaRddRowB, dfSchemaB);
 				if (script_type == SCRIPT_TYPE.DML)
 					script = dml("A[2:3,2:4]=B;C=A[2:3,2:3]").in("A", dataFrameA, fmA).in("B", dataFrameB, fmB).out("A")
@@ -368,31 +369,31 @@ public class MLContextFrameTest extends AutomatedTestBase {
 
 		} else if (outputType == IO_TYPE.DATAFRAME) {
 
-			DataFrame dataFrameA = mlResults.getDataFrame("A");
+			DataFrame dataFrameA = mlResults.getDataFrame("A").drop(RDDConverterUtils.DF_ID_COLUMN);
 			List<Row> listAOut = dataFrameA.collectAsList();
 
 			Row row1 = listAOut.get(0);
-			Assert.assertEquals("Mistmatch with expected value", "1", row1.getString(0));
-			Assert.assertEquals("Mistmatch with expected value", "Str2", row1.getString(1));
-			Assert.assertEquals("Mistmatch with expected value", "3.0", row1.getString(2));
-			Assert.assertEquals("Mistmatch with expected value", "true", row1.getString(3));
-
+			Assert.assertEquals("Mistmatch with expected value", "1", row1.get(0).toString());
+			Assert.assertEquals("Mistmatch with expected value", "Str2", row1.get(1).toString());
+			Assert.assertEquals("Mistmatch with expected value", "3.0", row1.get(2).toString());
+			Assert.assertEquals("Mistmatch with expected value", "true", row1.get(3).toString());
+			
 			Row row2 = listAOut.get(1);
-			Assert.assertEquals("Mistmatch with expected value", "4", row2.getString(0));
-			Assert.assertEquals("Mistmatch with expected value", "Str12", row2.getString(1));
-			Assert.assertEquals("Mistmatch with expected value", "13.0", row2.getString(2));
-			Assert.assertEquals("Mistmatch with expected value", "true", row2.getString(3));
+			Assert.assertEquals("Mistmatch with expected value", "4", row2.get(0).toString());
+			Assert.assertEquals("Mistmatch with expected value", "Str12", row2.get(1).toString());
+			Assert.assertEquals("Mistmatch with expected value", "13.0", row2.get(2).toString());
+			Assert.assertEquals("Mistmatch with expected value", "true", row2.get(3).toString());
 
-			DataFrame dataFrameC = mlResults.getDataFrame("C");
+			DataFrame dataFrameC = mlResults.getDataFrame("C").drop(RDDConverterUtils.DF_ID_COLUMN);
 			List<Row> listCOut = dataFrameC.collectAsList();
 
 			Row row3 = listCOut.get(0);
-			Assert.assertEquals("Mistmatch with expected value", "Str12", row3.getString(0));
-			Assert.assertEquals("Mistmatch with expected value", "13.0", row3.getString(1));
+			Assert.assertEquals("Mistmatch with expected value", "Str12", row3.get(0).toString());
+			Assert.assertEquals("Mistmatch with expected value", "13.0", row3.get(1).toString());
 
 			Row row4 = listCOut.get(1);
-			Assert.assertEquals("Mistmatch with expected value", "Str25", row4.getString(0));
-			Assert.assertEquals("Mistmatch with expected value", "26.0", row4.getString(1));
+			Assert.assertEquals("Mistmatch with expected value", "Str25", row4.get(0));
+			Assert.assertEquals("Mistmatch with expected value", "26.0", row4.get(1));
 		} else {
 			String[][] frameA = mlResults.getFrameAs2DStringArray("A");
 			Assert.assertEquals("Str2", frameA[0][1]);