You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/16 17:37:34 UTC
[2/2] incubator-systemml git commit: [SYSTEMML-914] Rework dataframe-frame, frame-dataframe converters
[SYSTEMML-914] Rework dataframe-frame, frame-dataframe converters
Similar to the rework of the dataframe-matrix converters, this patch fixes
various correctness and performance issues in the dataframe-frame
converters. This includes consistent row-ID column handling, consistent
APIs, a fix of the dimension analysis when an ID column is present, reuse
of existing row IDs, avoiding unnecessary dimension analysis when only the
number of non-zeros is unknown, efficient schema handling, and more
efficient parsing of frame inputs by exploiting matching value types.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/df090f2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/df090f2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/df090f2b
Branch: refs/heads/master
Commit: df090f2b1f9dd23a6fd48c5a67a95e1eb8e3ba59
Parents: ed07284
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 16 01:35:54 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Sep 16 06:33:32 2016 +0200
----------------------------------------------------------------------
.../java/org/apache/sysml/api/MLOutput.java | 19 +-
.../api/mlcontext/MLContextConversionUtil.java | 5 +-
.../spark/utils/FrameRDDConverterUtils.java | 218 +++++++++-------
.../spark/utils/RDDConverterUtils.java | 6 +-
.../sysml/runtime/util/UtilFunctions.java | 7 +-
.../functions/frame/FrameConverterTest.java | 13 +-
.../mlcontext/DataFrameConversionTest.java | 196 ---------------
.../mlcontext/DataFrameFrameConversionTest.java | 246 +++++++++++++++++++
.../DataFrameMatrixConversionTest.java | 196 +++++++++++++++
.../functions/mlcontext/FrameTest.java | 12 +-
.../mlcontext/MLContextFrameTest.java | 35 +--
11 files changed, 623 insertions(+), 330 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
index ec2f24d..d011104 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -47,18 +47,17 @@ public class MLOutput {
Map<String, JavaPairRDD<?,?>> _outputs;
private Map<String, MatrixCharacteristics> _outMetadata = null;
+ public MLOutput(Map<String, JavaPairRDD<?,?>> outputs, Map<String, MatrixCharacteristics> outMetadata) {
+ this._outputs = outputs;
+ this._outMetadata = outMetadata;
+ }
+
public MatrixBlock getMatrixBlock(String varName) throws DMLRuntimeException {
MatrixCharacteristics mc = getMatrixCharacteristics(varName);
// The matrix block is always pushed to an RDD and then we do collect
// We can later avoid this by returning symbol table rather than "Map<String, JavaPairRDD<MatrixIndexes,MatrixBlock>> _outputs"
- MatrixBlock mb = SparkExecutionContext.toMatrixBlock(getBinaryBlockedRDD(varName), (int) mc.getRows(), (int) mc.getCols(),
+ return SparkExecutionContext.toMatrixBlock(getBinaryBlockedRDD(varName), (int) mc.getRows(), (int) mc.getCols(),
mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros());
- return mb;
- }
-
- public MLOutput(Map<String, JavaPairRDD<?,?>> outputs, Map<String, MatrixCharacteristics> outMetadata) {
- this._outputs = outputs;
- this._outMetadata = outMetadata;
}
@SuppressWarnings("unchecked")
@@ -160,6 +159,8 @@ public class MLOutput {
}
public JavaRDD<String> getStringFrameRDD(String varName, String format, CSVFileFormatProperties fprop ) throws DMLRuntimeException {
+ //TODO MB: note that on construction of MLOutput only matrix binary blocks are passed, and
+ //hence we will never find a frame binary block in the outputs.
JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName);
MatrixCharacteristics mcIn = getMatrixCharacteristics(varName);
if(format.equals("csv")) {
@@ -175,9 +176,11 @@ public class MLOutput {
}
public DataFrame getDataFrameRDD(String varName, JavaSparkContext jsc) throws DMLRuntimeException {
+ //TODO MB: note that on construction of MLOutput only matrix binary blocks are passed, and
+ //hence we will never find a frame binary block in the outputs.
JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName);
MatrixCharacteristics mcIn = getMatrixCharacteristics(varName);
- return FrameRDDConverterUtils.binaryBlockToDataFrame(binaryRDD, mcIn, jsc);
+ return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryRDD, mcIn, null);
}
public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException {
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index 5476902..e74dc53 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -1217,11 +1217,10 @@ public class MLContextConversionUtil {
@SuppressWarnings("unchecked")
JavaPairRDD<Long, FrameBlock> binaryBlockFrame = (JavaPairRDD<Long, FrameBlock>) sparkExecutionContext
.getRDDHandleForFrameObject(frameObject, InputInfo.BinaryBlockInputInfo);
- MatrixCharacteristics matrixCharacteristics = frameObject.getMatrixCharacteristics();
+ MatrixCharacteristics mc = frameObject.getMatrixCharacteristics();
JavaSparkContext jsc = MLContextUtil.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());
-
- return FrameRDDConverterUtils.binaryBlockToDataFrame(binaryBlockFrame, matrixCharacteristics, jsc);
+ return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryBlockFrame, mc, frameObject.getSchema());
}
catch (DMLRuntimeException e) {
throw new MLContextException("DMLRuntimeException while converting frame object to DataFrame", e);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 211f814..faf8ba1 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -42,6 +42,7 @@ import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -54,6 +55,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable;
import org.apache.sysml.runtime.instructions.spark.data.SerText;
import org.apache.sysml.runtime.instructions.spark.functions.ConvertFrameBlockToIJVLines;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils.DataFrameExtractIDFunction;
import org.apache.sysml.runtime.io.IOUtilFunctions;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -326,27 +328,29 @@ public class FrameRDDConverterUtils
* @throws DMLRuntimeException
*/
public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
- DataFrame df, MatrixCharacteristics mcOut, boolean containsID)
+ DataFrame df, MatrixCharacteristics mc, boolean containsID)
throws DMLRuntimeException
{
-
- if(containsID)
- df = df.drop(RDDConverterUtils.DF_ID_COLUMN);
-
//determine unknown dimensions if required
- if( !mcOut.dimsKnown(true) ) {
+ if( !mc.dimsKnown() ) { //nnz are irrelevant here
JavaRDD<Row> tmp = df.javaRDD();
long rlen = tmp.count();
- long clen = containsID ? (df.columns().length - 1) : df.columns().length;
- mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+ long clen = df.columns().length - (containsID?1:0);
+ mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), -1);
}
- JavaPairRDD<Row, Long> prepinput = df.javaRDD()
- .zipWithIndex(); //zip row index
-
+ JavaPairRDD<Row, Long> prepinput = containsID ?
+ df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) :
+ df.javaRDD().zipWithIndex(); //zip row index
+
+ //convert data frame to frame schema (prepare once)
+ List<String> colnames = new ArrayList<String>();
+ List<ValueType> fschema = new ArrayList<ValueType>();
+ convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID);
+
//convert rdd to binary block rdd
- JavaPairRDD<Long, FrameBlock> out = prepinput
- .mapPartitionsToPair(new DataFrameToBinaryBlockFunction(mcOut));
+ JavaPairRDD<Long, FrameBlock> out = prepinput.mapPartitionsToPair(
+ new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID));
return out;
}
@@ -359,22 +363,27 @@ public class FrameRDDConverterUtils
* @param strict
* @return
*/
- public static DataFrame binaryBlockToDataFrame(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, JavaSparkContext sc)
+ public static DataFrame binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<Long,FrameBlock> in,
+ MatrixCharacteristics mc, List<ValueType> schema)
{
- List<ValueType> schema = in.first()._2().getSchema();
+ if( !mc.colsKnown() )
+ throw new RuntimeException("Number of columns needed to convert binary block to data frame.");
- //convert binary block to rows rdd (from blocks/rows)
- JavaRDD<Row> rowRDD = in.flatMap(new BinaryBlockToDataFrameFunction());
+ //convert binary block to rows rdd
+ JavaRDD<Row> rowRDD = in.flatMap(
+ new BinaryBlockToDataFrameFunction());
- SQLContext sqlContext = new SQLContext(sc);
- StructType dfSchema = convertFrameSchemaToDFSchema(schema);
- DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
+ //create data frame schema
+ if( schema == null )
+ schema = Collections.nCopies((int)mc.getCols(), ValueType.STRING);
+ StructType dfSchema = convertFrameSchemaToDFSchema(schema, true);
- return df;
+ //rdd to data frame conversion
+ return sqlctx.createDataFrame(rowRDD, dfSchema);
}
- /*
+ /**
* This function will convert Frame schema into DataFrame schema
*
* @param schema
@@ -382,32 +391,64 @@ public class FrameRDDConverterUtils
* @return
* Returns the DataFrame schema (StructType)
*/
- public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema)
+ public static StructType convertFrameSchemaToDFSchema(List<ValueType> fschema, boolean containsID)
{
- // Generate the schema based on the string of schema
+ // generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
- int i = 1;
- for (ValueType schema : lschema) {
- org.apache.spark.sql.types.DataType dataType = DataTypes.StringType;
+ // add id column type
+ if( containsID )
+ fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN,
+ DataTypes.DoubleType, true));
+
+ // add remaining types
+ int col = 1;
+ for (ValueType schema : fschema) {
+ DataType dt = null;
switch(schema) {
- case STRING: dataType = DataTypes.StringType; break;
- case DOUBLE: dataType = DataTypes.DoubleType; break;
- case INT: dataType = DataTypes.LongType; break;
- case BOOLEAN: dataType = DataTypes.BooleanType; break;
- default:
+ case STRING: dt = DataTypes.StringType; break;
+ case DOUBLE: dt = DataTypes.DoubleType; break;
+ case INT: dt = DataTypes.LongType; break;
+ case BOOLEAN: dt = DataTypes.BooleanType; break;
+ default: dt = DataTypes.StringType;
LOG.warn("Using default type String for " + schema.toString());
}
- fields.add(DataTypes.createStructField("C"+i++, dataType, true));
+ fields.add(DataTypes.createStructField("C"+col++, dt, true));
}
return DataTypes.createStructType(fields);
}
+ /**
+ *
+ * @param dfschema
+ * @param containsID
+ * @return
+ */
+ public static void convertDFSchemaToFrameSchema(StructType dfschema, List<String> colnames,
+ List<ValueType> fschema, boolean containsID)
+ {
+ int off = containsID ? 1 : 0;
+ for( int i=off; i<dfschema.fields().length; i++ ) {
+ StructField structType = dfschema.apply(i);
+ colnames.add(structType.name());
+ if(structType.dataType() == DataTypes.DoubleType
+ || structType.dataType() == DataTypes.FloatType)
+ fschema.add(ValueType.DOUBLE);
+ else if(structType.dataType() == DataTypes.LongType
+ || structType.dataType() == DataTypes.IntegerType)
+ fschema.add(ValueType.INT);
+ else if(structType.dataType() == DataTypes.BooleanType)
+ fschema.add(ValueType.BOOLEAN);
+ else
+ fschema.add(ValueType.STRING);
+ }
+ }
+
/*
* It will return JavaRDD<Row> based on csv data input file.
*/
- public static JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String delim, List<ValueType> schema)
+ public static JavaRDD<Row> csvToRowRDD(JavaSparkContext sc, String fnameIn, String delim, List<ValueType> schema)
{
// Load a text file and convert each line to a java rdd.
JavaRDD<String> dataRdd = sc.textFile(fnameIn);
@@ -695,20 +736,29 @@ public class FrameRDDConverterUtils
private static final long serialVersionUID = 2269315691094111843L;
private long _clen = -1;
+ private List<String> _colnames = null;
+ private List<ValueType> _schema = null;
+ private boolean _containsID = false;
private int _maxRowsPerBlock = -1;
- public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc) {
+ public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, List<String> colnames,
+ List<ValueType> schema, boolean containsID) {
_clen = mc.getCols();
+ _colnames = colnames;
+ _schema = schema;
+ _containsID = containsID;
_maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
}
@Override
- public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) throws Exception {
+ public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0)
+ throws Exception
+ {
ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
- Long[] ix = new Long[1];
- FrameBlock[] mb = new FrameBlock[1];
- int iRowsInBlock = 0;
+ long ix = -1;
+ FrameBlock fb = null;
+ Object[] tmprow = new Object[(int)_clen];
while( arg0.hasNext() )
{
@@ -716,55 +766,40 @@ public class FrameRDDConverterUtils
Row row = tmp._1();
long rowix = tmp._2()+1;
- if( iRowsInBlock == 0 || iRowsInBlock == _maxRowsPerBlock) {
- if( iRowsInBlock == _maxRowsPerBlock )
- flushBlocksToList(ix, mb, ret);
- createBlocks(rowix, ix, mb, row);
- iRowsInBlock = 0;
+ if( fb == null || fb.getNumRows() == _maxRowsPerBlock) {
+ if( fb != null )
+ flushBlocksToList(ix, fb, ret);
+ ix = rowix;
+ fb = new FrameBlock(_schema, _colnames);
}
//process row data
- Object[] parts = rowToObjectArray(row, (int)_clen, mb[0].getSchema());
- mb[0].appendRow(parts);
- iRowsInBlock++;
+ int off = _containsID ? 1 : 0;
+ for(int i=off; i<row.size(); i++) {
+ tmprow[i-off] = UtilFunctions.objectToObject(
+ _schema.get(i-off), row.get(i));
+ }
+ fb.appendRow(tmprow);
}
//flush last blocks
- flushBlocksToList(ix, mb, ret);
+ flushBlocksToList(ix, fb, ret);
return ret;
}
- public Object[] rowToObjectArray(Row row, int _clen, List<ValueType> schema) throws Exception {
- Object[] ret = new Object[_clen];
- for(int i = 0; i < row.length(); i++)
- ret[i] = UtilFunctions.objectToObject(schema.get(i), row.get(i));
- for(int i=row.length(); i<_clen; i++)
- ret[i] = "";
- return ret;
- }
-
- // Creates new state of empty column blocks for current global row index.
- private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb, Row row)
- {
- //compute row block index and number of column blocks
- ix[0] = new Long(rowix);
-
- List<String> columns = new ArrayList<String>();
- List<ValueType> schema = new ArrayList<ValueType>();
- for (StructField structType: row.schema().fields()) {
- columns.add(structType.name());
- if(structType.dataType() == DataTypes.DoubleType || structType.dataType() == DataTypes.FloatType)
- schema.add(ValueType.DOUBLE);
- else if(structType.dataType() == DataTypes.LongType || structType.dataType() == DataTypes.IntegerType)
- schema.add(ValueType.INT);
- else if(structType.dataType() == DataTypes.BooleanType)
- schema.add(ValueType.BOOLEAN);
- else
- schema.add(ValueType.STRING);
- }
- mb[0] = new FrameBlock(schema);
- mb[0].setColumnNames(columns);
+ /**
+ *
+ * @param ix
+ * @param fb
+ * @param ret
+ * @throws DMLRuntimeException
+ */
+ private static void flushBlocksToList( Long ix, FrameBlock fb, ArrayList<Tuple2<Long,FrameBlock>> ret )
+ throws DMLRuntimeException
+ {
+ if( fb != null && fb.getNumRows()>0 )
+ ret.add(new Tuple2<Long,FrameBlock>(ix, fb));
}
}
@@ -779,14 +814,21 @@ public class FrameRDDConverterUtils
public Iterable<Row> call(Tuple2<Long, FrameBlock> arg0)
throws Exception
{
+ long rowIndex = arg0._1();
FrameBlock blk = arg0._2();
ArrayList<Row> ret = new ArrayList<Row>();
//handle Frame block data
- Iterator<Object[]> iter = blk.getObjectRowIterator();
- while( iter.hasNext() )
- ret.add(RowFactory.create(iter.next().clone()));
-
+ int rows = blk.getNumRows();
+ int cols = blk.getNumColumns();
+ for( int i=0; i<rows; i++ ) {
+ Object[] row = new Object[cols+1];
+ row[0] = rowIndex++;
+ for( int j=0; j<cols; j++ )
+ row[j+1] = blk.get(i, j);
+ ret.add(RowFactory.create(row));
+ }
+
return ret;
}
}
@@ -1046,13 +1088,11 @@ public class FrameRDDConverterUtils
// Common functions
// Flushes current state of filled column blocks to output list.
- private static void flushBlocksToList( Long[] ix, FrameBlock[] mb, ArrayList<Tuple2<Long,FrameBlock>> ret )
+ private static void flushBlocksToList( Long[] ix, FrameBlock[] fb, ArrayList<Tuple2<Long,FrameBlock>> ret )
throws DMLRuntimeException
- {
- int len = ix.length;
- for( int i=0; i<len; i++ )
- if( mb[i] != null ) {
- ret.add(new Tuple2<Long,FrameBlock>(ix[i],mb[i]));
- }
+ {
+ for( int i=0; i<ix.length; i++ )
+ if( fb[i] != null && fb[0].getNumRows()>0 )
+ ret.add(new Tuple2<Long,FrameBlock>(ix[i], fb[i]));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index ba1934a..3ee1ef8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -322,7 +322,7 @@ public class RDDConverterUtils
* @return
* @throws DMLRuntimeException
*/
- public static DataFrame binaryBlockToDataFrame(SQLContext sqlContext,
+ public static DataFrame binaryBlockToDataFrame(SQLContext sqlctx,
JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector)
{
if( !mc.colsKnown() )
@@ -344,7 +344,7 @@ public class RDDConverterUtils
}
//rdd to data frame conversion
- return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
+ return sqlctx.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
}
/**
@@ -1011,7 +1011,7 @@ public class RDDConverterUtils
/**
*
*/
- private static class DataFrameExtractIDFunction implements PairFunction<Row, Row,Long>
+ protected static class DataFrameExtractIDFunction implements PairFunction<Row, Row,Long>
{
private static final long serialVersionUID = 7438855241666363433L;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 1ac552f..81a1e8f 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -423,8 +423,11 @@ public class UtilFunctions
* @return
*/
public static Object objectToObject(ValueType vt, Object in ) {
- String str = objectToString(in);
- return stringToObject(vt, str );
+ if( in instanceof Double && vt == ValueType.DOUBLE
+ || in instanceof Long && vt == ValueType.INT )
+ return in; //quick path to avoid double parsing
+ else
+ return stringToObject(vt, objectToString(in) );
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index 107dee3..f0c17eb 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -519,8 +519,8 @@ public class FrameConverterTest extends AutomatedTestBase
//Create DataFrame
SQLContext sqlContext = new SQLContext(sc);
- StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema);
- JavaRDD<Row> rowRDD = FrameRDDConverterUtils.getRowRDD(sc, fnameIn, separator, schema);
+ StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema, false);
+ JavaRDD<Row> rowRDD = FrameRDDConverterUtils.csvToRowRDD(sc, fnameIn, separator, schema);
DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
@@ -532,13 +532,14 @@ public class FrameConverterTest extends AutomatedTestBase
case BIN2DFRM: {
InputInfo iinfo = InputInfo.BinaryBlockInputInfo;
OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
- JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class);
- JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new LongWritableFrameToLongFrameFunction());
- DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(rddIn2, mc, sc);
+ JavaPairRDD<Long, FrameBlock> rddIn = sc
+ .hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class)
+ .mapToPair(new LongWritableFrameToLongFrameFunction());
+ DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(sc), rddIn, mc, schema);
//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
- .dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/)
+ .dataFrameToBinaryBlock(sc, df, mc, true)
.mapToPair(new LongFrameToLongWritableFrameFunction());
rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
deleted file mode 100644
index c19865c..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.mlcontext;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-import org.junit.Test;
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.util.DataConverter;
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.apache.sysml.test.utils.TestUtils;
-
-
-public class DataFrameConversionTest extends AutomatedTestBase
-{
- private final static String TEST_DIR = "functions/mlcontext/";
- private final static String TEST_NAME = "DataFrameConversion";
- private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameConversionTest.class.getSimpleName() + "/";
-
- private final static int rows1 = 2245;
- private final static int cols1 = 745;
- private final static int cols2 = 1264;
- private final static double sparsity1 = 0.9;
- private final static double sparsity2 = 0.1;
- private final static double eps=0.0000000001;
-
-
- @Override
- public void setUp() {
- addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
- }
-
- @Test
- public void testVectorConversionSingleDense() {
- testDataFrameConversion(true, true, true, false);
- }
-
- @Test
- public void testVectorConversionSingleDenseUnknown() {
- testDataFrameConversion(true, true, true, true);
- }
-
- @Test
- public void testVectorConversionSingleSparse() {
- testDataFrameConversion(true, true, false, false);
- }
-
- @Test
- public void testVectorConversionSingleSparseUnknown() {
- testDataFrameConversion(true, true, false, true);
- }
-
- @Test
- public void testVectorConversionMultiDense() {
- testDataFrameConversion(true, false, true, false);
- }
-
- @Test
- public void testVectorConversionMultiDenseUnknown() {
- testDataFrameConversion(true, false, true, true);
- }
-
- @Test
- public void testVectorConversionMultiSparse() {
- testDataFrameConversion(true, false, false, false);
- }
-
- @Test
- public void testVectorConversionMultiSparseUnknown() {
- testDataFrameConversion(true, false, false, true);
- }
-
- @Test
- public void testRowConversionSingleDense() {
- testDataFrameConversion(false, true, true, false);
- }
-
- @Test
- public void testRowConversionSingleDenseUnknown() {
- testDataFrameConversion(false, true, true, true);
- }
-
- @Test
- public void testRowConversionSingleSparse() {
- testDataFrameConversion(false, true, false, false);
- }
-
- @Test
- public void testRowConversionSingleSparseUnknown() {
- testDataFrameConversion(false, true, false, true);
- }
-
- @Test
- public void testRowConversionMultiDense() {
- testDataFrameConversion(false, false, true, false);
- }
-
- @Test
- public void testRowConversionMultiDenseUnknown() {
- testDataFrameConversion(false, false, true, true);
- }
-
- @Test
- public void testRowConversionMultiSparse() {
- testDataFrameConversion(false, false, false, false);
- }
-
- @Test
- public void testRowConversionMultiSparseUnknown() {
- testDataFrameConversion(false, false, false, true);
- }
-
- /**
- *
- * @param vector
- * @param singleColBlock
- * @param dense
- * @param unknownDims
- */
- private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) {
- boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
- RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
-
- SparkExecutionContext sec = null;
-
- try
- {
- DMLScript.USE_LOCAL_SPARK_CONFIG = true;
- DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
-
- //generate input data and setup metadata
- int cols = singleColBlock ? cols1 : cols2;
- double sparsity = dense ? sparsity1 : sparsity2;
- double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373);
- MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
- int blksz = ConfigurationManager.getBlocksize();
- MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
- MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
-
- //setup spark context
- sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
- JavaSparkContext sc = sec.getSparkContext();
- SQLContext sqlctx = new SQLContext(sc);
-
- //get binary block input rdd
- JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
-
- //matrix - dataframe - matrix conversion
- DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
- JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
-
- //get output matrix block
- MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
-
- //compare matrix blocks
- double[][] B = DataConverter.convertToDoubleMatrix(mbB);
- TestUtils.compareMatrices(A, B, rows1, cols, eps);
- }
- catch( Exception ex ) {
- throw new RuntimeException(ex);
- }
- finally {
- sec.close();
- DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
- DMLScript.rtplatform = oldPlatform;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
new file mode 100644
index 0000000..a26cfe8
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameFrameConversionTest extends AutomatedTestBase
+{
+ private final static String TEST_DIR = "functions/mlcontext/";
+ private final static String TEST_NAME = "DataFrameConversion";
+ private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameFrameConversionTest.class.getSimpleName() + "/";
+
+ private final static int rows1 = 2245;
+ private final static int cols1 = 745;
+ private final static int cols2 = 1264;
+ private final static double sparsity1 = 0.9;
+ private final static double sparsity2 = 0.1;
+ private final static double eps=0.0000000001;
+
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+ }
+
+
+
+ @Test
+ public void testRowDoubleConversionSingleDense() {
+ testDataFrameConversion(ValueType.DOUBLE, true, true, false);
+ }
+
+ @Test
+ public void testRowDoubleConversionSingleDenseUnknown() {
+ testDataFrameConversion(ValueType.DOUBLE, true, true, true);
+ }
+
+ @Test
+ public void testRowDoubleConversionSingleSparse() {
+ testDataFrameConversion(ValueType.DOUBLE, true, false, false);
+ }
+
+ @Test
+ public void testRowDoubleConversionSingleSparseUnknown() {
+ testDataFrameConversion(ValueType.DOUBLE, true, false, true);
+ }
+
+ @Test
+ public void testRowDoubleConversionMultiDense() {
+ testDataFrameConversion(ValueType.DOUBLE, false, true, false);
+ }
+
+ @Test
+ public void testRowDoubleConversionMultiDenseUnknown() {
+ testDataFrameConversion(ValueType.DOUBLE, false, true, true);
+ }
+
+ @Test
+ public void testRowDoubleConversionMultiSparse() {
+ testDataFrameConversion(ValueType.DOUBLE, false, false, false);
+ }
+
+ @Test
+ public void testRowDoubleConversionMultiSparseUnknown() {
+ testDataFrameConversion(ValueType.DOUBLE, false, false, true);
+ }
+
+ @Test
+ public void testRowStringConversionSingleDense() {
+ testDataFrameConversion(ValueType.STRING, true, true, false);
+ }
+
+ @Test
+ public void testRowStringConversionSingleDenseUnknown() {
+ testDataFrameConversion(ValueType.STRING, true, true, true);
+ }
+
+ @Test
+ public void testRowStringConversionSingleSparse() {
+ testDataFrameConversion(ValueType.STRING, true, false, false);
+ }
+
+ @Test
+ public void testRowStringConversionSingleSparseUnknown() {
+ testDataFrameConversion(ValueType.STRING, true, false, true);
+ }
+
+ @Test
+ public void testRowStringConversionMultiDense() {
+ testDataFrameConversion(ValueType.STRING, false, true, false);
+ }
+
+ @Test
+ public void testRowStringConversionMultiDenseUnknown() {
+ testDataFrameConversion(ValueType.STRING, false, true, true);
+ }
+
+ @Test
+ public void testRowStringConversionMultiSparse() {
+ testDataFrameConversion(ValueType.STRING, false, false, false);
+ }
+
+ @Test
+ public void testRowStringConversionMultiSparseUnknown() {
+ testDataFrameConversion(ValueType.STRING, false, false, true);
+ }
+
+ @Test
+ public void testRowLongConversionSingleDense() {
+ testDataFrameConversion(ValueType.INT, true, true, false);
+ }
+
+ @Test
+ public void testRowLongConversionSingleDenseUnknown() {
+ testDataFrameConversion(ValueType.INT, true, true, true);
+ }
+
+ @Test
+ public void testRowLongConversionSingleSparse() {
+ testDataFrameConversion(ValueType.INT, true, false, false);
+ }
+
+ @Test
+ public void testRowLongConversionSingleSparseUnknown() {
+ testDataFrameConversion(ValueType.INT, true, false, true);
+ }
+
+ @Test
+ public void testRowLongConversionMultiDense() {
+ testDataFrameConversion(ValueType.INT, false, true, false);
+ }
+
+ @Test
+ public void testRowLongConversionMultiDenseUnknown() {
+ testDataFrameConversion(ValueType.INT, false, true, true);
+ }
+
+ @Test
+ public void testRowLongConversionMultiSparse() {
+ testDataFrameConversion(ValueType.INT, false, false, false);
+ }
+
+ @Test
+ public void testRowLongConversionMultiSparseUnknown() {
+ testDataFrameConversion(ValueType.INT, false, false, true);
+ }
+
+ /**
+ * Round-trips a random frame through frame-dataframe-frame conversion and compares the result to the input.
+ * @param vt value type of every frame column (INT inputs are rounded first)
+ * @param singleColBlock if true, use cols1 (745) columns, otherwise cols2 (1264) columns
+ * @param dense if true, generate input with sparsity1 (0.9), otherwise with sparsity2 (0.1)
+ * @param unknownDims if true, pass empty (unknown) characteristics to the dataframe-to-frame conversion
+ */
+ private void testDataFrameConversion(ValueType vt, boolean singleColBlock, boolean dense, boolean unknownDims) {
+ boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+ SparkExecutionContext sec = null;
+
+ try
+ {
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+ DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+
+ //generate input data and setup metadata
+ int cols = singleColBlock ? cols1 : cols2;
+ double sparsity = dense ? sparsity1 : sparsity2;
+ double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373);
+ A = (vt == ValueType.INT) ? TestUtils.round(A) : A;
+ MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
+ FrameBlock fbA = DataConverter.convertToFrameBlock(mbA, vt);
+ int blksz = ConfigurationManager.getBlocksize();
+ MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+ MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+ List<ValueType> schema = Collections.nCopies(cols, vt);
+
+ //setup spark context
+ sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
+ JavaSparkContext sc = sec.getSparkContext();
+ SQLContext sqlctx = new SQLContext(sc);
+
+ //get binary block input rdd
+ JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);
+
+ //frame - dataframe - frame conversion (NOTE(review): 'true' presumably = df carries the row ID column)
+ DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema);
+ JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true);
+
+ //get output frame block
+ FrameBlock fbB = SparkExecutionContext.toFrameBlock(out, schema, rows1, cols);
+
+ //compare frame blocks (via conversion to double matrices)
+ MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB);
+ double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+ TestUtils.compareMatrices(A, B, rows1, cols, eps);
+ }
+ catch( Exception ex ) {
+ throw new RuntimeException(ex);
+ }
+ finally {
+ if( sec != null ) sec.close(); //null if setup failed before context creation; avoid masking the cause
+ DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+ DMLScript.rtplatform = oldPlatform;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
new file mode 100644
index 0000000..e88a867
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameMatrixConversionTest extends AutomatedTestBase
+{
+ private final static String TEST_DIR = "functions/mlcontext/";
+ private final static String TEST_NAME = "DataFrameConversion";
+ private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameMatrixConversionTest.class.getSimpleName() + "/";
+
+ private final static int rows1 = 2245;
+ private final static int cols1 = 745;
+ private final static int cols2 = 1264;
+ private final static double sparsity1 = 0.9;
+ private final static double sparsity2 = 0.1;
+ private final static double eps=0.0000000001;
+
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+ }
+
+ @Test
+ public void testVectorConversionSingleDense() {
+ testDataFrameConversion(true, true, true, false);
+ }
+
+ @Test
+ public void testVectorConversionSingleDenseUnknown() {
+ testDataFrameConversion(true, true, true, true);
+ }
+
+ @Test
+ public void testVectorConversionSingleSparse() {
+ testDataFrameConversion(true, true, false, false);
+ }
+
+ @Test
+ public void testVectorConversionSingleSparseUnknown() {
+ testDataFrameConversion(true, true, false, true);
+ }
+
+ @Test
+ public void testVectorConversionMultiDense() {
+ testDataFrameConversion(true, false, true, false);
+ }
+
+ @Test
+ public void testVectorConversionMultiDenseUnknown() {
+ testDataFrameConversion(true, false, true, true);
+ }
+
+ @Test
+ public void testVectorConversionMultiSparse() {
+ testDataFrameConversion(true, false, false, false);
+ }
+
+ @Test
+ public void testVectorConversionMultiSparseUnknown() {
+ testDataFrameConversion(true, false, false, true);
+ }
+
+ @Test
+ public void testRowConversionSingleDense() {
+ testDataFrameConversion(false, true, true, false);
+ }
+
+ @Test
+ public void testRowConversionSingleDenseUnknown() {
+ testDataFrameConversion(false, true, true, true);
+ }
+
+ @Test
+ public void testRowConversionSingleSparse() {
+ testDataFrameConversion(false, true, false, false);
+ }
+
+ @Test
+ public void testRowConversionSingleSparseUnknown() {
+ testDataFrameConversion(false, true, false, true);
+ }
+
+ @Test
+ public void testRowConversionMultiDense() {
+ testDataFrameConversion(false, false, true, false);
+ }
+
+ @Test
+ public void testRowConversionMultiDenseUnknown() {
+ testDataFrameConversion(false, false, true, true);
+ }
+
+ @Test
+ public void testRowConversionMultiSparse() {
+ testDataFrameConversion(false, false, false, false);
+ }
+
+ @Test
+ public void testRowConversionMultiSparseUnknown() {
+ testDataFrameConversion(false, false, false, true);
+ }
+
+ /**
+ * Round-trips a random matrix through matrix-dataframe-matrix conversion and compares the result to the input.
+ * @param vector if true, use the vector dataframe representation (testVector*), otherwise the row one (testRow*)
+ * @param singleColBlock if true, use cols1 (745) columns, otherwise cols2 (1264) columns
+ * @param dense if true, generate input with sparsity1 (0.9), otherwise with sparsity2 (0.1)
+ * @param unknownDims if true, pass empty (unknown) characteristics to the dataframe-to-matrix conversion
+ */
+ private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) {
+ boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+ SparkExecutionContext sec = null;
+
+ try
+ {
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+ DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+
+ //generate input data and setup metadata
+ int cols = singleColBlock ? cols1 : cols2;
+ double sparsity = dense ? sparsity1 : sparsity2;
+ double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373);
+ MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
+ int blksz = ConfigurationManager.getBlocksize();
+ MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+ MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+
+ //setup spark context
+ sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
+ JavaSparkContext sc = sec.getSparkContext();
+ SQLContext sqlctx = new SQLContext(sc);
+
+ //get binary block input rdd
+ JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
+
+ //matrix - dataframe - matrix conversion
+ DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
+ JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
+
+ //get output matrix block
+ MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
+
+ //compare matrix blocks
+ double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+ TestUtils.compareMatrices(A, B, rows1, cols, eps);
+ }
+ catch( Exception ex ) {
+ throw new RuntimeException(ex);
+ }
+ finally {
+ if( sec != null ) sec.close(); //null if setup failed before context creation; avoid masking the cause
+ DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+ DMLScript.rtplatform = oldPlatform;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
index d12f6f2..11f3f02 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
@@ -226,13 +226,13 @@ public class FrameTest extends AutomatedTestBase
{
//Create DataFrame for input A
SQLContext sqlContext = new SQLContext(sc);
- StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema);
- JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.getRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema);
+ StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema, false);
+ JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema);
dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA);
//Create DataFrame for input B
- StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB);
- JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.getRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB);
+ StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB, false);
+ JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB);
dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB);
}
@@ -285,7 +285,7 @@ public class FrameTest extends AutomatedTestBase
//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary
MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, -1, -1, -1);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
- .dataFrameToBinaryBlock(jsc, df, mc, false)
+ .dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame)
.mapToPair(new LongFrameToLongWritableFrameFunction());
rddOut.saveAsHadoopFile(output("AB"), LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass);
}
@@ -306,7 +306,7 @@ public class FrameTest extends AutomatedTestBase
//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary
MatrixCharacteristics mc = new MatrixCharacteristics(cRows, cCols, -1, -1, -1);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
- .dataFrameToBinaryBlock(jsc, df, mc, false)
+ .dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame)
.mapToPair(new LongFrameToLongWritableFrameFunction());
rddOut.saveAsHadoopFile(fName, LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass);
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
index 98c8b10..deac382 100644
--- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
@@ -46,6 +46,7 @@ import org.apache.sysml.api.mlcontext.MatrixMetadata;
import org.apache.sysml.api.mlcontext.Script;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
import org.apache.sysml.test.integration.AutomatedTestBase;
import org.apache.sysml.test.integration.mlcontext.MLContextTest.CommaSeparatedValueStringToRow;
import org.junit.After;
@@ -230,9 +231,9 @@ public class MLContextFrameTest extends AutomatedTestBase {
// Create DataFrame
SQLContext sqlContext = new SQLContext(sc);
- StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaA);
+ StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaA, false);
DataFrame dataFrameA = sqlContext.createDataFrame(javaRddRowA, dfSchemaA);
- StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB);
+ StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB, false);
DataFrame dataFrameB = sqlContext.createDataFrame(javaRddRowB, dfSchemaB);
if (script_type == SCRIPT_TYPE.DML)
script = dml("A[2:3,2:4]=B;C=A[2:3,2:3]").in("A", dataFrameA, fmA).in("B", dataFrameB, fmB).out("A")
@@ -368,31 +369,31 @@ public class MLContextFrameTest extends AutomatedTestBase {
} else if (outputType == IO_TYPE.DATAFRAME) {
- DataFrame dataFrameA = mlResults.getDataFrame("A");
+ DataFrame dataFrameA = mlResults.getDataFrame("A").drop(RDDConverterUtils.DF_ID_COLUMN);
List<Row> listAOut = dataFrameA.collectAsList();
Row row1 = listAOut.get(0);
- Assert.assertEquals("Mistmatch with expected value", "1", row1.getString(0));
- Assert.assertEquals("Mistmatch with expected value", "Str2", row1.getString(1));
- Assert.assertEquals("Mistmatch with expected value", "3.0", row1.getString(2));
- Assert.assertEquals("Mistmatch with expected value", "true", row1.getString(3));
-
+ Assert.assertEquals("Mistmatch with expected value", "1", row1.get(0).toString());
+ Assert.assertEquals("Mistmatch with expected value", "Str2", row1.get(1).toString());
+ Assert.assertEquals("Mistmatch with expected value", "3.0", row1.get(2).toString());
+ Assert.assertEquals("Mistmatch with expected value", "true", row1.get(3).toString());
+
Row row2 = listAOut.get(1);
- Assert.assertEquals("Mistmatch with expected value", "4", row2.getString(0));
- Assert.assertEquals("Mistmatch with expected value", "Str12", row2.getString(1));
- Assert.assertEquals("Mistmatch with expected value", "13.0", row2.getString(2));
- Assert.assertEquals("Mistmatch with expected value", "true", row2.getString(3));
+ Assert.assertEquals("Mistmatch with expected value", "4", row2.get(0).toString());
+ Assert.assertEquals("Mistmatch with expected value", "Str12", row2.get(1).toString());
+ Assert.assertEquals("Mistmatch with expected value", "13.0", row2.get(2).toString());
+ Assert.assertEquals("Mistmatch with expected value", "true", row2.get(3).toString());
- DataFrame dataFrameC = mlResults.getDataFrame("C");
+ DataFrame dataFrameC = mlResults.getDataFrame("C").drop(RDDConverterUtils.DF_ID_COLUMN);
List<Row> listCOut = dataFrameC.collectAsList();
Row row3 = listCOut.get(0);
- Assert.assertEquals("Mistmatch with expected value", "Str12", row3.getString(0));
- Assert.assertEquals("Mistmatch with expected value", "13.0", row3.getString(1));
+ Assert.assertEquals("Mistmatch with expected value", "Str12", row3.get(0).toString());
+ Assert.assertEquals("Mistmatch with expected value", "13.0", row3.get(1).toString());
Row row4 = listCOut.get(1);
- Assert.assertEquals("Mistmatch with expected value", "Str25", row4.getString(0));
- Assert.assertEquals("Mistmatch with expected value", "26.0", row4.getString(1));
+ Assert.assertEquals("Mistmatch with expected value", "Str25", row4.get(0).toString());
+ Assert.assertEquals("Mistmatch with expected value", "26.0", row4.get(1).toString()); //as for row3: column values may be non-string
} else {
String[][] frameA = mlResults.getFrameAs2DStringArray("A");
Assert.assertEquals("Str2", frameA[0][1]);