Posted to commits@systemml.apache.org by ac...@apache.org on 2016/07/23 05:35:05 UTC

incubator-systemml git commit: [SYSTEMML-560] Frame DataFrame converters

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 821c5f50d -> 14e9f6443


[SYSTEMML-560] Frame DataFrame converters


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/14e9f644
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/14e9f644
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/14e9f644

Branch: refs/heads/master
Commit: 14e9f64439ce58acd3b3cdaf53e75a768a4757f0
Parents: 821c5f5
Author: Arvind Surve <ac...@yahoo.com>
Authored: Fri Jul 22 22:34:27 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Fri Jul 22 22:34:27 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/FrameRDDConverterUtils.java     | 214 +++++++++++++++++--
 .../apache/sysml/runtime/io/FrameReader.java    |   2 +-
 .../runtime/matrix/data/LibMatrixBincell.java   |   2 +-
 .../runtime/matrix/data/LibMatrixOuterAgg.java  |  26 +--
 .../sysml/runtime/matrix/data/MatrixBlock.java  |   4 +-
 .../matrix/sort/SamplingSortMRInputFormat.java  |   4 +-
 .../sysml/runtime/util/DataConverter.java       |   4 +-
 .../sysml/runtime/util/UtilFunctions.java       |  44 ++++
 .../functions/frame/FrameAppendDistTest.java    |   2 +-
 .../functions/frame/FrameConverterTest.java     |  89 +++++++-
 .../functions/frame/FrameCopyTest.java          |   2 +-
 .../functions/frame/FrameIndexingDistTest.java  |   2 +-
 .../functions/frame/FrameReadWriteTest.java     |   2 +-
 13 files changed, 349 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
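The new DataFrame converters are exposed as static helpers on FrameRDDConverterUtils
(dataFrameToBinaryBlock and binaryBlockToDataFrame). Below is a minimal round-trip
sketch based only on the signatures introduced in this commit; the class name, the
block sizes, and the input DataFrame are illustrative assumptions, not part of the commit.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.sysml.runtime.DMLRuntimeException;
    import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
    import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
    import org.apache.sysml.runtime.matrix.data.FrameBlock;

    public class FrameDataFrameRoundTrip {
        public static DataFrame roundTrip(JavaSparkContext sc, DataFrame df)
            throws DMLRuntimeException
        {
            // Dimensions may be left unknown (-1); dataFrameToBinaryBlock recomputes them.
            // The 1000x1000 block size is an illustrative assumption.
            MatrixCharacteristics mc = new MatrixCharacteristics(-1, -1, 1000, 1000);

            // DataFrame -> binary-block frame RDD (false: no generated "ID" column to drop)
            JavaPairRDD<Long, FrameBlock> blocks =
                FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, false);

            // binary-block frame RDD -> DataFrame (schema derived from the frame schema)
            return FrameRDDConverterUtils.binaryBlockToDataFrame(blocks, mc, sc);
        }
    }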


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 052d3f3..c243eeb 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -36,6 +36,13 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 
 import scala.Tuple2;
 
@@ -101,8 +108,7 @@ public class FrameRDDConverterUtils
 		
 		//convert csv rdd to binary block rdd (w/ partial blocks)
 		JavaPairRDD<Long, FrameBlock> out = prepinput
-				.mapPartitionsToPair(new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill))
-				.mapToPair(new LongWritableFrameToLongFrameFunction());
+				.mapPartitionsToPair(new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill));
 		
 		return out;
 	}
@@ -300,8 +306,70 @@ public class FrameRDDConverterUtils
 		return out;
 	}
 	
+	//=====================================
+	// DataFrame <--> Binary block
 
+	/**
+	 * Converts a Spark SQL DataFrame into a binary-block frame RDD
+	 * of row index and frame block.
+	 * 
+	 * @param sc java spark context
+	 * @param df input data frame
+	 * @param mcOut matrix characteristics of the output frame
+	 * @param containsID true if the data frame contains a generated
+	 *        "ID" column that should be dropped
+	 * @return pair rdd of row index and frame block
+	 * @throws DMLRuntimeException
+	 */
+	public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
+			DataFrame df, MatrixCharacteristics mcOut, boolean containsID) 
+		throws DMLRuntimeException 
+	{
+		
+		if(containsID)
+			df = df.drop("ID");
+		
+		//determine unknown dimensions if required
+		if( !mcOut.dimsKnown(true) ) {
+			JavaRDD<Row> tmp = df.javaRDD();
+			long rlen = tmp.count();
+			long clen = containsID ? (df.columns().length - 1) : df.columns().length;
+			mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+		}
+		
+		JavaPairRDD<Row, Long> prepinput = df.javaRDD()
+				.zipWithIndex(); //zip row index
+		
+		//convert rdd to binary block rdd
+		JavaPairRDD<Long, FrameBlock> out = prepinput
+				.mapPartitionsToPair(new DataFrameToBinaryBlockFunction(mcOut));
+		
+		return out;
+	}
+
+	/**
+	 * Converts a binary-block frame RDD back into a Spark SQL DataFrame.
+	 * 
+	 * @param in input rdd of row index and frame block
+	 * @param mcIn matrix characteristics of the input
+	 * @param sc java spark context
+	 * @return data frame
+	 */
+	public static DataFrame binaryBlockToDataFrame(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, JavaSparkContext sc)
+	{
+		List<ValueType> schema = in.first()._2().getSchema();
+		
+		//convert binary block to rows rdd (from blocks/rows)
+		JavaRDD<Row> rowRDD = in.flatMap(new BinaryBlockToDataFrameFunction());
+				
+		SQLContext sqlContext = new SQLContext(sc);
+		StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema);
+		DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
+	
+		return df;
+	}
 	
+
 	/////////////////////////////////
 	// CSV-SPECIFIC FUNCTIONS
 	
@@ -391,7 +459,7 @@ public class FrameRDDConverterUtils
 	 * In terms of memory consumption this is better than creating partial blocks of row segments.
 	 * 
 	 */
-	private static class CSVToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Text,Long>>,LongWritable,FrameBlock> 
+	private static class CSVToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Text,Long>>,Long,FrameBlock> 
 	{
 		private static final long serialVersionUID = -1976803898174960086L;
 
@@ -413,12 +481,12 @@ public class FrameRDDConverterUtils
 		}
 
 		@Override
-		public Iterable<Tuple2<LongWritable, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0) 
+		public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0) 
 			throws Exception 
 		{
-			ArrayList<Tuple2<LongWritable,FrameBlock>> ret = new ArrayList<Tuple2<LongWritable,FrameBlock>>();
+			ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
 
-			LongWritable[] ix = new LongWritable[1];
+			Long[] ix = new Long[1];
 			FrameBlock[] mb = new FrameBlock[1];
 			int iRowsInBlock = 0;
 			
@@ -467,10 +535,10 @@ public class FrameRDDConverterUtils
 		}
 		
 		// Creates new state of empty column blocks for current global row index.
-		private void createBlocks(long rowix, LongWritable[] ix, FrameBlock[] mb)
+		private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb)
 		{
 			//compute row block index and number of column blocks
-			ix[0] = new LongWritable(rowix);
+			ix[0] = rowix;
 			mb[0] = new FrameBlock((int)_clen, ValueType.STRING);
 			if( _colnames != null )
 				mb[0].setColumnNames(_colnames);
@@ -481,17 +549,6 @@ public class FrameRDDConverterUtils
 				for( int j=0; j<_clen; j++ )
 					mb[0].getColumnMetadata(j).setNumDistinct(Long.parseLong(_ndMeta.get(j)));
 		}
-		
-		// Flushes current state of filled column blocks to output list.
-		private void flushBlocksToList( LongWritable[] ix, FrameBlock[] mb, ArrayList<Tuple2<LongWritable,FrameBlock>> ret ) 
-			throws DMLRuntimeException
-		{
-			int len = ix.length;			
-			for( int i=0; i<len; i++ )
-				if( mb[i] != null ) {
-					ret.add(new Tuple2<LongWritable,FrameBlock>(ix[i],mb[i]));
-				}	
-		}
 	}
 	
 	/**
@@ -558,6 +615,111 @@ public class FrameRDDConverterUtils
 			return ret;
 		}
 	}
+	
+	/////////////////////////////////
+	// DataFrame-SPECIFIC FUNCTIONS
+	
+	private static class DataFrameToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Row,Long>>,Long,FrameBlock> 
+	{
+		private static final long serialVersionUID = 2269315691094111843L;
+
+		private long _clen = -1;
+		private int _maxRowsPerBlock = -1;
+		
+		public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc) {
+			_clen = mc.getCols();
+			_maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
+		}
+		
+		@Override
+		public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) throws Exception {
+			ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
+
+			Long[] ix = new Long[1];
+			FrameBlock[] mb = new FrameBlock[1];
+			int iRowsInBlock = 0;
+			
+			while( arg0.hasNext() )
+			{
+				Tuple2<Row,Long> tmp = arg0.next();
+				Row row = tmp._1();
+				long rowix = tmp._2()+1;
+				
+				if( iRowsInBlock == 0 || iRowsInBlock == _maxRowsPerBlock) {
+					if( iRowsInBlock == _maxRowsPerBlock )
+						flushBlocksToList(ix, mb, ret);
+					createBlocks(rowix, ix, mb, row);
+					iRowsInBlock = 0;
+				}
+				
+				//process row data
+				Object[] parts = rowToObjectArray(row, (int)_clen, mb[0].getSchema());
+				mb[0].appendRow(parts);
+				iRowsInBlock++;
+			}
+		
+			//flush last blocks
+			flushBlocksToList(ix, mb, ret);
+		
+			return ret;
+		}
+		
+		public Object[] rowToObjectArray(Row row, int _clen, List<ValueType> schema) throws Exception {
+			Object[] ret = new Object[_clen];
+			for(int i = 0; i < row.length(); i++)
+				ret[i] = UtilFunctions.objectToObject(schema.get(i), row.get(i));
+			for(int i=row.length(); i<_clen; i++)
+				ret[i] = "";
+			return ret;
+		}
+
+		// Creates new state of empty column blocks for current global row index.
+		private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb, Row row)
+		{
+			//compute row block index and number of column blocks
+			ix[0] = new Long(rowix);
+			
+			List<String> columns = new ArrayList<String>();
+			List<ValueType> schema = new ArrayList<ValueType>();
+			for (StructField structType: row.schema().fields()) {
+				columns.add(structType.name());
+				if(structType.dataType() == DataTypes.DoubleType || structType.dataType() == DataTypes.FloatType)
+					schema.add(ValueType.DOUBLE);
+				else if(structType.dataType() == DataTypes.LongType || structType.dataType() == DataTypes.IntegerType)
+					schema.add(ValueType.INT);
+				else if(structType.dataType() == DataTypes.BooleanType)
+					schema.add(ValueType.BOOLEAN);
+				else
+					schema.add(ValueType.STRING);
+			}
+			mb[0] = new FrameBlock(schema);
+			mb[0].setColumnNames(columns);
+		}
+	}
+
+	/**
+	 * Maps each (row index, frame block) pair to its corresponding DataFrame rows.
+	 */
+	private static class BinaryBlockToDataFrameFunction implements FlatMapFunction<Tuple2<Long,FrameBlock>,Row> 
+	{
+		private static final long serialVersionUID = 8093340778966667460L;
+		
+		@Override
+		public Iterable<Row> call(Tuple2<Long, FrameBlock> arg0)
+			throws Exception 
+		{
+			FrameBlock blk = arg0._2();
+			ArrayList<Row> ret = new ArrayList<Row>();
+
+			//handle Frame block data
+			Iterator<Object[]> iter = blk.getObjectRowIterator();
+			while( iter.hasNext() )
+				ret.add(RowFactory.create(iter.next().clone()));
+				
+			return ret;
+		}
+	}
+	
 	/////////////////////////////////
 	// TEXTCELL-SPECIFIC FUNCTIONS
 	
@@ -808,4 +970,18 @@ public class FrameRDDConverterUtils
 			return ret;
 		}
 	}
+	
+	//////////////////////////////////////
+	// Common functions
+	
+	// Flushes current state of filled column blocks to output list.
+	private static void flushBlocksToList( Long[] ix, FrameBlock[] mb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
+		throws DMLRuntimeException
+	{
+		int len = ix.length;			
+		for( int i=0; i<len; i++ )
+			if( mb[i] != null ) {
+				ret.add(new Tuple2<Long,FrameBlock>(ix[i],mb[i]));
+			}	
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index d37bbde..e318fff 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -104,7 +104,7 @@ public abstract class FrameReader
 		throws IOException, DMLRuntimeException
 	{
 		List<String> colNames = new ArrayList<String>();
-		for (int i=0; i < clen; ++i)
+		for (int i=0; i < clen; i++)
 			colNames.add("C"+i);
 		return colNames;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
index dd0b9e0..6ad08d8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
@@ -781,7 +781,7 @@ public class LibMatrixBincell
 			if( ixPos1 >= 0 ){ //match, scan to next val
 				if(bOp.fn instanceof LessThan || bOp.fn instanceof GreaterThanEquals 
 						|| bOp.fn instanceof Equals || bOp.fn instanceof NotEquals)
-					while( ixPos1<bv.length && value==bv[ixPos1]  ) ++ixPos1;
+					while( ixPos1<bv.length && value==bv[ixPos1]  ) ixPos1++;
 				if(bOp.fn instanceof GreaterThan || bOp.fn instanceof LessThanEquals 
 						|| bOp.fn instanceof Equals || bOp.fn instanceof NotEquals)
 					while(  ixPos2 > 0 && value==bv[ixPos2-1]) --ixPos2;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
index dab24a0..25d3067 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
@@ -184,10 +184,10 @@ public class LibMatrixOuterAgg
 			
 			double dvix[] = new double[vix.length];
 			if (bPrimeCumSum)
-				for (int i = 0; i< vix.length; ++i)
+				for (int i = 0; i< vix.length; i++)
 					dvix[vix.length-1-i] = vix[i];
 			else
-				for (int i = 0; i< vix.length; ++i)
+				for (int i = 0; i< vix.length; i++)
 					dvix[i] = vix[i];
 			
 			MatrixBlock mbix = DataConverter.convertToMatrixBlock(dvix, true);
@@ -197,7 +197,7 @@ public class LibMatrixOuterAgg
 			
 			vixCumSum = DataConverter.convertToIntVector(mbResult);  
 			if (bPrimeCumSum)
-				for (int i = 0; i< (vixCumSum.length+1)/2; ++i) {
+				for (int i = 0; i< (vixCumSum.length+1)/2; i++) {
 					int iTemp = vixCumSum[vixCumSum.length-1-i];
 					vixCumSum[vixCumSum.length-1-i] = vixCumSum[i];
 					vixCumSum[i] = iTemp;
@@ -264,10 +264,10 @@ public class LibMatrixOuterAgg
 			
 			double dvix[] = new double[vix.length];
 			if (bPrimeCumSum)
-				for (int i = 0; i< vix.length; ++i)
+				for (int i = 0; i< vix.length; i++)
 					dvix[vix.length-1-i] = vix[i];
 			else
-				for (int i = 0; i< vix.length; ++i)
+				for (int i = 0; i< vix.length; i++)
 					dvix[i] = vix[i];
 			
 			MatrixBlock mbix = DataConverter.convertToMatrixBlock(dvix, true);
@@ -277,7 +277,7 @@ public class LibMatrixOuterAgg
 			
 			vixCumSum = DataConverter.convertToIntVector(mbResult);  
 			if (bPrimeCumSum)
-				for (int i = 0; i< (vixCumSum.length+1)/2; ++i) {
+				for (int i = 0; i< (vixCumSum.length+1)/2; i++) {
 					int iTemp = vixCumSum[vixCumSum.length-1-i];
 					vixCumSum[vixCumSum.length-1-i] = vixCumSum[i];
 					vixCumSum[i] = iTemp;
@@ -1030,7 +1030,7 @@ public class LibMatrixOuterAgg
 			int[] aix = sblock.indexes(j);
 			double [] avals = sblock.values(j);
 			
-			for (int i=apos; i < apos+alen; ++i) {
+			for (int i=apos; i < apos+alen; i++) {
 				int cnt = sumEqNe(avals[i], bv, bOp);
 				out.quickSetValue(0, aix[i], cnt);
 			}
@@ -1447,7 +1447,7 @@ public class LibMatrixOuterAgg
     	} else if(bOp.fn instanceof Equals) {
     		double dFirstValue = vmb[0];
     		int i=0;
-    		while(i<vmb.length-1 && dFirstValue == vmb[i+1]) ++i;
+    		while(i<vmb.length-1 && dFirstValue == vmb[i+1]) i++;
     		if (i < vmb.length-1) 
     			vix[0] = i+1;
     		else	
@@ -1455,7 +1455,7 @@ public class LibMatrixOuterAgg
     	} else if(bOp.fn instanceof NotEquals) {
     		double dFirstValue = vmb[0];
     		int i=0;
-    		while(i<vmb.length-1 && dFirstValue == vmb[i+1]) ++i;
+    		while(i<vmb.length-1 && dFirstValue == vmb[i+1]) i++;
     		if (i < vmb.length-1) 
     			vix[0] = i-1;
     		else	
@@ -1520,10 +1520,10 @@ public class LibMatrixOuterAgg
 	{
     	int iCurInd = 0;
 		
-    	for (int i = 0; i < vix.length;++i)
+    	for (int i = 0; i < vix.length;i++)
     	{
     		double dPrevVal = vmb[iCurInd];
-			while(i<vix.length && dPrevVal == vmb[i]) ++i;
+			while(i<vix.length && dPrevVal == vmb[i]) i++;
 			
 			if(i < vix.length) {
 				for(int j=iCurInd; j<i; ++j) vix[j] = vix[i];
@@ -1555,9 +1555,9 @@ public class LibMatrixOuterAgg
 		int iLastIndex = 0;
 		double dLastVal = vix[iLastIndex];
 
-    	for (int i = 0; i < vix.length-1; ++i)
+    	for (int i = 0; i < vix.length-1; i++)
     	{
-    		while(i<vmb.length-1 && dLastVal == vmb[i+1]) ++i;
+    		while(i<vmb.length-1 && dLastVal == vmb[i+1]) i++;
     		for (int j=iLastIndex+1; j<=i; ++j) 
     			vix[j] = vix[iLastIndex];
     		if (i < vix.length-1) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 48b0b2d..842982d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -5069,7 +5069,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		// keep scanning the weights, until we hit the required position <code>fromPos</code>
 		while ( count < fromPos ) {
 			count += quickGetValue(index,1);
-			++index;
+			index++;
 		}
 		
 		double runningSum; 
@@ -5086,7 +5086,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 			runningSum += (val * Math.min(wt, selectRange-selectedCount));
 			selectedCount += Math.min(wt, selectRange-selectedCount);
 			count += wt;
-			++index;
+			index++;
 		}
 		
 		//System.out.println(fromPos + ", " + toPos + ": " + count + ", "+ runningSum + ", " + selectedCount);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
index 9c66518..c29ef5a 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
@@ -108,7 +108,7 @@ extends SequenceFileInputFormat<K,V>
 	    // take N samples from different parts of the input
 	    
 	    int totalcount = 0;
-	    for(int i=0; i < samples; ++i) {
+	    for(int i=0; i < samples; i++) {
 	    	SequenceFileRecordReader reader = 
 	    		(SequenceFileRecordReader) inFormat.getRecordReader(splits[sampleStep * i], conf, null);
 	    	int count=0;
@@ -227,7 +227,7 @@ extends SequenceFileInputFormat<K,V>
 			float stepSize = numRecords / (float) numPartitions;
 			//System.out.println("Step size is " + stepSize);
 			ArrayList<WritableComparable> result = new ArrayList<WritableComparable>(numPartitions-1);
-			for(int i=1; i < numPartitions; ++i) {
+			for(int i=1; i < numPartitions; i++) {
 				result.add(records.get(Math.round(stepSize * i)));
 			}
 			

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index 08a6e8d..c790ae9 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -956,8 +956,8 @@ public class DataConverter
 					}
 				}
 			} else {	// Block is in dense format
-				for (int i=0; i<rowLength; ++i){
-					for (int j=0; j<colLength; ++j){
+				for (int i=0; i<rowLength; i++){
+					for (int j=0; j<colLength; j++){
 						double value = mb.getValue(i, j);
 						if (value != 0.0){
 							sb.append(i+1).append(separator).append(j+1).append(separator);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index fa17fcd..88221f2 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -23,6 +23,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -615,4 +618,45 @@ public class UtilFunctions
 	
 		return in1.getDataType();
 	}
+	
+	/**
+	 * Converts a Frame schema into a DataFrame schema.
+	 * 
+	 *  @param	lschema
+	 *  		Frame schema in the form of List<ValueType>
+	 *  @return
+	 *  		the DataFrame schema (StructType)
+	 */
+	public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema)
+	{
+		// Generate the schema based on the string of schema
+		List<StructField> fields = new ArrayList<StructField>();
+		
+		int i = 1;
+		for (ValueType schema : lschema)
+		{
+			org.apache.spark.sql.types.DataType dataType = DataTypes.StringType;
+			switch(schema)
+			{
+				case STRING:
+					dataType = DataTypes.StringType;
+					break;
+				case DOUBLE:
+					dataType = DataTypes.DoubleType;
+					break;
+				case INT:
+					dataType = DataTypes.LongType;
+					break;
+				case BOOLEAN:
+					dataType = DataTypes.BooleanType;
+					break;
+				default:
+					System.out.println("Default schema type is String.");
+			}
+			fields.add(DataTypes.createStructField("C"+i++, dataType, true));
+		}
+		
+		return DataTypes.createStructType(fields);
+	}
+	
 }
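
For reference, a small usage sketch of the new UtilFunctions.convertFrameSchemaToDFSchema
helper; the sample schema list is an illustrative assumption, while the type mapping and
the generated C1..Cn field names follow the code above.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.sql.types.StructType;
    import org.apache.sysml.parser.Expression.ValueType;
    import org.apache.sysml.runtime.util.UtilFunctions;

    public class FrameSchemaToDFSchemaExample {
        public static void main(String[] args) {
            // STRING -> StringType, DOUBLE -> DoubleType, INT -> LongType, BOOLEAN -> BooleanType;
            // fields are nullable and named C1, C2, ... in column order
            List<ValueType> schema = Arrays.asList(
                ValueType.STRING, ValueType.DOUBLE, ValueType.INT, ValueType.BOOLEAN);
            StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema);
            System.out.println(dfSchema);
        }
    }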

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
index 20c4a27..0d3b932 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
@@ -214,7 +214,7 @@ public class FrameAppendDistTest extends AutomatedTestBase
 	}
 	
 	private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, ValueType[] schema) {
-		for ( int i=0; i<frame1.getNumRows(); ++i )
+		for ( int i=0; i<frame1.getNumRows(); i++ )
 			for( int j=0; j<frame1.getNumColumns(); j++ )	{
 				Object val1 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame1.get(i, j)));
 				Object val2 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame2.get(i, j)));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index 7a8392b..441a63b 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -27,9 +27,15 @@ import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -65,6 +71,7 @@ import org.junit.Test;
 
 
 
+
 public class FrameConverterTest extends AutomatedTestBase
 {
 	private final static String TEST_DIR = "functions/frame/";
@@ -100,8 +107,12 @@ public class FrameConverterTest extends AutomatedTestBase
 		BIN2TXTCELL,
 		MAT2BIN,
 		BIN2MAT,
+		DFRM2BIN,
+		BIN2DFRM,
 	}
 	
+	private static String separator = ",";
+	
 	@Override
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
@@ -178,7 +189,16 @@ public class FrameConverterTest extends AutomatedTestBase
 		runFrameConverterTest(schemaMixedLarge, ConvType.BIN2MAT);
 	}
 	
-	
+	@Test
+	public void testFrameMixedDFrameBinSpark()  {
+		runFrameConverterTest(schemaMixedLarge, ConvType.DFRM2BIN);
+	}
+		
+	@Test
+	public void testFrameMixedBinDFrameSpark()  {
+		runFrameConverterTest(schemaMixedLarge, ConvType.BIN2DFRM);
+	}
+		
 	/**
 	 * 
 	 * @param schema
@@ -204,7 +224,8 @@ public class FrameConverterTest extends AutomatedTestBase
 			OutputInfo oinfo = null;
 			InputInfo iinfo = null;
 			switch( type ) {
-				case CSV2BIN: 
+				case CSV2BIN:
+				case DFRM2BIN:
 					oinfo = OutputInfo.CSVOutputInfo;
 					iinfo = InputInfo.BinaryBlockInputInfo;
 					break;
@@ -221,6 +242,7 @@ public class FrameConverterTest extends AutomatedTestBase
 					iinfo = InputInfo.TextCellInputInfo;
 					break;
 				case MAT2BIN: 
+				case BIN2DFRM:
 					oinfo = OutputInfo.BinaryBlockOutputInfo;
 					iinfo = InputInfo.BinaryBlockInputInfo;
 					break;
@@ -389,7 +411,7 @@ public class FrameConverterTest extends AutomatedTestBase
 	 * @param frame2
 	 */
 	private void verifyFrameData(FrameBlock frame1, FrameBlock frame2) {
-		for ( int i=0; i<frame1.getNumRows(); ++i )
+		for ( int i=0; i<frame1.getNumRows(); i++ )
 			for( int j=0; j<frame1.getNumColumns(); j++ )	{
 				String val1 = UtilFunctions.objectToString(frame1.get(i, j));
 				String val2 = UtilFunctions.objectToString(frame2.get(i, j));				
@@ -444,7 +466,7 @@ public class FrameConverterTest extends AutomatedTestBase
 				OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
 				JavaPairRDD<LongWritable,Text> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass);
 				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-						.csvToBinaryBlock(sc, rddIn, mc, false, ",", false, 0)
+						.csvToBinaryBlock(sc, rddIn, mc, false, separator, false, 0)
 						.mapToPair(new LongFrameToLongWritableFrameFunction());
 				rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
 				break;
@@ -494,10 +516,69 @@ public class FrameConverterTest extends AutomatedTestBase
 				rddOut.saveAsHadoopFile(fnameOut, MatrixIndexes.class, MatrixBlock.class, oinfo.outputFormatClass);
 				break;
 			}
+			case DFRM2BIN: {
+				OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
+
+				//Create DataFrame 
+				SQLContext sqlContext = new SQLContext(sc);
+				StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema);
+				JavaRDD<Row> rowRDD = getRowRDD(sc, fnameIn, separator);
+				DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
+				
+				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
+						.dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/)
+						.mapToPair(new LongFrameToLongWritableFrameFunction());
+				rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
+				break;
+			}
+			case BIN2DFRM: {
+				InputInfo iinfo = InputInfo.BinaryBlockInputInfo;
+				OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
+				JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class);
+				JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new LongWritableFrameToLongFrameFunction());
+				DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(rddIn2, mc, sc);
+				
+				//Convert the DataFrame back to binary blocks so that the original binary data can be compared with the round-tripped (binary -> DataFrame -> binary) result
+				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
+						.dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/)
+						.mapToPair(new LongFrameToLongWritableFrameFunction());
+				rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
+			
+				break;
+			}
 			default: 
 				throw new RuntimeException("Unsuported converter type: "+type.toString());
 		}
 		
 		sec.close();
 	}
+	
+	/* 
+	 * Returns a JavaRDD<Row> created from the given csv input file.
+	 */
+	JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String separator)
+	{
+		// Load a text file and convert each line to a java rdd.
+		JavaRDD<String> dataRdd = sc.textFile(fnameIn);
+		return dataRdd.map(new RowGenerator());
+	}
+	
+	/* 
+	 * Row generator that creates one Row per line of the CSV input file.
+	 */
+	private static class RowGenerator implements Function<String,Row> 
+	{
+		private static final long serialVersionUID = -6736256507697511070L;
+
+		@Override
+		public Row call(String record) throws Exception {
+		      String[] fields = record.split(",");
+		      Object[] objects = new Object[fields.length]; 
+		      for (int i=0; i<fields.length; i++) {
+			      objects[i] = fields[i];
+		      }
+		      return RowFactory.create(objects);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
index 9bc8d14..e713a86 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
@@ -172,7 +172,7 @@ public class FrameCopyTest extends AutomatedTestBase
 	void verifyFrameData(FrameBlock frame1, FrameBlock frame2)
 	{
 		List<ValueType> lschema = frame1.getSchema();
-		for ( int i=0; i<frame1.getNumRows(); ++i )
+		for ( int i=0; i<frame1.getNumRows(); i++ )
 			for( int j=0; j<lschema.size(); j++ )	{
 				if( UtilFunctions.compareTo(lschema.get(j), frame1.get(i, j), frame2.get(i, j)) != 0)
 					Assert.fail("Target value for cell ("+ i + "," + j + ") is " + frame1.get(i,  j) + 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
index b43dc6b..86dee49 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
@@ -268,7 +268,7 @@ public class FrameIndexingDistTest extends AutomatedTestBase
 	}
 	
 	private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, ValueType[] schema) {
-		for ( int i=0; i<frame1.getNumRows(); ++i )
+		for ( int i=0; i<frame1.getNumRows(); i++ )
 			for( int j=0; j<frame1.getNumColumns(); j++ )	{
 				Object val1 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame1.get(i, j)));
 				Object val2 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame2.get(i, j)));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
index 7d45ebc..d46c11f 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
@@ -215,7 +215,7 @@ public class FrameReadWriteTest extends AutomatedTestBase
 	void verifyFrameData(FrameBlock frame1, FrameBlock frame2)
 	{
 		List<ValueType> lschema = frame1.getSchema();
-		for ( int i=0; i<frame1.getNumRows(); ++i )
+		for ( int i=0; i<frame1.getNumRows(); i++ )
 			for( int j=0; j<lschema.size(); j++ )	{
 				if( UtilFunctions.compareTo(lschema.get(j), frame1.get(i, j), frame2.get(i, j)) != 0)
 					Assert.fail("Target value for cell ("+ i + "," + j + ") is " + frame1.get(i,  j) +