You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/30 04:00:04 UTC
incubator-systemml git commit: [SYSTEMML-969] Extended
dataframe-frame converter (mixed vector schema)
Repository: incubator-systemml
Updated Branches:
refs/heads/master 2e5600f38 -> 0cc4d23e5
[SYSTEMML-969] Extended dataframe-frame converter (mixed vector schema)
This patch extends the existing dataframe-frame converter to support
mixed schemas that can also include vectors. We make a simplifying
restriction of a single vector column, but it can occur at arbitrary
positions and mixed with arbitrary scalar fields. The new tests cover a
variety of scenarios with dense/sparse data, w/ and w/o index column,
known/unknown dimensions, and various different schemas.
Furthermore, this patch also makes two minor performance improvements,
namely, (1) avoid unnecessary dataframe-javaRDD conversions (in case of
unknown dimensions), and (2) improved object-object conversion (with
matching schema).
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0cc4d23e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0cc4d23e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0cc4d23e
Branch: refs/heads/master
Commit: 0cc4d23e5a0e9e93cf5945bd00c68d918978c87e
Parents: 2e5600f
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Sep 29 15:58:25 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Sep 29 20:58:44 2016 -0700
----------------------------------------------------------------------
.../spark/utils/FrameRDDConverterUtils.java | 88 ++++-
.../sysml/runtime/util/UtilFunctions.java | 6 +-
.../mlcontext/DataFrameFrameConversionTest.java | 244 -------------
.../DataFrameRowFrameConversionTest.java | 244 +++++++++++++
.../DataFrameVectorFrameConversionTest.java | 357 +++++++++++++++++++
5 files changed, 676 insertions(+), 263 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0cc4d23e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index f8fb05c..4400a1a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -37,6 +37,8 @@ import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -328,12 +330,15 @@ public class FrameRDDConverterUtils
{
//determine unknown dimensions if required
if( !mc.dimsKnown() ) { //nnz are irrelevant here
- JavaRDD<Row> tmp = df.javaRDD();
- long rlen = tmp.count();
- long clen = df.columns().length - (containsID?1:0);
+ int colVect = getColVectFromDFSchema(df.schema(), containsID);
+ int off = (containsID ? 1 : 0);
+ long rlen = df.count();
+ long clen = df.columns().length - off + ((colVect >= 0) ?
+ ((Vector)df.first().get(off+colVect)).size() - 1 : 0);
mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), -1);
}
+ //append or reuse row index column
JavaPairRDD<Row, Long> prepinput = containsID ?
df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) :
df.javaRDD().zipWithIndex(); //zip row index
@@ -341,11 +346,11 @@ public class FrameRDDConverterUtils
//convert data frame to frame schema (prepare once)
String[] colnames = new String[(int)mc.getCols()];
ValueType[] fschema = new ValueType[(int)mc.getCols()];
- convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID);
+ int colVect = convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID);
//convert rdd to binary block rdd
return prepinput.mapPartitionsToPair(
- new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID));
+ new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID, colVect));
}
/**
@@ -413,29 +418,68 @@ public class FrameRDDConverterUtils
}
/**
+ * NOTE: regarding the support of vector columns, we make the following
+ * schema restriction: single vector column, which allows inference of
+ * the vector length without data access and covers the common case.
*
* @param dfschema
* @param containsID
- * @return
+ * @return 0-based column index of vector column, -1 if no vector.
*/
- public static void convertDFSchemaToFrameSchema(StructType dfschema, String[] colnames,
+ public static int convertDFSchemaToFrameSchema(StructType dfschema, String[] colnames,
ValueType[] fschema, boolean containsID)
{
+ //basic meta data
int off = containsID ? 1 : 0;
- for( int i=off; i<dfschema.fields().length; i++ ) {
+ boolean containsVect = false;
+ int lenVect = fschema.length - (dfschema.fields().length - off) + 1;
+ int colVect = -1;
+
+ //process individual columns
+ for( int i=off, pos=0; i<dfschema.fields().length; i++ ) {
StructField structType = dfschema.apply(i);
- colnames[i-off] = structType.name();
+ colnames[pos] = structType.name();
if(structType.dataType() == DataTypes.DoubleType
|| structType.dataType() == DataTypes.FloatType)
- fschema[i-off] = ValueType.DOUBLE;
+ fschema[pos++] = ValueType.DOUBLE;
else if(structType.dataType() == DataTypes.LongType
|| structType.dataType() == DataTypes.IntegerType)
- fschema[i-off] = ValueType.INT;
+ fschema[pos++] = ValueType.INT;
else if(structType.dataType() == DataTypes.BooleanType)
- fschema[i-off] = ValueType.BOOLEAN;
+ fschema[pos++] = ValueType.BOOLEAN;
+ else if(structType.dataType() instanceof VectorUDT) {
+ if( containsVect )
+ throw new RuntimeException("Found invalid second vector column.");
+ String name = colnames[pos];
+ colVect = pos;
+ for( int j=0; j<lenVect; j++ ) {
+ colnames[pos] = name+"v"+j;
+ fschema[pos++] = ValueType.DOUBLE;
+ }
+ containsVect = true;
+ }
else
- fschema[i-off] = ValueType.STRING;
+ fschema[pos++] = ValueType.STRING;
+ }
+
+ return colVect;
+ }
+
+ /**
+ *
+ * @param dfschema
+ * @param containsID
+ * @return 0-based column index of vector column, -1 if no vector.
+ */
+ private static int getColVectFromDFSchema(StructType dfschema, boolean containsID) {
+ int off = containsID ? 1 : 0;
+ for( int i=off; i<dfschema.fields().length; i++ ) {
+ StructField structType = dfschema.apply(i);
+ if(structType.dataType() instanceof VectorUDT)
+ return i-off;
}
+
+ return -1;
}
/*
@@ -790,14 +834,16 @@ public class FrameRDDConverterUtils
private String[] _colnames = null;
private ValueType[] _schema = null;
private boolean _containsID = false;
+ private int _colVect = -1;
private int _maxRowsPerBlock = -1;
public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, String[] colnames,
- ValueType[] schema, boolean containsID) {
+ ValueType[] schema, boolean containsID, int colVect) {
_clen = mc.getCols();
_colnames = colnames;
_schema = schema;
_containsID = containsID;
+ _colVect = colVect;
_maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
}
@@ -826,9 +872,17 @@ public class FrameRDDConverterUtils
//process row data
int off = _containsID ? 1 : 0;
- for(int i=off; i<row.size(); i++) {
- tmprow[i-off] = UtilFunctions.objectToObject(
- _schema[i-off], row.get(i));
+ for(int i=off, pos=0; i<row.size(); i++) {
+ if( i-off == _colVect ) {
+ Vector vect = (Vector) row.get(i);
+ for( int j=0; j<vect.size(); j++ )
+ tmprow[pos++] = vect.apply(j);
+ }
+ else {
+ tmprow[pos] = UtilFunctions.objectToObject(
+ _schema[pos], row.get(i));
+ pos++;
+ }
}
fb.appendRow(tmprow);
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0cc4d23e/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 89472e1..24403d3 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -423,9 +423,11 @@ public class UtilFunctions
* @param in
* @return
*/
- public static Object objectToObject(ValueType vt, Object in ) {
+ public static Object objectToObject(ValueType vt, Object in) {
if( in instanceof Double && vt == ValueType.DOUBLE
- || in instanceof Long && vt == ValueType.INT )
+ || in instanceof Long && vt == ValueType.INT
+ || in instanceof Boolean && vt == ValueType.BOOLEAN
+ || in instanceof String && vt == ValueType.STRING )
return in; //quick path to avoid double parsing
else
return stringToObject(vt, objectToString(in) );
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0cc4d23e/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
deleted file mode 100644
index ac3035f..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.mlcontext;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-import org.junit.Test;
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.parser.Expression.ValueType;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.util.DataConverter;
-import org.apache.sysml.runtime.util.UtilFunctions;
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.apache.sysml.test.utils.TestUtils;
-
-
-public class DataFrameFrameConversionTest extends AutomatedTestBase
-{
- private final static String TEST_DIR = "functions/mlcontext/";
- private final static String TEST_NAME = "DataFrameConversion";
- private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameFrameConversionTest.class.getSimpleName() + "/";
-
- private final static int rows1 = 2245;
- private final static int cols1 = 745;
- private final static int cols2 = 1264;
- private final static double sparsity1 = 0.9;
- private final static double sparsity2 = 0.1;
- private final static double eps=0.0000000001;
-
-
- @Override
- public void setUp() {
- addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
- }
-
-
-
- @Test
- public void testRowDoubleConversionSingleDense() {
- testDataFrameConversion(ValueType.DOUBLE, true, true, false);
- }
-
- @Test
- public void testRowDoubleConversionSingleDenseUnknown() {
- testDataFrameConversion(ValueType.DOUBLE, true, true, true);
- }
-
- @Test
- public void testRowDoubleConversionSingleSparse() {
- testDataFrameConversion(ValueType.DOUBLE, true, false, false);
- }
-
- @Test
- public void testRowDoubleConversionSingleSparseUnknown() {
- testDataFrameConversion(ValueType.DOUBLE, true, false, true);
- }
-
- @Test
- public void testRowDoubleConversionMultiDense() {
- testDataFrameConversion(ValueType.DOUBLE, false, true, false);
- }
-
- @Test
- public void testRowDoubleConversionMultiDenseUnknown() {
- testDataFrameConversion(ValueType.DOUBLE, false, true, true);
- }
-
- @Test
- public void testRowDoubleConversionMultiSparse() {
- testDataFrameConversion(ValueType.DOUBLE, false, false, false);
- }
-
- @Test
- public void testRowDoubleConversionMultiSparseUnknown() {
- testDataFrameConversion(ValueType.DOUBLE, false, false, true);
- }
-
- @Test
- public void testRowStringConversionSingleDense() {
- testDataFrameConversion(ValueType.STRING, true, true, false);
- }
-
- @Test
- public void testRowStringConversionSingleDenseUnknown() {
- testDataFrameConversion(ValueType.STRING, true, true, true);
- }
-
- @Test
- public void testRowStringConversionSingleSparse() {
- testDataFrameConversion(ValueType.STRING, true, false, false);
- }
-
- @Test
- public void testRowStringConversionSingleSparseUnknown() {
- testDataFrameConversion(ValueType.STRING, true, false, true);
- }
-
- @Test
- public void testRowStringConversionMultiDense() {
- testDataFrameConversion(ValueType.STRING, false, true, false);
- }
-
- @Test
- public void testRowStringConversionMultiDenseUnknown() {
- testDataFrameConversion(ValueType.STRING, false, true, true);
- }
-
- @Test
- public void testRowStringConversionMultiSparse() {
- testDataFrameConversion(ValueType.STRING, false, false, false);
- }
-
- @Test
- public void testRowStringConversionMultiSparseUnknown() {
- testDataFrameConversion(ValueType.STRING, false, false, true);
- }
-
- @Test
- public void testRowLongConversionSingleDense() {
- testDataFrameConversion(ValueType.INT, true, true, false);
- }
-
- @Test
- public void testRowLongConversionSingleDenseUnknown() {
- testDataFrameConversion(ValueType.INT, true, true, true);
- }
-
- @Test
- public void testRowLongConversionSingleSparse() {
- testDataFrameConversion(ValueType.INT, true, false, false);
- }
-
- @Test
- public void testRowLongConversionSingleSparseUnknown() {
- testDataFrameConversion(ValueType.INT, true, false, true);
- }
-
- @Test
- public void testRowLongConversionMultiDense() {
- testDataFrameConversion(ValueType.INT, false, true, false);
- }
-
- @Test
- public void testRowLongConversionMultiDenseUnknown() {
- testDataFrameConversion(ValueType.INT, false, true, true);
- }
-
- @Test
- public void testRowLongConversionMultiSparse() {
- testDataFrameConversion(ValueType.INT, false, false, false);
- }
-
- @Test
- public void testRowLongConversionMultiSparseUnknown() {
- testDataFrameConversion(ValueType.INT, false, false, true);
- }
-
- /**
- *
- * @param vector
- * @param singleColBlock
- * @param dense
- * @param unknownDims
- */
- private void testDataFrameConversion(ValueType vt, boolean singleColBlock, boolean dense, boolean unknownDims) {
- boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
- RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
-
- SparkExecutionContext sec = null;
-
- try
- {
- DMLScript.USE_LOCAL_SPARK_CONFIG = true;
- DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
-
- //generate input data and setup metadata
- int cols = singleColBlock ? cols1 : cols2;
- double sparsity = dense ? sparsity1 : sparsity2;
- double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373);
- A = (vt == ValueType.INT) ? TestUtils.round(A) : A;
- MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
- FrameBlock fbA = DataConverter.convertToFrameBlock(mbA, vt);
- int blksz = ConfigurationManager.getBlocksize();
- MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
- MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
- ValueType[] schema = UtilFunctions.nCopies(cols, vt);
-
- //setup spark context
- sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
- JavaSparkContext sc = sec.getSparkContext();
- SQLContext sqlctx = new SQLContext(sc);
-
- //get binary block input rdd
- JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);
-
- //frame - dataframe - frame conversion
- DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema);
- JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true);
-
- //get output frame block
- FrameBlock fbB = SparkExecutionContext.toFrameBlock(out, schema, rows1, cols);
-
- //compare frame blocks
- MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB);
- double[][] B = DataConverter.convertToDoubleMatrix(mbB);
- TestUtils.compareMatrices(A, B, rows1, cols, eps);
- }
- catch( Exception ex ) {
- throw new RuntimeException(ex);
- }
- finally {
- sec.close();
- DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
- DMLScript.rtplatform = oldPlatform;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0cc4d23e/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
new file mode 100644
index 0000000..340eb4c
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameRowFrameConversionTest extends AutomatedTestBase
+{
+ private final static String TEST_DIR = "functions/mlcontext/";
+ private final static String TEST_NAME = "DataFrameConversion";
+ private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameRowFrameConversionTest.class.getSimpleName() + "/";
+
+ private final static int rows1 = 2245;
+ private final static int cols1 = 745;
+ private final static int cols2 = 1264;
+ private final static double sparsity1 = 0.9;
+ private final static double sparsity2 = 0.1;
+ private final static double eps=0.0000000001;
+
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+ }
+
+
+
+ @Test
+ public void testRowDoubleConversionSingleDense() {
+ testDataFrameConversion(ValueType.DOUBLE, true, true, false);
+ }
+
+ @Test
+ public void testRowDoubleConversionSingleDenseUnknown() {
+ testDataFrameConversion(ValueType.DOUBLE, true, true, true);
+ }
+
+ @Test
+ public void testRowDoubleConversionSingleSparse() {
+ testDataFrameConversion(ValueType.DOUBLE, true, false, false);
+ }
+
+ @Test
+ public void testRowDoubleConversionSingleSparseUnknown() {
+ testDataFrameConversion(ValueType.DOUBLE, true, false, true);
+ }
+
+ @Test
+ public void testRowDoubleConversionMultiDense() {
+ testDataFrameConversion(ValueType.DOUBLE, false, true, false);
+ }
+
+ @Test
+ public void testRowDoubleConversionMultiDenseUnknown() {
+ testDataFrameConversion(ValueType.DOUBLE, false, true, true);
+ }
+
+ @Test
+ public void testRowDoubleConversionMultiSparse() {
+ testDataFrameConversion(ValueType.DOUBLE, false, false, false);
+ }
+
+ @Test
+ public void testRowDoubleConversionMultiSparseUnknown() {
+ testDataFrameConversion(ValueType.DOUBLE, false, false, true);
+ }
+
+ @Test
+ public void testRowStringConversionSingleDense() {
+ testDataFrameConversion(ValueType.STRING, true, true, false);
+ }
+
+ @Test
+ public void testRowStringConversionSingleDenseUnknown() {
+ testDataFrameConversion(ValueType.STRING, true, true, true);
+ }
+
+ @Test
+ public void testRowStringConversionSingleSparse() {
+ testDataFrameConversion(ValueType.STRING, true, false, false);
+ }
+
+ @Test
+ public void testRowStringConversionSingleSparseUnknown() {
+ testDataFrameConversion(ValueType.STRING, true, false, true);
+ }
+
+ @Test
+ public void testRowStringConversionMultiDense() {
+ testDataFrameConversion(ValueType.STRING, false, true, false);
+ }
+
+ @Test
+ public void testRowStringConversionMultiDenseUnknown() {
+ testDataFrameConversion(ValueType.STRING, false, true, true);
+ }
+
+ @Test
+ public void testRowStringConversionMultiSparse() {
+ testDataFrameConversion(ValueType.STRING, false, false, false);
+ }
+
+ @Test
+ public void testRowStringConversionMultiSparseUnknown() {
+ testDataFrameConversion(ValueType.STRING, false, false, true);
+ }
+
+ @Test
+ public void testRowLongConversionSingleDense() {
+ testDataFrameConversion(ValueType.INT, true, true, false);
+ }
+
+ @Test
+ public void testRowLongConversionSingleDenseUnknown() {
+ testDataFrameConversion(ValueType.INT, true, true, true);
+ }
+
+ @Test
+ public void testRowLongConversionSingleSparse() {
+ testDataFrameConversion(ValueType.INT, true, false, false);
+ }
+
+ @Test
+ public void testRowLongConversionSingleSparseUnknown() {
+ testDataFrameConversion(ValueType.INT, true, false, true);
+ }
+
+ @Test
+ public void testRowLongConversionMultiDense() {
+ testDataFrameConversion(ValueType.INT, false, true, false);
+ }
+
+ @Test
+ public void testRowLongConversionMultiDenseUnknown() {
+ testDataFrameConversion(ValueType.INT, false, true, true);
+ }
+
+ @Test
+ public void testRowLongConversionMultiSparse() {
+ testDataFrameConversion(ValueType.INT, false, false, false);
+ }
+
+ @Test
+ public void testRowLongConversionMultiSparseUnknown() {
+ testDataFrameConversion(ValueType.INT, false, false, true);
+ }
+
+ /**
+ *
+ * @param vector
+ * @param singleColBlock
+ * @param dense
+ * @param unknownDims
+ */
+ private void testDataFrameConversion(ValueType vt, boolean singleColBlock, boolean dense, boolean unknownDims) {
+ boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+ SparkExecutionContext sec = null;
+
+ try
+ {
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+ DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+
+ //generate input data and setup metadata
+ int cols = singleColBlock ? cols1 : cols2;
+ double sparsity = dense ? sparsity1 : sparsity2;
+ double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373);
+ A = (vt == ValueType.INT) ? TestUtils.round(A) : A;
+ MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
+ FrameBlock fbA = DataConverter.convertToFrameBlock(mbA, vt);
+ int blksz = ConfigurationManager.getBlocksize();
+ MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+ MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+ ValueType[] schema = UtilFunctions.nCopies(cols, vt);
+
+ //setup spark context
+ sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
+ JavaSparkContext sc = sec.getSparkContext();
+ SQLContext sqlctx = new SQLContext(sc);
+
+ //get binary block input rdd
+ JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);
+
+ //frame - dataframe - frame conversion
+ DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema);
+ JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true);
+
+ //get output frame block
+ FrameBlock fbB = SparkExecutionContext.toFrameBlock(out, schema, rows1, cols);
+
+ //compare frame blocks
+ MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB);
+ double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+ TestUtils.compareMatrices(A, B, rows1, cols, eps);
+ }
+ catch( Exception ex ) {
+ throw new RuntimeException(ex);
+ }
+ finally {
+ sec.close();
+ DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+ DMLScript.rtplatform = oldPlatform;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0cc4d23e/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
new file mode 100644
index 0000000..20287c7
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.DenseVector;
+import org.apache.spark.mllib.linalg.VectorUDT;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+/**
+ * Tests the dataframe-to-frame converter on data frames with mixed schemas
+ * that include a single vector column (marked by {@code ValueType.OBJECT}).
+ * Each test builds a data frame from a random matrix, converts it to a
+ * binary-block frame, and compares the result against the input matrix.
+ */
+public class DataFrameVectorFrameConversionTest extends AutomatedTestBase
+{
+	private final static String TEST_DIR = "functions/mlcontext/";
+	private final static String TEST_NAME = "DataFrameConversion";
+	private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameVectorFrameConversionTest.class.getSimpleName() + "/";
+
+	//schema restriction: single vector included
+	private final static ValueType[] schemaStrings = new ValueType[]{ValueType.OBJECT, ValueType.STRING, ValueType.STRING, ValueType.STRING};
+	private final static ValueType[] schemaDoubles = new ValueType[]{ValueType.DOUBLE, ValueType.DOUBLE, ValueType.OBJECT, ValueType.DOUBLE};
+	private final static ValueType[] schemaMixed1 = new ValueType[]{ValueType.OBJECT, ValueType.INT, ValueType.STRING, ValueType.DOUBLE, ValueType.INT};
+	private final static ValueType[] schemaMixed2 = new ValueType[]{ValueType.STRING, ValueType.OBJECT, ValueType.DOUBLE};
+
+	private final static int rows1 = 2245;
+	//number of matrix columns packed into the single vector column
+	private final static int colsVector = 7;
+	//sparsity used for "dense" and "sparse" test data, respectively
+	private final static double sparsity1 = 0.9;
+	private final static double sparsity2 = 0.1;
+	private final static double eps=0.0000000001;
+
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+	}
+
+	//with ID column, dense data, unknown dimensions
+
+	@Test
+	public void testVectorStringsConversionIDDenseUnknown() {
+		testDataFrameConversion(schemaStrings, true, true, true);
+	}
+
+	@Test
+	public void testVectorDoublesConversionIDDenseUnknown() {
+		testDataFrameConversion(schemaDoubles, true, true, true);
+	}
+
+	@Test
+	public void testVectorMixed1ConversionIDDenseUnknown() {
+		testDataFrameConversion(schemaMixed1, true, true, true);
+	}
+
+	@Test
+	public void testVectorMixed2ConversionIDDenseUnknown() {
+		testDataFrameConversion(schemaMixed2, true, true, true);
+	}
+
+	//with ID column, dense data, known dimensions
+
+	@Test
+	public void testVectorStringsConversionIDDense() {
+		testDataFrameConversion(schemaStrings, true, true, false);
+	}
+
+	@Test
+	public void testVectorDoublesConversionIDDense() {
+		testDataFrameConversion(schemaDoubles, true, true, false);
+	}
+
+	@Test
+	public void testVectorMixed1ConversionIDDense() {
+		testDataFrameConversion(schemaMixed1, true, true, false);
+	}
+
+	@Test
+	public void testVectorMixed2ConversionIDDense() {
+		testDataFrameConversion(schemaMixed2, true, true, false);
+	}
+
+	//with ID column, sparse data, unknown dimensions
+
+	@Test
+	public void testVectorStringsConversionIDSparseUnknown() {
+		testDataFrameConversion(schemaStrings, true, false, true);
+	}
+
+	@Test
+	public void testVectorDoublesConversionIDSparseUnknown() {
+		testDataFrameConversion(schemaDoubles, true, false, true);
+	}
+
+	@Test
+	public void testVectorMixed1ConversionIDSparseUnknown() {
+		testDataFrameConversion(schemaMixed1, true, false, true);
+	}
+
+	@Test
+	public void testVectorMixed2ConversionIDSparseUnknown() {
+		testDataFrameConversion(schemaMixed2, true, false, true);
+	}
+
+	//with ID column, sparse data, known dimensions
+
+	@Test
+	public void testVectorStringsConversionIDSparse() {
+		testDataFrameConversion(schemaStrings, true, false, false);
+	}
+
+	@Test
+	public void testVectorDoublesConversionIDSparse() {
+		testDataFrameConversion(schemaDoubles, true, false, false);
+	}
+
+	@Test
+	public void testVectorMixed1ConversionIDSparse() {
+		testDataFrameConversion(schemaMixed1, true, false, false);
+	}
+
+	@Test
+	public void testVectorMixed2ConversionIDSparse() {
+		testDataFrameConversion(schemaMixed2, true, false, false);
+	}
+
+	//without ID column, dense data, unknown dimensions
+
+	@Test
+	public void testVectorStringsConversionDenseUnknown() {
+		testDataFrameConversion(schemaStrings, false, true, true);
+	}
+
+	@Test
+	public void testVectorDoublesConversionDenseUnknown() {
+		testDataFrameConversion(schemaDoubles, false, true, true);
+	}
+
+	@Test
+	public void testVectorMixed1ConversionDenseUnknown() {
+		testDataFrameConversion(schemaMixed1, false, true, true);
+	}
+
+	@Test
+	public void testVectorMixed2ConversionDenseUnknown() {
+		testDataFrameConversion(schemaMixed2, false, true, true);
+	}
+
+	//without ID column, dense data, known dimensions
+
+	@Test
+	public void testVectorStringsConversionDense() {
+		testDataFrameConversion(schemaStrings, false, true, false);
+	}
+
+	@Test
+	public void testVectorDoublesConversionDense() {
+		testDataFrameConversion(schemaDoubles, false, true, false);
+	}
+
+	@Test
+	public void testVectorMixed1ConversionDense() {
+		testDataFrameConversion(schemaMixed1, false, true, false);
+	}
+
+	@Test
+	public void testVectorMixed2ConversionDense() {
+		testDataFrameConversion(schemaMixed2, false, true, false);
+	}
+
+	//without ID column, sparse data, unknown dimensions
+
+	@Test
+	public void testVectorStringsConversionSparseUnknown() {
+		testDataFrameConversion(schemaStrings, false, false, true);
+	}
+
+	@Test
+	public void testVectorDoublesConversionSparseUnknown() {
+		testDataFrameConversion(schemaDoubles, false, false, true);
+	}
+
+	@Test
+	public void testVectorMixed1ConversionSparseUnknown() {
+		testDataFrameConversion(schemaMixed1, false, false, true);
+	}
+
+	@Test
+	public void testVectorMixed2ConversionSparseUnknown() {
+		testDataFrameConversion(schemaMixed2, false, false, true);
+	}
+
+	//without ID column, sparse data, known dimensions
+
+	@Test
+	public void testVectorStringsConversionSparse() {
+		testDataFrameConversion(schemaStrings, false, false, false);
+	}
+
+	@Test
+	public void testVectorDoublesConversionSparse() {
+		testDataFrameConversion(schemaDoubles, false, false, false);
+	}
+
+	@Test
+	public void testVectorMixed1ConversionSparse() {
+		testDataFrameConversion(schemaMixed1, false, false, false);
+	}
+
+	@Test
+	public void testVectorMixed2ConversionSparse() {
+		testDataFrameConversion(schemaMixed2, false, false, false);
+	}
+
+	/**
+	 * Generates a random matrix, wraps it into a data frame of the given
+	 * schema, converts that data frame into a binary-block frame, and
+	 * compares the converted frame against the generated matrix.
+	 *
+	 * @param schema value types of the data frame columns (without the
+	 *        optional ID column); the OBJECT entry marks the vector column
+	 * @param containsID if true, the data frame has a leading ID column
+	 * @param dense if true, data is generated with sparsity1, otherwise
+	 *        with sparsity2
+	 * @param unknownDims if true, the converter is called with empty
+	 *        matrix characteristics instead of the known dimensions
+	 */
+	private void testDataFrameConversion(ValueType[] schema, boolean containsID, boolean dense, boolean unknownDims) {
+		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+		SparkExecutionContext sec = null;
+
+		try
+		{
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+
+			//generate input data and setup metadata
+			//(the vector column expands into colsVector matrix columns)
+			int cols = schema.length + colsVector - 1;
+			double sparsity = dense ? sparsity1 : sparsity2;
+			double[][] A = TestUtils.round(getRandomMatrix(rows1, cols, -10, 1000, sparsity, 2373));
+			MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
+			int blksz = ConfigurationManager.getBlocksize();
+			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+			MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+
+			//setup spark context
+			sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
+			JavaSparkContext sc = sec.getSparkContext();
+			SQLContext sqlctx = new SQLContext(sc);
+
+			//create input data frame
+			DataFrame df = createDataFrame(sqlctx, mbA, containsID, schema);
+
+			//dataframe - frame conversion
+			JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, containsID);
+
+			//get output frame block
+			FrameBlock fbB = SparkExecutionContext.toFrameBlock(out,
+				UtilFunctions.nCopies(cols, ValueType.DOUBLE), rows1, cols);
+
+			//compare frame blocks
+			MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB);
+			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+			TestUtils.compareMatrices(A, B, rows1, cols, eps);
+		}
+		catch( Exception ex ) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			//the context is null if its creation (or an earlier step) failed;
+			//closing it unconditionally would mask the original exception
+			if( sec != null )
+				sec.close();
+			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+			DMLScript.rtplatform = oldPlatform;
+		}
+	}
+
+	/**
+	 * Creates a data frame from the given matrix block, where scalar
+	 * columns are converted to the schema's value types and the OBJECT
+	 * column packs colsVector consecutive matrix columns into one vector.
+	 *
+	 * @param sqlctx sql context used to create the data frame
+	 * @param mb input matrix block providing the cell values
+	 * @param containsID if true, a leading 1-based ID column is added
+	 * @param schema value types of the data frame columns (without ID column)
+	 * @return data frame with one row per matrix row
+	 * @throws DMLRuntimeException presumably if the matrix block operations
+	 *         used to extract the vector column fail
+	 */
+	@SuppressWarnings("resource")
+	private DataFrame createDataFrame(SQLContext sqlctx, MatrixBlock mb, boolean containsID, ValueType[] schema)
+		throws DMLRuntimeException
+	{
+		//create in-memory list of rows
+		List<Row> list = new ArrayList<Row>();
+		int off = (containsID ? 1 : 0);
+		int clen = mb.getNumColumns() + off - colsVector + 1;
+
+		for( int i=0; i<mb.getNumRows(); i++ ) {
+			Object[] row = new Object[clen];
+			//ID stored as double to match the DoubleType declared in the schema below
+			if( containsID )
+				row[0] = (double)(i+1);
+			//j indexes matrix columns, j2 indexes data frame (schema) columns
+			for( int j=0, j2=0; j<mb.getNumColumns(); j++, j2++ ) {
+				if( schema[j2] != ValueType.OBJECT ) {
+					row[j2+off] = UtilFunctions
+						.doubleToObject(schema[j2], mb.quickGetValue(i, j));
+				}
+				else {
+					double[] tmp = DataConverter.convertToDoubleVector(
+						mb.sliceOperations(i, i, j, j+colsVector-1, new MatrixBlock()));
+					row[j2+off] = new DenseVector(tmp);
+					//skip the remaining matrix columns consumed by the vector
+					j += colsVector-1;
+				}
+			}
+			list.add(RowFactory.create(row));
+		}
+
+		//create data frame schema
+		List<StructField> fields = new ArrayList<StructField>();
+		if( containsID )
+			fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN,
+				DataTypes.DoubleType, true));
+		for( int j=0; j<schema.length; j++ ) {
+			DataType dt = null;
+			switch(schema[j]) {
+				case STRING: dt = DataTypes.StringType; break;
+				case DOUBLE: dt = DataTypes.DoubleType; break;
+				case INT: dt = DataTypes.LongType; break;
+				case OBJECT: dt = new VectorUDT(); break;
+				default: throw new RuntimeException("Unsupported value type.");
+			}
+			fields.add(DataTypes.createStructField("C"+(j+1), dt, true));
+		}
+		StructType dfSchema = DataTypes.createStructType(fields);
+
+		//create rdd and data frame
+		JavaSparkContext sc = new JavaSparkContext(sqlctx.sparkContext());
+		JavaRDD<Row> rowRDD = sc.parallelize(list);
+		return sqlctx.createDataFrame(rowRDD, dfSchema);
+	}
+}
\ No newline at end of file