You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ac...@apache.org on 2016/07/23 05:35:05 UTC
incubator-systemml git commit: [SYSTEMML-560] Frame DataFrame
converters
Repository: incubator-systemml
Updated Branches:
refs/heads/master 821c5f50d -> 14e9f6443
[SYSTEMML-560] Frame DataFrame converters
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/14e9f644
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/14e9f644
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/14e9f644
Branch: refs/heads/master
Commit: 14e9f64439ce58acd3b3cdaf53e75a768a4757f0
Parents: 821c5f5
Author: Arvind Surve <ac...@yahoo.com>
Authored: Fri Jul 22 22:34:27 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Fri Jul 22 22:34:27 2016 -0700
----------------------------------------------------------------------
.../spark/utils/FrameRDDConverterUtils.java | 214 +++++++++++++++++--
.../apache/sysml/runtime/io/FrameReader.java | 2 +-
.../runtime/matrix/data/LibMatrixBincell.java | 2 +-
.../runtime/matrix/data/LibMatrixOuterAgg.java | 26 +--
.../sysml/runtime/matrix/data/MatrixBlock.java | 4 +-
.../matrix/sort/SamplingSortMRInputFormat.java | 4 +-
.../sysml/runtime/util/DataConverter.java | 4 +-
.../sysml/runtime/util/UtilFunctions.java | 44 ++++
.../functions/frame/FrameAppendDistTest.java | 2 +-
.../functions/frame/FrameConverterTest.java | 89 +++++++-
.../functions/frame/FrameCopyTest.java | 2 +-
.../functions/frame/FrameIndexingDistTest.java | 2 +-
.../functions/frame/FrameReadWriteTest.java | 2 +-
13 files changed, 349 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 052d3f3..c243eeb 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -36,6 +36,13 @@ import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
@@ -101,8 +108,7 @@ public class FrameRDDConverterUtils
//convert csv rdd to binary block rdd (w/ partial blocks)
JavaPairRDD<Long, FrameBlock> out = prepinput
- .mapPartitionsToPair(new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill))
- .mapToPair(new LongWritableFrameToLongFrameFunction());
+ .mapPartitionsToPair(new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill));
return out;
}
@@ -300,8 +306,70 @@ public class FrameRDDConverterUtils
return out;
}
+ //=====================================
+ // DataFrame <--> Binary block
+ /**
+ * Converts a DataFrame into a binary-block frame RDD.
+ *
+ * @param sc java spark context
+ * @param df input data frame
+ * @param mcOut matrix characteristics of the output
+ * @param containsID true if the data frame contains an "ID" column to be dropped
+ * @return binary-block frame RDD
+ * @throws DMLRuntimeException
+ */
+ public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
+ DataFrame df, MatrixCharacteristics mcOut, boolean containsID)
+ throws DMLRuntimeException
+ {
+
+ if(containsID)
+ df = df.drop("ID");
+
+ //determine unknown dimensions if required
+ if( !mcOut.dimsKnown(true) ) {
+ JavaRDD<Row> tmp = df.javaRDD();
+ long rlen = tmp.count();
+ long clen = containsID ? (df.columns().length - 1) : df.columns().length;
+ mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+ }
+
+ JavaPairRDD<Row, Long> prepinput = df.javaRDD()
+ .zipWithIndex(); //zip row index
+
+ //convert rdd to binary block rdd
+ JavaPairRDD<Long, FrameBlock> out = prepinput
+ .mapPartitionsToPair(new DataFrameToBinaryBlockFunction(mcOut));
+
+ return out;
+ }
+
+ /**
+ * Converts a binary-block frame RDD into a DataFrame.
+ *
+ * @param in input binary-block frame RDD
+ * @param mcIn matrix characteristics of the input
+ * @param sc java spark context
+ * @return data frame
+ */
+ public static DataFrame binaryBlockToDataFrame(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, JavaSparkContext sc)
+ {
+ List<ValueType> schema = in.first()._2().getSchema();
+
+ //convert binary block to rows rdd (from blocks/rows)
+ JavaRDD<Row> rowRDD = in.flatMap(new BinaryBlockToDataFrameFunction());
+
+ SQLContext sqlContext = new SQLContext(sc);
+ StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema);
+ DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
+
+ return df;
+ }
+
/////////////////////////////////
// CSV-SPECIFIC FUNCTIONS
@@ -391,7 +459,7 @@ public class FrameRDDConverterUtils
* In terms of memory consumption this is better than creating partial blocks of row segments.
*
*/
- private static class CSVToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Text,Long>>,LongWritable,FrameBlock>
+ private static class CSVToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Text,Long>>,Long,FrameBlock>
{
private static final long serialVersionUID = -1976803898174960086L;
@@ -413,12 +481,12 @@ public class FrameRDDConverterUtils
}
@Override
- public Iterable<Tuple2<LongWritable, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0)
+ public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0)
throws Exception
{
- ArrayList<Tuple2<LongWritable,FrameBlock>> ret = new ArrayList<Tuple2<LongWritable,FrameBlock>>();
+ ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
- LongWritable[] ix = new LongWritable[1];
+ Long[] ix = new Long[1];
FrameBlock[] mb = new FrameBlock[1];
int iRowsInBlock = 0;
@@ -467,10 +535,10 @@ public class FrameRDDConverterUtils
}
// Creates new state of empty column blocks for current global row index.
- private void createBlocks(long rowix, LongWritable[] ix, FrameBlock[] mb)
+ private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb)
{
//compute row block index and number of column blocks
- ix[0] = new LongWritable(rowix);
+ ix[0] = rowix;
mb[0] = new FrameBlock((int)_clen, ValueType.STRING);
if( _colnames != null )
mb[0].setColumnNames(_colnames);
@@ -481,17 +549,6 @@ public class FrameRDDConverterUtils
for( int j=0; j<_clen; j++ )
mb[0].getColumnMetadata(j).setNumDistinct(Long.parseLong(_ndMeta.get(j)));
}
-
- // Flushes current state of filled column blocks to output list.
- private void flushBlocksToList( LongWritable[] ix, FrameBlock[] mb, ArrayList<Tuple2<LongWritable,FrameBlock>> ret )
- throws DMLRuntimeException
- {
- int len = ix.length;
- for( int i=0; i<len; i++ )
- if( mb[i] != null ) {
- ret.add(new Tuple2<LongWritable,FrameBlock>(ix[i],mb[i]));
- }
- }
}
/**
@@ -558,6 +615,111 @@ public class FrameRDDConverterUtils
return ret;
}
}
+
+ /////////////////////////////////
+ // DataFrame-SPECIFIC FUNCTIONS
+
+ private static class DataFrameToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Row,Long>>,Long,FrameBlock>
+ {
+ private static final long serialVersionUID = 2269315691094111843L;
+
+ private long _clen = -1;
+ private int _maxRowsPerBlock = -1;
+
+ public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc) {
+ _clen = mc.getCols();
+ _maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
+ }
+
+ @Override
+ public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) throws Exception {
+ ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
+
+ Long[] ix = new Long[1];
+ FrameBlock[] mb = new FrameBlock[1];
+ int iRowsInBlock = 0;
+
+ while( arg0.hasNext() )
+ {
+ Tuple2<Row,Long> tmp = arg0.next();
+ Row row = tmp._1();
+ long rowix = tmp._2()+1;
+
+ if( iRowsInBlock == 0 || iRowsInBlock == _maxRowsPerBlock) {
+ if( iRowsInBlock == _maxRowsPerBlock )
+ flushBlocksToList(ix, mb, ret);
+ createBlocks(rowix, ix, mb, row);
+ iRowsInBlock = 0;
+ }
+
+ //process row data
+ Object[] parts = rowToObjectArray(row, (int)_clen, mb[0].getSchema());
+ mb[0].appendRow(parts);
+ iRowsInBlock++;
+ }
+
+ //flush last blocks
+ flushBlocksToList(ix, mb, ret);
+
+ return ret;
+ }
+
+ public Object[] rowToObjectArray(Row row, int _clen, List<ValueType> schema) throws Exception {
+ Object[] ret = new Object[_clen];
+ for(int i = 0; i < row.length(); i++)
+ ret[i] = UtilFunctions.objectToObject(schema.get(i), row.get(i));
+ for(int i=row.length(); i<_clen; i++)
+ ret[i] = "";
+ return ret;
+ }
+
+ // Creates new state of empty column blocks for current global row index.
+ private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb, Row row)
+ {
+ //compute row block index and number of column blocks
+ ix[0] = new Long(rowix);
+
+ List<String> columns = new ArrayList<String>();
+ List<ValueType> schema = new ArrayList<ValueType>();
+ for (StructField structType: row.schema().fields()) {
+ columns.add(structType.name());
+ if(structType.dataType() == DataTypes.DoubleType || structType.dataType() == DataTypes.FloatType)
+ schema.add(ValueType.DOUBLE);
+ else if(structType.dataType() == DataTypes.LongType || structType.dataType() == DataTypes.IntegerType)
+ schema.add(ValueType.INT);
+ else if(structType.dataType() == DataTypes.BooleanType)
+ schema.add(ValueType.BOOLEAN);
+ else
+ schema.add(ValueType.STRING);
+ }
+ mb[0] = new FrameBlock(schema);
+ mb[0].setColumnNames(columns);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class BinaryBlockToDataFrameFunction implements FlatMapFunction<Tuple2<Long,FrameBlock>,Row>
+ {
+ private static final long serialVersionUID = 8093340778966667460L;
+
+ @Override
+ public Iterable<Row> call(Tuple2<Long, FrameBlock> arg0)
+ throws Exception
+ {
+ FrameBlock blk = arg0._2();
+ ArrayList<Row> ret = new ArrayList<Row>();
+
+ //handle Frame block data
+ Iterator<Object[]> iter = blk.getObjectRowIterator();
+ while( iter.hasNext() )
+ ret.add(RowFactory.create(iter.next().clone()));
+
+ return ret;
+ }
+ }
+
/////////////////////////////////
// TEXTCELL-SPECIFIC FUNCTIONS
@@ -808,4 +970,18 @@ public class FrameRDDConverterUtils
return ret;
}
}
+
+ //////////////////////////////////////
+ // Common functions
+
+ // Flushes current state of filled column blocks to output list.
+ private static void flushBlocksToList( Long[] ix, FrameBlock[] mb, ArrayList<Tuple2<Long,FrameBlock>> ret )
+ throws DMLRuntimeException
+ {
+ int len = ix.length;
+ for( int i=0; i<len; i++ )
+ if( mb[i] != null ) {
+ ret.add(new Tuple2<Long,FrameBlock>(ix[i],mb[i]));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index d37bbde..e318fff 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -104,7 +104,7 @@ public abstract class FrameReader
throws IOException, DMLRuntimeException
{
List<String> colNames = new ArrayList<String>();
- for (int i=0; i < clen; ++i)
+ for (int i=0; i < clen; i++)
colNames.add("C"+i);
return colNames;
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
index dd0b9e0..6ad08d8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
@@ -781,7 +781,7 @@ public class LibMatrixBincell
if( ixPos1 >= 0 ){ //match, scan to next val
if(bOp.fn instanceof LessThan || bOp.fn instanceof GreaterThanEquals
|| bOp.fn instanceof Equals || bOp.fn instanceof NotEquals)
- while( ixPos1<bv.length && value==bv[ixPos1] ) ++ixPos1;
+ while( ixPos1<bv.length && value==bv[ixPos1] ) ixPos1++;
if(bOp.fn instanceof GreaterThan || bOp.fn instanceof LessThanEquals
|| bOp.fn instanceof Equals || bOp.fn instanceof NotEquals)
while( ixPos2 > 0 && value==bv[ixPos2-1]) --ixPos2;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
index dab24a0..25d3067 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
@@ -184,10 +184,10 @@ public class LibMatrixOuterAgg
double dvix[] = new double[vix.length];
if (bPrimeCumSum)
- for (int i = 0; i< vix.length; ++i)
+ for (int i = 0; i< vix.length; i++)
dvix[vix.length-1-i] = vix[i];
else
- for (int i = 0; i< vix.length; ++i)
+ for (int i = 0; i< vix.length; i++)
dvix[i] = vix[i];
MatrixBlock mbix = DataConverter.convertToMatrixBlock(dvix, true);
@@ -197,7 +197,7 @@ public class LibMatrixOuterAgg
vixCumSum = DataConverter.convertToIntVector(mbResult);
if (bPrimeCumSum)
- for (int i = 0; i< (vixCumSum.length+1)/2; ++i) {
+ for (int i = 0; i< (vixCumSum.length+1)/2; i++) {
int iTemp = vixCumSum[vixCumSum.length-1-i];
vixCumSum[vixCumSum.length-1-i] = vixCumSum[i];
vixCumSum[i] = iTemp;
@@ -264,10 +264,10 @@ public class LibMatrixOuterAgg
double dvix[] = new double[vix.length];
if (bPrimeCumSum)
- for (int i = 0; i< vix.length; ++i)
+ for (int i = 0; i< vix.length; i++)
dvix[vix.length-1-i] = vix[i];
else
- for (int i = 0; i< vix.length; ++i)
+ for (int i = 0; i< vix.length; i++)
dvix[i] = vix[i];
MatrixBlock mbix = DataConverter.convertToMatrixBlock(dvix, true);
@@ -277,7 +277,7 @@ public class LibMatrixOuterAgg
vixCumSum = DataConverter.convertToIntVector(mbResult);
if (bPrimeCumSum)
- for (int i = 0; i< (vixCumSum.length+1)/2; ++i) {
+ for (int i = 0; i< (vixCumSum.length+1)/2; i++) {
int iTemp = vixCumSum[vixCumSum.length-1-i];
vixCumSum[vixCumSum.length-1-i] = vixCumSum[i];
vixCumSum[i] = iTemp;
@@ -1030,7 +1030,7 @@ public class LibMatrixOuterAgg
int[] aix = sblock.indexes(j);
double [] avals = sblock.values(j);
- for (int i=apos; i < apos+alen; ++i) {
+ for (int i=apos; i < apos+alen; i++) {
int cnt = sumEqNe(avals[i], bv, bOp);
out.quickSetValue(0, aix[i], cnt);
}
@@ -1447,7 +1447,7 @@ public class LibMatrixOuterAgg
} else if(bOp.fn instanceof Equals) {
double dFirstValue = vmb[0];
int i=0;
- while(i<vmb.length-1 && dFirstValue == vmb[i+1]) ++i;
+ while(i<vmb.length-1 && dFirstValue == vmb[i+1]) i++;
if (i < vmb.length-1)
vix[0] = i+1;
else
@@ -1455,7 +1455,7 @@ public class LibMatrixOuterAgg
} else if(bOp.fn instanceof NotEquals) {
double dFirstValue = vmb[0];
int i=0;
- while(i<vmb.length-1 && dFirstValue == vmb[i+1]) ++i;
+ while(i<vmb.length-1 && dFirstValue == vmb[i+1]) i++;
if (i < vmb.length-1)
vix[0] = i-1;
else
@@ -1520,10 +1520,10 @@ public class LibMatrixOuterAgg
{
int iCurInd = 0;
- for (int i = 0; i < vix.length;++i)
+ for (int i = 0; i < vix.length;i++)
{
double dPrevVal = vmb[iCurInd];
- while(i<vix.length && dPrevVal == vmb[i]) ++i;
+ while(i<vix.length && dPrevVal == vmb[i]) i++;
if(i < vix.length) {
for(int j=iCurInd; j<i; ++j) vix[j] = vix[i];
@@ -1555,9 +1555,9 @@ public class LibMatrixOuterAgg
int iLastIndex = 0;
double dLastVal = vix[iLastIndex];
- for (int i = 0; i < vix.length-1; ++i)
+ for (int i = 0; i < vix.length-1; i++)
{
- while(i<vmb.length-1 && dLastVal == vmb[i+1]) ++i;
+ while(i<vmb.length-1 && dLastVal == vmb[i+1]) i++;
for (int j=iLastIndex+1; j<=i; ++j)
vix[j] = vix[iLastIndex];
if (i < vix.length-1) {
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 48b0b2d..842982d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -5069,7 +5069,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
// keep scanning the weights, until we hit the required position <code>fromPos</code>
while ( count < fromPos ) {
count += quickGetValue(index,1);
- ++index;
+ index++;
}
double runningSum;
@@ -5086,7 +5086,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
runningSum += (val * Math.min(wt, selectRange-selectedCount));
selectedCount += Math.min(wt, selectRange-selectedCount);
count += wt;
- ++index;
+ index++;
}
//System.out.println(fromPos + ", " + toPos + ": " + count + ", "+ runningSum + ", " + selectedCount);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
index 9c66518..c29ef5a 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
@@ -108,7 +108,7 @@ extends SequenceFileInputFormat<K,V>
// take N samples from different parts of the input
int totalcount = 0;
- for(int i=0; i < samples; ++i) {
+ for(int i=0; i < samples; i++) {
SequenceFileRecordReader reader =
(SequenceFileRecordReader) inFormat.getRecordReader(splits[sampleStep * i], conf, null);
int count=0;
@@ -227,7 +227,7 @@ extends SequenceFileInputFormat<K,V>
float stepSize = numRecords / (float) numPartitions;
//System.out.println("Step size is " + stepSize);
ArrayList<WritableComparable> result = new ArrayList<WritableComparable>(numPartitions-1);
- for(int i=1; i < numPartitions; ++i) {
+ for(int i=1; i < numPartitions; i++) {
result.add(records.get(Math.round(stepSize * i)));
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index 08a6e8d..c790ae9 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -956,8 +956,8 @@ public class DataConverter
}
}
} else { // Block is in dense format
- for (int i=0; i<rowLength; ++i){
- for (int j=0; j<colLength; ++j){
+ for (int i=0; i<rowLength; i++){
+ for (int j=0; j<colLength; j++){
double value = mb.getValue(i, j);
if (value != 0.0){
sb.append(i+1).append(separator).append(j+1).append(separator);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index fa17fcd..88221f2 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -23,6 +23,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -615,4 +618,45 @@ public class UtilFunctions
return in1.getDataType();
}
+
+ /*
+ * This function will convert Frame schema into DataFrame schema
+ *
+ * @param lschema
+ * Frame schema in the form of List<ValueType>
+ * @return
+ * Returns the DataFrame schema (StructType)
+ */
+ public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema)
+ {
+ // Generate the schema based on the string of schema
+ List<StructField> fields = new ArrayList<StructField>();
+
+ int i = 1;
+ for (ValueType schema : lschema)
+ {
+ org.apache.spark.sql.types.DataType dataType = DataTypes.StringType;
+ switch(schema)
+ {
+ case STRING:
+ dataType = DataTypes.StringType;
+ break;
+ case DOUBLE:
+ dataType = DataTypes.DoubleType;
+ break;
+ case INT:
+ dataType = DataTypes.LongType;
+ break;
+ case BOOLEAN:
+ dataType = DataTypes.BooleanType;
+ break;
+ default:
+ System.out.println("Default schema type is String.");
+ }
+ fields.add(DataTypes.createStructField("C"+i++, dataType, true));
+ }
+
+ return DataTypes.createStructType(fields);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
index 20c4a27..0d3b932 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
@@ -214,7 +214,7 @@ public class FrameAppendDistTest extends AutomatedTestBase
}
private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, ValueType[] schema) {
- for ( int i=0; i<frame1.getNumRows(); ++i )
+ for ( int i=0; i<frame1.getNumRows(); i++ )
for( int j=0; j<frame1.getNumColumns(); j++ ) {
Object val1 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame1.get(i, j)));
Object val2 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame2.get(i, j)));
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index 7a8392b..441a63b 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -27,9 +27,15 @@ import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.StructType;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.parser.Expression.ValueType;
@@ -65,6 +71,7 @@ import org.junit.Test;
+
public class FrameConverterTest extends AutomatedTestBase
{
private final static String TEST_DIR = "functions/frame/";
@@ -100,8 +107,12 @@ public class FrameConverterTest extends AutomatedTestBase
BIN2TXTCELL,
MAT2BIN,
BIN2MAT,
+ DFRM2BIN,
+ BIN2DFRM,
}
+ private static String separator = ",";
+
@Override
public void setUp() {
TestUtils.clearAssertionInformation();
@@ -178,7 +189,16 @@ public class FrameConverterTest extends AutomatedTestBase
runFrameConverterTest(schemaMixedLarge, ConvType.BIN2MAT);
}
-
+ @Test
+ public void testFrameMixedDFrameBinSpark() {
+ runFrameConverterTest(schemaMixedLarge, ConvType.DFRM2BIN);
+ }
+
+ @Test
+ public void testFrameMixedBinDFrameSpark() {
+ runFrameConverterTest(schemaMixedLarge, ConvType.BIN2DFRM);
+ }
+
/**
*
* @param schema
@@ -204,7 +224,8 @@ public class FrameConverterTest extends AutomatedTestBase
OutputInfo oinfo = null;
InputInfo iinfo = null;
switch( type ) {
- case CSV2BIN:
+ case CSV2BIN:
+ case DFRM2BIN:
oinfo = OutputInfo.CSVOutputInfo;
iinfo = InputInfo.BinaryBlockInputInfo;
break;
@@ -221,6 +242,7 @@ public class FrameConverterTest extends AutomatedTestBase
iinfo = InputInfo.TextCellInputInfo;
break;
case MAT2BIN:
+ case BIN2DFRM:
oinfo = OutputInfo.BinaryBlockOutputInfo;
iinfo = InputInfo.BinaryBlockInputInfo;
break;
@@ -389,7 +411,7 @@ public class FrameConverterTest extends AutomatedTestBase
* @param frame2
*/
private void verifyFrameData(FrameBlock frame1, FrameBlock frame2) {
- for ( int i=0; i<frame1.getNumRows(); ++i )
+ for ( int i=0; i<frame1.getNumRows(); i++ )
for( int j=0; j<frame1.getNumColumns(); j++ ) {
String val1 = UtilFunctions.objectToString(frame1.get(i, j));
String val2 = UtilFunctions.objectToString(frame2.get(i, j));
@@ -444,7 +466,7 @@ public class FrameConverterTest extends AutomatedTestBase
OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
JavaPairRDD<LongWritable,Text> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
- .csvToBinaryBlock(sc, rddIn, mc, false, ",", false, 0)
+ .csvToBinaryBlock(sc, rddIn, mc, false, separator, false, 0)
.mapToPair(new LongFrameToLongWritableFrameFunction());
rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
break;
@@ -494,10 +516,69 @@ public class FrameConverterTest extends AutomatedTestBase
rddOut.saveAsHadoopFile(fnameOut, MatrixIndexes.class, MatrixBlock.class, oinfo.outputFormatClass);
break;
}
+ case DFRM2BIN: {
+ OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
+
+ //Create DataFrame
+ SQLContext sqlContext = new SQLContext(sc);
+ StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema);
+ JavaRDD<Row> rowRDD = getRowRDD(sc, fnameIn, separator);
+ DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
+
+ JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
+ .dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/)
+ .mapToPair(new LongFrameToLongWritableFrameFunction());
+ rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
+ break;
+ }
+ case BIN2DFRM: {
+ InputInfo iinfo = InputInfo.BinaryBlockInputInfo;
+ OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
+ JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class);
+ JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new LongWritableFrameToLongFrameFunction());
+ DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(rddIn2, mc, sc);
+
+ //Convert the DataFrame back to binary block, so the round trip (binary -> DataFrame -> binary) can be compared against the original binary data
+ JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
+ .dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/)
+ .mapToPair(new LongFrameToLongWritableFrameFunction());
+ rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
+
+ break;
+ }
default:
throw new RuntimeException("Unsupported converter type: "+type.toString());
}
sec.close();
}
+
+ /*
+ * Builds a JavaRDD<Row> from the given CSV input file.
+ */
+ JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String separator)
+ {
+ // Load a text file and convert each line to a java rdd.
+ JavaRDD<String> dataRdd = sc.textFile(fnameIn);
+ return dataRdd.map(new RowGenerator());
+ }
+
+ /*
+ * Row Generator class based on individual line in CSV file.
+ */
+ private static class RowGenerator implements Function<String,Row>
+ {
+ private static final long serialVersionUID = -6736256507697511070L;
+
+ @Override
+ public Row call(String record) throws Exception {
+ String[] fields = record.split(",");
+ Object[] objects = new Object[fields.length];
+ for (int i=0; i<fields.length; i++) {
+ objects[i] = fields[i];
+ }
+ return RowFactory.create(objects);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
index 9bc8d14..e713a86 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
@@ -172,7 +172,7 @@ public class FrameCopyTest extends AutomatedTestBase
void verifyFrameData(FrameBlock frame1, FrameBlock frame2)
{
List<ValueType> lschema = frame1.getSchema();
- for ( int i=0; i<frame1.getNumRows(); ++i )
+ for ( int i=0; i<frame1.getNumRows(); i++ )
for( int j=0; j<lschema.size(); j++ ) {
if( UtilFunctions.compareTo(lschema.get(j), frame1.get(i, j), frame2.get(i, j)) != 0)
Assert.fail("Target value for cell ("+ i + "," + j + ") is " + frame1.get(i, j) +
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
index b43dc6b..86dee49 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
@@ -268,7 +268,7 @@ public class FrameIndexingDistTest extends AutomatedTestBase
}
private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, ValueType[] schema) {
- for ( int i=0; i<frame1.getNumRows(); ++i )
+ for ( int i=0; i<frame1.getNumRows(); i++ )
for( int j=0; j<frame1.getNumColumns(); j++ ) {
Object val1 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame1.get(i, j)));
Object val2 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame2.get(i, j)));
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
index 7d45ebc..d46c11f 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
@@ -215,7 +215,7 @@ public class FrameReadWriteTest extends AutomatedTestBase
void verifyFrameData(FrameBlock frame1, FrameBlock frame2)
{
List<ValueType> lschema = frame1.getSchema();
- for ( int i=0; i<frame1.getNumRows(); ++i )
+ for ( int i=0; i<frame1.getNumRows(); i++ )
for( int j=0; j<lschema.size(); j++ ) {
if( UtilFunctions.compareTo(lschema.get(j), frame1.get(i, j), frame2.get(i, j)) != 0)
Assert.fail("Target value for cell ("+ i + "," + j + ") is " + frame1.get(i, j) +