Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/30 04:00:04 UTC

incubator-systemml git commit: [SYSTEMML-969] Extended dataframe-frame converter (mixed vector schema)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2e5600f38 -> 0cc4d23e5


[SYSTEMML-969] Extended dataframe-frame converter (mixed vector schema)

This patch extends the existing dataframe-frame converter to support
mixed schemas that can also include vectors. We make the simplifying
restriction of a single vector column, but it can occur at an arbitrary
position, mixed with arbitrary scalar fields. The new tests cover a
variety of scenarios with dense/sparse data, w/ and w/o index column,
known/unknown dimensions, and various schemas.
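
As an illustration, the following minimal sketch (not part of this
patch) converts a dataframe with such a mixed schema (a string column,
a single vector column, and a long column) into a binary-block frame.
It assumes a local Spark 1.x context; the class name, column names,
and data are hypothetical, while dataFrameToBinaryBlock and
MatrixCharacteristics are the APIs touched by this patch.

  import java.util.Arrays;
  import java.util.List;

  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.mllib.linalg.VectorUDT;
  import org.apache.spark.mllib.linalg.Vectors;
  import org.apache.spark.sql.DataFrame;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.RowFactory;
  import org.apache.spark.sql.SQLContext;
  import org.apache.spark.sql.types.DataTypes;
  import org.apache.spark.sql.types.StructField;
  import org.apache.spark.sql.types.StructType;
  import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
  import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
  import org.apache.sysml.runtime.matrix.data.FrameBlock;

  public class MixedVectorSchemaExample {
    public static void main(String[] args) throws Exception {
      JavaSparkContext sc = new JavaSparkContext("local", "MixedVectorSchemaExample");
      SQLContext sqlctx = new SQLContext(sc);

      //mixed schema with a single vector column at an arbitrary position
      StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("C1", DataTypes.StringType, true),
        DataTypes.createStructField("C2", new VectorUDT(), true),
        DataTypes.createStructField("C3", DataTypes.LongType, true) });
      List<Row> rows = Arrays.asList(
        RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0), 7L),
        RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0), 8L));
      DataFrame df = sqlctx.createDataFrame(sc.parallelize(rows), schema);

      //unknown dimensions: the converter derives the number of frame
      //columns from the schema plus the vector length of the first row
      //(here 2 scalar columns + vector of length 3 -> 5 columns)
      MatrixCharacteristics mc = new MatrixCharacteristics();
      JavaPairRDD<Long, FrameBlock> out = FrameRDDConverterUtils
        .dataFrameToBinaryBlock(sc, df, mc, false);
      System.out.println(out.count() + " blocks, cols=" + mc.getCols());
      sc.stop();
    }
  }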

Furthermore, this patch makes two minor performance improvements,
namely, (1) avoiding unnecessary dataframe-javaRDD conversions (in case
of unknown dimensions), and (2) improving object-object conversion
(with matching schema).
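
Regarding (2), a minimal sketch of the quick path, assuming the
patched UtilFunctions (the class name ObjectToObjectQuickPath is
hypothetical): for matching object and value types, now also booleans
and strings, the input object is returned as-is, avoiding the
objectToString/stringToObject round-trip.

  import org.apache.sysml.parser.Expression.ValueType;
  import org.apache.sysml.runtime.util.UtilFunctions;

  public class ObjectToObjectQuickPath {
    public static void main(String[] args) {
      Double d = Double.valueOf(3.14);
      //matching schema: the same object is returned (quick path)
      System.out.println(UtilFunctions.objectToObject(ValueType.DOUBLE, d) == d); //true
      //new in this patch: quick path also for booleans and strings
      System.out.println(UtilFunctions.objectToObject(
        ValueType.BOOLEAN, Boolean.TRUE) == Boolean.TRUE); //true
      //mismatched types still fall back to the string-based conversion,
      //i.e., stringToObject(vt, objectToString(in))
    }
  }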

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0cc4d23e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0cc4d23e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0cc4d23e

Branch: refs/heads/master
Commit: 0cc4d23e5a0e9e93cf5945bd00c68d918978c87e
Parents: 2e5600f
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Sep 29 15:58:25 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Sep 29 20:58:44 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/FrameRDDConverterUtils.java     |  88 ++++-
 .../sysml/runtime/util/UtilFunctions.java       |   6 +-
 .../mlcontext/DataFrameFrameConversionTest.java | 244 -------------
 .../DataFrameRowFrameConversionTest.java        | 244 +++++++++++++
 .../DataFrameVectorFrameConversionTest.java     | 357 +++++++++++++++++++
 5 files changed, 676 insertions(+), 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0cc4d23e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index f8fb05c..4400a1a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -37,6 +37,8 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.VectorUDT;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -328,12 +330,15 @@ public class FrameRDDConverterUtils
 	{
 		//determine unknown dimensions if required
 		if( !mc.dimsKnown() ) { //nnz are irrelevant here
-			JavaRDD<Row> tmp = df.javaRDD();
-			long rlen = tmp.count();
-			long clen = df.columns().length - (containsID?1:0);
+			int colVect = getColVectFromDFSchema(df.schema(), containsID);
+			int off = (containsID ? 1 : 0);
+			long rlen = df.count();
+			long clen = df.columns().length - off + ((colVect >= 0) ? 
+					((Vector)df.first().get(off+colVect)).size() - 1 : 0);
 			mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), -1);
 		}
 		
+		//append or reuse row index column
 		JavaPairRDD<Row, Long> prepinput = containsID ?
 				df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) :
 				df.javaRDD().zipWithIndex(); //zip row index
@@ -341,11 +346,11 @@ public class FrameRDDConverterUtils
 		//convert data frame to frame schema (prepare once)
 		String[] colnames = new String[(int)mc.getCols()];
 		ValueType[] fschema = new ValueType[(int)mc.getCols()];
-		convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID);
+		int colVect = convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID);
 				
 		//convert rdd to binary block rdd
 		return prepinput.mapPartitionsToPair(
-				new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID));
+				new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID, colVect));
 	}
 
 	/**
@@ -413,29 +418,68 @@ public class FrameRDDConverterUtils
 	}
 	
 	/**
+	 * NOTE: regarding the support of vector columns, we make the following 
+	 * schema restriction: single vector column, which allows inference of
+	 * the vector length without data access and covers the common case. 
 	 * 
 	 * @param dfschema
 	 * @param containsID
-	 * @return
+	 * @return 0-based column index of vector column, -1 if no vector. 
 	 */
-	public static void convertDFSchemaToFrameSchema(StructType dfschema, String[] colnames, 
+	public static int convertDFSchemaToFrameSchema(StructType dfschema, String[] colnames, 
 			ValueType[] fschema, boolean containsID)
 	{
+		//basic meta data
 		int off = containsID ? 1 : 0;
-		for( int i=off; i<dfschema.fields().length; i++ ) {
+		boolean containsVect = false;
+		int lenVect = fschema.length - (dfschema.fields().length - off) + 1;
+		int colVect = -1;
+		
+		//process individual columns
+		for( int i=off, pos=0; i<dfschema.fields().length; i++ ) {
 			StructField structType = dfschema.apply(i);
-			colnames[i-off] = structType.name();
+			colnames[pos] = structType.name();
 			if(structType.dataType() == DataTypes.DoubleType 
 				|| structType.dataType() == DataTypes.FloatType)
-				fschema[i-off] = ValueType.DOUBLE;
+				fschema[pos++] = ValueType.DOUBLE;
 			else if(structType.dataType() == DataTypes.LongType 
 				|| structType.dataType() == DataTypes.IntegerType)
-				fschema[i-off] = ValueType.INT;
+				fschema[pos++] = ValueType.INT;
 			else if(structType.dataType() == DataTypes.BooleanType)
-				fschema[i-off] = ValueType.BOOLEAN;
+				fschema[pos++] = ValueType.BOOLEAN;
+			else if(structType.dataType() instanceof VectorUDT) {
+				if( containsVect )
+					throw new RuntimeException("Found invalid second vector column.");
+				String name = colnames[pos];
+				colVect = pos;
+				for( int j=0; j<lenVect; j++ ) {
+					colnames[pos] = name+"v"+j;
+					fschema[pos++] = ValueType.DOUBLE;
+				}
+				containsVect = true;
+			}
 			else
-				fschema[i-off] = ValueType.STRING;
+				fschema[pos++] = ValueType.STRING;
+		}
+		
+		return colVect;
+	}
+	
+	/**
+	 * 
+	 * @param dfschema
+	 * @param containsID
+	 * @return 0-based column index of vector column, -1 if no vector.
+	 */
+	private static int getColVectFromDFSchema(StructType dfschema, boolean containsID) {
+		int off = containsID ? 1 : 0;
+		for( int i=off; i<dfschema.fields().length; i++ ) {
+			StructField structType = dfschema.apply(i);
+			if(structType.dataType() instanceof VectorUDT)
+				return i-off;
 		}
+		
+		return -1;
 	}
 	
 	/* 
@@ -790,14 +834,16 @@ public class FrameRDDConverterUtils
 		private String[] _colnames = null;
 		private ValueType[] _schema = null;
 		private boolean _containsID = false;
+		private int _colVect = -1;
 		private int _maxRowsPerBlock = -1;
 		
 		public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, String[] colnames, 
-				ValueType[] schema, boolean containsID) {
+				ValueType[] schema, boolean containsID, int colVect) {
 			_clen = mc.getCols();
 			_colnames = colnames;
 			_schema = schema;
 			_containsID = containsID;
+			_colVect = colVect;
 			_maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
 		}
 		
@@ -826,9 +872,17 @@ public class FrameRDDConverterUtils
 				
 				//process row data
 				int off = _containsID ? 1 : 0;
-				for(int i=off; i<row.size(); i++) {
-					tmprow[i-off] = UtilFunctions.objectToObject(
-							_schema[i-off], row.get(i));
+				for(int i=off, pos=0; i<row.size(); i++) {
+					if( i-off == _colVect ) {
+						Vector vect = (Vector) row.get(i);
+						for( int j=0; j<vect.size(); j++ )
+							tmprow[pos++] = vect.apply(j);
+					}
+					else {
+						tmprow[pos] = UtilFunctions.objectToObject(
+							_schema[pos], row.get(i));
+						pos++;
+					}
 				}
 				fb.appendRow(tmprow);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0cc4d23e/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 89472e1..24403d3 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -423,9 +423,11 @@ public class UtilFunctions
 	 * @param in
 	 * @return
 	 */
-	public static Object objectToObject(ValueType vt, Object in ) {
+	public static Object objectToObject(ValueType vt, Object in) {
 		if( in instanceof Double && vt == ValueType.DOUBLE 
-			|| in instanceof Long && vt == ValueType.INT )
+			|| in instanceof Long && vt == ValueType.INT
+			|| in instanceof Boolean && vt == ValueType.BOOLEAN
+			|| in instanceof String && vt == ValueType.STRING )
 			return in; //quick path to avoid double parsing
 		else
 			return stringToObject(vt, objectToString(in) );

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0cc4d23e/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
deleted file mode 100644
index ac3035f..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.mlcontext;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-import org.junit.Test;
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.parser.Expression.ValueType;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.util.DataConverter;
-import org.apache.sysml.runtime.util.UtilFunctions;
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.apache.sysml.test.utils.TestUtils;
-
-
-public class DataFrameFrameConversionTest extends AutomatedTestBase 
-{
-	private final static String TEST_DIR = "functions/mlcontext/";
-	private final static String TEST_NAME = "DataFrameConversion";
-	private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameFrameConversionTest.class.getSimpleName() + "/";
-
-	private final static int  rows1 = 2245;
-	private final static int  cols1 = 745;
-	private final static int  cols2 = 1264;
-	private final static double sparsity1 = 0.9;
-	private final static double sparsity2 = 0.1;
-	private final static double eps=0.0000000001;
-
-	 
-	@Override
-	public void setUp() {
-		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
-	}
-
-	
-
-	@Test
-	public void testRowDoubleConversionSingleDense() {
-		testDataFrameConversion(ValueType.DOUBLE, true, true, false);
-	}
-	
-	@Test
-	public void testRowDoubleConversionSingleDenseUnknown() {
-		testDataFrameConversion(ValueType.DOUBLE, true, true, true);
-	}
-	
-	@Test
-	public void testRowDoubleConversionSingleSparse() {
-		testDataFrameConversion(ValueType.DOUBLE, true, false, false);
-	}
-	
-	@Test
-	public void testRowDoubleConversionSingleSparseUnknown() {
-		testDataFrameConversion(ValueType.DOUBLE, true, false, true);
-	}
-	
-	@Test
-	public void testRowDoubleConversionMultiDense() {
-		testDataFrameConversion(ValueType.DOUBLE, false, true, false);
-	}
-	
-	@Test
-	public void testRowDoubleConversionMultiDenseUnknown() {
-		testDataFrameConversion(ValueType.DOUBLE, false, true, true);
-	}
-	
-	@Test
-	public void testRowDoubleConversionMultiSparse() {
-		testDataFrameConversion(ValueType.DOUBLE, false, false, false);
-	}
-	
-	@Test
-	public void testRowDoubleConversionMultiSparseUnknown() {
-		testDataFrameConversion(ValueType.DOUBLE, false, false, true);
-	}
-
-	@Test
-	public void testRowStringConversionSingleDense() {
-		testDataFrameConversion(ValueType.STRING, true, true, false);
-	}
-	
-	@Test
-	public void testRowStringConversionSingleDenseUnknown() {
-		testDataFrameConversion(ValueType.STRING, true, true, true);
-	}
-	
-	@Test
-	public void testRowStringConversionSingleSparse() {
-		testDataFrameConversion(ValueType.STRING, true, false, false);
-	}
-	
-	@Test
-	public void testRowStringConversionSingleSparseUnknown() {
-		testDataFrameConversion(ValueType.STRING, true, false, true);
-	}
-	
-	@Test
-	public void testRowStringConversionMultiDense() {
-		testDataFrameConversion(ValueType.STRING, false, true, false);
-	}
-	
-	@Test
-	public void testRowStringConversionMultiDenseUnknown() {
-		testDataFrameConversion(ValueType.STRING, false, true, true);
-	}
-	
-	@Test
-	public void testRowStringConversionMultiSparse() {
-		testDataFrameConversion(ValueType.STRING, false, false, false);
-	}
-	
-	@Test
-	public void testRowStringConversionMultiSparseUnknown() {
-		testDataFrameConversion(ValueType.STRING, false, false, true);
-	}
-
-	@Test
-	public void testRowLongConversionSingleDense() {
-		testDataFrameConversion(ValueType.INT, true, true, false);
-	}
-	
-	@Test
-	public void testRowLongConversionSingleDenseUnknown() {
-		testDataFrameConversion(ValueType.INT, true, true, true);
-	}
-	
-	@Test
-	public void testRowLongConversionSingleSparse() {
-		testDataFrameConversion(ValueType.INT, true, false, false);
-	}
-	
-	@Test
-	public void testRowLongConversionSingleSparseUnknown() {
-		testDataFrameConversion(ValueType.INT, true, false, true);
-	}
-	
-	@Test
-	public void testRowLongConversionMultiDense() {
-		testDataFrameConversion(ValueType.INT, false, true, false);
-	}
-	
-	@Test
-	public void testRowLongConversionMultiDenseUnknown() {
-		testDataFrameConversion(ValueType.INT, false, true, true);
-	}
-	
-	@Test
-	public void testRowLongConversionMultiSparse() {
-		testDataFrameConversion(ValueType.INT, false, false, false);
-	}
-	
-	@Test
-	public void testRowLongConversionMultiSparseUnknown() {
-		testDataFrameConversion(ValueType.INT, false, false, true);
-	}
-	
-	/**
-	 * 
-	 * @param vector
-	 * @param singleColBlock
-	 * @param dense
-	 * @param unknownDims
-	 */
-	private void testDataFrameConversion(ValueType vt, boolean singleColBlock, boolean dense, boolean unknownDims) {
-		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
-		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
-
-		SparkExecutionContext sec = null;
-		
-		try
-		{
-			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
-			
-			//generate input data and setup metadata
-			int cols = singleColBlock ? cols1 : cols2;
-			double sparsity = dense ? sparsity1 : sparsity2; 
-			double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); 
-			A = (vt == ValueType.INT) ? TestUtils.round(A) : A;
-			MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); 
-			FrameBlock fbA = DataConverter.convertToFrameBlock(mbA, vt);
-			int blksz = ConfigurationManager.getBlocksize();
-			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
-			MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
-			ValueType[] schema = UtilFunctions.nCopies(cols, vt);
-			
-			//setup spark context
-			sec = (SparkExecutionContext) ExecutionContextFactory.createContext();		
-			JavaSparkContext sc = sec.getSparkContext();
-			SQLContext sqlctx = new SQLContext(sc);
-			
-			//get binary block input rdd
-			JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);
-			
-			//frame - dataframe - frame conversion
-			DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema);
-			JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true);
-			
-			//get output frame block
-			FrameBlock fbB = SparkExecutionContext.toFrameBlock(out, schema, rows1, cols);
-			
-			//compare frame blocks
-			MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB); 
-			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
-			TestUtils.compareMatrices(A, B, rows1, cols, eps);
-		}
-		catch( Exception ex ) {
-			throw new RuntimeException(ex);
-		}
-		finally {
-			sec.close();
-			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
-			DMLScript.rtplatform = oldPlatform;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0cc4d23e/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
new file mode 100644
index 0000000..340eb4c
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameRowFrameConversionTest extends AutomatedTestBase 
+{
+	private final static String TEST_DIR = "functions/mlcontext/";
+	private final static String TEST_NAME = "DataFrameConversion";
+	private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameRowFrameConversionTest.class.getSimpleName() + "/";
+
+	private final static int  rows1 = 2245;
+	private final static int  cols1 = 745;
+	private final static int  cols2 = 1264;
+	private final static double sparsity1 = 0.9;
+	private final static double sparsity2 = 0.1;
+	private final static double eps=0.0000000001;
+
+	 
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+	}
+
+	
+
+	@Test
+	public void testRowDoubleConversionSingleDense() {
+		testDataFrameConversion(ValueType.DOUBLE, true, true, false);
+	}
+	
+	@Test
+	public void testRowDoubleConversionSingleDenseUnknown() {
+		testDataFrameConversion(ValueType.DOUBLE, true, true, true);
+	}
+	
+	@Test
+	public void testRowDoubleConversionSingleSparse() {
+		testDataFrameConversion(ValueType.DOUBLE, true, false, false);
+	}
+	
+	@Test
+	public void testRowDoubleConversionSingleSparseUnknown() {
+		testDataFrameConversion(ValueType.DOUBLE, true, false, true);
+	}
+	
+	@Test
+	public void testRowDoubleConversionMultiDense() {
+		testDataFrameConversion(ValueType.DOUBLE, false, true, false);
+	}
+	
+	@Test
+	public void testRowDoubleConversionMultiDenseUnknown() {
+		testDataFrameConversion(ValueType.DOUBLE, false, true, true);
+	}
+	
+	@Test
+	public void testRowDoubleConversionMultiSparse() {
+		testDataFrameConversion(ValueType.DOUBLE, false, false, false);
+	}
+	
+	@Test
+	public void testRowDoubleConversionMultiSparseUnknown() {
+		testDataFrameConversion(ValueType.DOUBLE, false, false, true);
+	}
+
+	@Test
+	public void testRowStringConversionSingleDense() {
+		testDataFrameConversion(ValueType.STRING, true, true, false);
+	}
+	
+	@Test
+	public void testRowStringConversionSingleDenseUnknown() {
+		testDataFrameConversion(ValueType.STRING, true, true, true);
+	}
+	
+	@Test
+	public void testRowStringConversionSingleSparse() {
+		testDataFrameConversion(ValueType.STRING, true, false, false);
+	}
+	
+	@Test
+	public void testRowStringConversionSingleSparseUnknown() {
+		testDataFrameConversion(ValueType.STRING, true, false, true);
+	}
+	
+	@Test
+	public void testRowStringConversionMultiDense() {
+		testDataFrameConversion(ValueType.STRING, false, true, false);
+	}
+	
+	@Test
+	public void testRowStringConversionMultiDenseUnknown() {
+		testDataFrameConversion(ValueType.STRING, false, true, true);
+	}
+	
+	@Test
+	public void testRowStringConversionMultiSparse() {
+		testDataFrameConversion(ValueType.STRING, false, false, false);
+	}
+	
+	@Test
+	public void testRowStringConversionMultiSparseUnknown() {
+		testDataFrameConversion(ValueType.STRING, false, false, true);
+	}
+
+	@Test
+	public void testRowLongConversionSingleDense() {
+		testDataFrameConversion(ValueType.INT, true, true, false);
+	}
+	
+	@Test
+	public void testRowLongConversionSingleDenseUnknown() {
+		testDataFrameConversion(ValueType.INT, true, true, true);
+	}
+	
+	@Test
+	public void testRowLongConversionSingleSparse() {
+		testDataFrameConversion(ValueType.INT, true, false, false);
+	}
+	
+	@Test
+	public void testRowLongConversionSingleSparseUnknown() {
+		testDataFrameConversion(ValueType.INT, true, false, true);
+	}
+	
+	@Test
+	public void testRowLongConversionMultiDense() {
+		testDataFrameConversion(ValueType.INT, false, true, false);
+	}
+	
+	@Test
+	public void testRowLongConversionMultiDenseUnknown() {
+		testDataFrameConversion(ValueType.INT, false, true, true);
+	}
+	
+	@Test
+	public void testRowLongConversionMultiSparse() {
+		testDataFrameConversion(ValueType.INT, false, false, false);
+	}
+	
+	@Test
+	public void testRowLongConversionMultiSparseUnknown() {
+		testDataFrameConversion(ValueType.INT, false, false, true);
+	}
+	
+	/**
+	 * 
+	 * @param vt
+	 * @param singleColBlock
+	 * @param dense
+	 * @param unknownDims
+	 */
+	private void testDataFrameConversion(ValueType vt, boolean singleColBlock, boolean dense, boolean unknownDims) {
+		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
+		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+		SparkExecutionContext sec = null;
+		
+		try
+		{
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+			
+			//generate input data and setup metadata
+			int cols = singleColBlock ? cols1 : cols2;
+			double sparsity = dense ? sparsity1 : sparsity2; 
+			double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); 
+			A = (vt == ValueType.INT) ? TestUtils.round(A) : A;
+			MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); 
+			FrameBlock fbA = DataConverter.convertToFrameBlock(mbA, vt);
+			int blksz = ConfigurationManager.getBlocksize();
+			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+			MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+			ValueType[] schema = UtilFunctions.nCopies(cols, vt);
+			
+			//setup spark context
+			sec = (SparkExecutionContext) ExecutionContextFactory.createContext();		
+			JavaSparkContext sc = sec.getSparkContext();
+			SQLContext sqlctx = new SQLContext(sc);
+			
+			//get binary block input rdd
+			JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);
+			
+			//frame - dataframe - frame conversion
+			DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema);
+			JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true);
+			
+			//get output frame block
+			FrameBlock fbB = SparkExecutionContext.toFrameBlock(out, schema, rows1, cols);
+			
+			//compare frame blocks
+			MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB); 
+			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+			TestUtils.compareMatrices(A, B, rows1, cols, eps);
+		}
+		catch( Exception ex ) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			sec.close();
+			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+			DMLScript.rtplatform = oldPlatform;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0cc4d23e/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
new file mode 100644
index 0000000..20287c7
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.DenseVector;
+import org.apache.spark.mllib.linalg.VectorUDT;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameVectorFrameConversionTest extends AutomatedTestBase 
+{
+	private final static String TEST_DIR = "functions/mlcontext/";
+	private final static String TEST_NAME = "DataFrameConversion";
+	private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameVectorFrameConversionTest.class.getSimpleName() + "/";
+
+	//schema restriction: single vector included
+	private final static ValueType[] schemaStrings = new ValueType[]{ValueType.OBJECT, ValueType.STRING, ValueType.STRING, ValueType.STRING};
+	private final static ValueType[] schemaDoubles = new ValueType[]{ValueType.DOUBLE, ValueType.DOUBLE, ValueType.OBJECT, ValueType.DOUBLE};
+	private final static ValueType[] schemaMixed1 = new ValueType[]{ValueType.OBJECT, ValueType.INT, ValueType.STRING, ValueType.DOUBLE, ValueType.INT};
+	private final static ValueType[] schemaMixed2 = new ValueType[]{ValueType.STRING, ValueType.OBJECT, ValueType.DOUBLE};
+	
+	private final static int rows1 = 2245;
+	private final static int colsVector = 7;
+	private final static double sparsity1 = 0.9;
+	private final static double sparsity2 = 0.1;
+	private final static double eps=0.0000000001;
+
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+	}
+
+	@Test
+	public void testVectorStringsConversionIDDenseUnknown() {
+		testDataFrameConversion(schemaStrings, true, false, true);
+	}
+	
+	@Test
+	public void testVectorDoublesConversionIDDenseUnknown() {
+		testDataFrameConversion(schemaDoubles, true, false, true);
+	}
+	
+	@Test
+	public void testVectorMixed1ConversionIDDenseUnknown() {
+		testDataFrameConversion(schemaMixed1, true, false, true);
+	}
+	
+	@Test
+	public void testVectorMixed2ConversionIDDenseUnknown() {
+		testDataFrameConversion(schemaMixed2, true, false, true);
+	}
+	
+	@Test
+	public void testVectorStringsConversionIDDense() {
+		testDataFrameConversion(schemaStrings, true, false, false);
+	}
+	
+	@Test
+	public void testVectorDoublesConversionIDDense() {
+		testDataFrameConversion(schemaDoubles, true, false, false);
+	}
+	
+	@Test
+	public void testVectorMixed1ConversionIDDense() {
+		testDataFrameConversion(schemaMixed1, true, false, false);
+	}
+	
+	@Test
+	public void testVectorMixed2ConversionIDDense() {
+		testDataFrameConversion(schemaMixed2, true, false, false);
+	}
+
+	@Test
+	public void testVectorStringsConversionIDSparseUnknown() {
+		testDataFrameConversion(schemaStrings, true, true, true);
+	}
+	
+	@Test
+	public void testVectorDoublesConversionIDSparseUnknown() {
+		testDataFrameConversion(schemaDoubles, true, true, true);
+	}
+	
+	@Test
+	public void testVectorMixed1ConversionIDSparseUnknown() {
+		testDataFrameConversion(schemaMixed1, true, true, true);
+	}
+	
+	@Test
+	public void testVectorMixed2ConversionIDSparseUnknown() {
+		testDataFrameConversion(schemaMixed2, true, true, true);
+	}
+	
+	@Test
+	public void testVectorStringsConversionIDSparse() {
+		testDataFrameConversion(schemaStrings, true, true, false);
+	}
+	
+	@Test
+	public void testVectorDoublesConversionIDSparse() {
+		testDataFrameConversion(schemaDoubles, true, true, false);
+	}
+	
+	@Test
+	public void testVectorMixed1ConversionIDSparse() {
+		testDataFrameConversion(schemaMixed1, true, true, false);
+	}
+	
+	@Test
+	public void testVectorMixed2ConversionIDSparse() {
+		testDataFrameConversion(schemaMixed2, true, true, false);
+	}
+
+	@Test
+	public void testVectorStringsConversionDenseUnknown() {
+		testDataFrameConversion(schemaStrings, false, false, true);
+	}
+	
+	@Test
+	public void testVectorDoublesConversionDenseUnknown() {
+		testDataFrameConversion(schemaDoubles, false, false, true);
+	}
+	
+	@Test
+	public void testVectorMixed1ConversionDenseUnknown() {
+		testDataFrameConversion(schemaMixed1, false, false, true);
+	}
+	
+	@Test
+	public void testVectorMixed2ConversionDenseUnknown() {
+		testDataFrameConversion(schemaMixed2, false, false, true);
+	}
+	
+	@Test
+	public void testVectorStringsConversionDense() {
+		testDataFrameConversion(schemaStrings, false, false, false);
+	}
+	
+	@Test
+	public void testVectorDoublesConversionDense() {
+		testDataFrameConversion(schemaDoubles, false, false, false);
+	}
+	
+	@Test
+	public void testVectorMixed1ConversionDense() {
+		testDataFrameConversion(schemaMixed1, false, false, false);
+	}
+	
+	@Test
+	public void testVectorMixed2ConversionDense() {
+		testDataFrameConversion(schemaMixed2, false, false, false);
+	}
+
+	@Test
+	public void testVectorStringsConversionSparseUnknown() {
+		testDataFrameConversion(schemaStrings, false, true, true);
+	}
+	
+	@Test
+	public void testVectorDoublesConversionSparseUnknown() {
+		testDataFrameConversion(schemaDoubles, false, true, true);
+	}
+	
+	@Test
+	public void testVectorMixed1ConversionSparseUnknown() {
+		testDataFrameConversion(schemaMixed1, false, true, true);
+	}
+	
+	@Test
+	public void testVectorMixed2ConversionSparseUnknown() {
+		testDataFrameConversion(schemaMixed2, false, true, true);
+	}
+	
+	@Test
+	public void testVectorStringsConversionSparse() {
+		testDataFrameConversion(schemaStrings, false, true, false);
+	}
+	
+	@Test
+	public void testVectorDoublesConversionSparse() {
+		testDataFrameConversion(schemaDoubles, false, true, false);
+	}
+	
+	@Test
+	public void testVectorMixed1ConversionSparse() {
+		testDataFrameConversion(schemaMixed1, false, true, false);
+	}
+	
+	@Test
+	public void testVectorMixed2ConversionSparse() {
+		testDataFrameConversion(schemaMixed2, false, true, false);
+	}
+	
+	/**
+	 * 
+	 * @param schema
+	 * @param containsID
+	 * @param dense
+	 * @param unknownDims
+	 */
+	private void testDataFrameConversion(ValueType[] schema, boolean containsID, boolean dense, boolean unknownDims) {
+		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
+		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+		SparkExecutionContext sec = null;
+		
+		try
+		{
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+			
+			//generate input data and setup metadata
+			int cols = schema.length + colsVector - 1;
+			double sparsity = dense ? sparsity1 : sparsity2; 
+			double[][] A = TestUtils.round(getRandomMatrix(rows1, cols, -10, 1000, sparsity, 2373)); 
+			MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
+			int blksz = ConfigurationManager.getBlocksize();
+			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+			MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+			
+			//setup spark context
+			sec = (SparkExecutionContext) ExecutionContextFactory.createContext();		
+			JavaSparkContext sc = sec.getSparkContext();
+			SQLContext sqlctx = new SQLContext(sc);
+			
+			//create input data frame
+			DataFrame df = createDataFrame(sqlctx, mbA, containsID, schema);
+			
+			//dataframe - frame conversion
+			JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, containsID);
+			
+			//get output frame block
+			FrameBlock fbB = SparkExecutionContext.toFrameBlock(out, 
+					UtilFunctions.nCopies(cols, ValueType.DOUBLE), rows1, cols);
+			
+			//compare frame blocks
+			MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB); 
+			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+			TestUtils.compareMatrices(A, B, rows1, cols, eps);
+		}
+		catch( Exception ex ) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			sec.close();
+			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+			DMLScript.rtplatform = oldPlatform;
+		}
+	}
+	
+	/**
+	 * 
+	 * @param sqlctx
+	 * @param mb
+	 * @param containsID
+	 * @param schema
+	 * @return
+	 * @throws DMLRuntimeException 
+	 */
+	@SuppressWarnings("resource")
+	private DataFrame createDataFrame(SQLContext sqlctx, MatrixBlock mb, boolean containsID, ValueType[] schema) 
+		throws DMLRuntimeException
+	{
+		//create in-memory list of rows
+		List<Row> list = new ArrayList<Row>();		 
+		int off = (containsID ? 1 : 0);
+		int clen = mb.getNumColumns() + off - colsVector + 1;
+		
+		for( int i=0; i<mb.getNumRows(); i++ ) {
+			Object[] row = new Object[clen];
+			if( containsID )
+				row[0] = i+1;
+			for( int j=0, j2=0; j<mb.getNumColumns(); j++, j2++ ) {
+				if( schema[j2] != ValueType.OBJECT ) {
+					row[j2+off] = UtilFunctions
+						.doubleToObject(schema[j2], mb.quickGetValue(i, j));
+				}
+				else {
+					double[] tmp = DataConverter.convertToDoubleVector(
+							mb.sliceOperations(i, i, j, j+colsVector-1, new MatrixBlock()));
+					row[j2+off] = new DenseVector(tmp);
+					j += colsVector-1;
+				}
+			}
+			list.add(RowFactory.create(row));
+		}
+		
+		//create data frame schema
+		List<StructField> fields = new ArrayList<StructField>();
+		if( containsID )
+			fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, 
+					DataTypes.DoubleType, true));
+		for( int j=0; j<schema.length; j++ ) {
+			DataType dt = null;
+			switch(schema[j]) {
+				case STRING: dt = DataTypes.StringType; break;
+				case DOUBLE: dt = DataTypes.DoubleType; break;
+				case INT:    dt = DataTypes.LongType; break;
+				case OBJECT: dt = new VectorUDT(); break;
+				default: throw new RuntimeException("Unsupported value type.");
+			}
+			fields.add(DataTypes.createStructField("C"+(j+1), dt, true));
+		}
+		StructType dfSchema = DataTypes.createStructType(fields);
+				
+		//create rdd and data frame
+		JavaSparkContext sc = new JavaSparkContext(sqlctx.sparkContext());
+		JavaRDD<Row> rowRDD = sc.parallelize(list);
+		return sqlctx.createDataFrame(rowRDD, dfSchema);
+	}
+}
\ No newline at end of file