You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2017/02/17 05:14:32 UTC
incubator-systemml git commit: [SYSTEMML-1277] MLContext implicitly
convert mllib Vector to ml Vector
Repository: incubator-systemml
Updated Branches:
refs/heads/master 03fdf0432 -> 0c85c1e52
[SYSTEMML-1277] MLContext implicitly convert mllib Vector to ml Vector
Implicitly convert dataframe mllib.Vector columns to ml.Vector columns in
MLContext API.
Closes #397.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0c85c1e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0c85c1e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0c85c1e5
Branch: refs/heads/master
Commit: 0c85c1e52e02ad740f3d4cab5a7e4bf7258061e1
Parents: 03fdf04
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Thu Feb 16 21:11:43 2017 -0800
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Thu Feb 16 21:11:43 2017 -0800
----------------------------------------------------------------------
.../sysml/api/mlcontext/MLContextUtil.java | 6 +-
.../integration/mlcontext/MLContextTest.java | 122 ++++++++++++++++++-
2 files changed, 124 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0c85c1e5/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index c44843e..22595c0 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -39,7 +39,7 @@ import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.ml.linalg.VectorUDT;
+import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -505,6 +505,7 @@ public final class MLContextUtil {
@SuppressWarnings("unchecked")
Dataset<Row> dataFrame = (Dataset<Row>) value;
+ dataFrame = MLUtils.convertVectorColumnsToML(dataFrame);
if (hasMatrixMetadata) {
return MLContextConversionUtil.dataFrameToMatrixObject(name, dataFrame, (MatrixMetadata) metadata);
} else if (hasFrameMetadata) {
@@ -598,7 +599,8 @@ public final class MLContextUtil {
for (StructField field : fields) {
DataType dataType = field.dataType();
if ((dataType != DataTypes.DoubleType) && (dataType != DataTypes.IntegerType)
- && (dataType != DataTypes.LongType) && (!(dataType instanceof VectorUDT))) {
+ && (dataType != DataTypes.LongType) && (!(dataType instanceof org.apache.spark.ml.linalg.VectorUDT))
+ && (!(dataType instanceof org.apache.spark.mllib.linalg.VectorUDT)) ) {
// uncomment if we support arrays of doubles for matrices
// if (dataType instanceof ArrayType) {
// ArrayType arrayType = (ArrayType) dataType;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0c85c1e5/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java
index abea5be..f6ef208 100644
--- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java
@@ -707,7 +707,57 @@ public class MLContextTest extends AutomatedTestBase {
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX);
- Script script = dml("print('sum: ' + sum(M))").in("M", dataFrame, mm);
+ Script script = pydml("print('sum: ' + sum(M))").in("M", dataFrame, mm);
+ setExpectedStdOut("sum: 45.0");
+ ml.execute(script);
+ }
+
+ @Test
+ public void testDataFrameSumDMLMllibVectorWithIDColumn() {
+ System.out.println("MLContextTest - DataFrame sum DML, mllib vector with ID column");
+
+ List<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> list = new ArrayList<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>>();
+ list.add(new Tuple2<Double, org.apache.spark.mllib.linalg.Vector>(1.0, org.apache.spark.mllib.linalg.Vectors.dense(1.0, 2.0, 3.0)));
+ list.add(new Tuple2<Double, org.apache.spark.mllib.linalg.Vector>(2.0, org.apache.spark.mllib.linalg.Vectors.dense(4.0, 5.0, 6.0)));
+ list.add(new Tuple2<Double, org.apache.spark.mllib.linalg.Vector>(3.0, org.apache.spark.mllib.linalg.Vectors.dense(7.0, 8.0, 9.0)));
+ JavaRDD<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> javaRddTuple = sc.parallelize(list);
+
+ JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleMllibVectorRow());
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
+ List<StructField> fields = new ArrayList<StructField>();
+ fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
+ fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true));
+ StructType schema = DataTypes.createStructType(fields);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
+
+ MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX);
+
+ Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame, mm);
+ setExpectedStdOut("sum: 45.0");
+ ml.execute(script);
+ }
+
+ @Test
+ public void testDataFrameSumPYDMLMllibVectorWithIDColumn() {
+ System.out.println("MLContextTest - DataFrame sum PYDML, mllib vector with ID column");
+
+ List<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> list = new ArrayList<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>>();
+ list.add(new Tuple2<Double, org.apache.spark.mllib.linalg.Vector>(1.0, org.apache.spark.mllib.linalg.Vectors.dense(1.0, 2.0, 3.0)));
+ list.add(new Tuple2<Double, org.apache.spark.mllib.linalg.Vector>(2.0, org.apache.spark.mllib.linalg.Vectors.dense(4.0, 5.0, 6.0)));
+ list.add(new Tuple2<Double, org.apache.spark.mllib.linalg.Vector>(3.0, org.apache.spark.mllib.linalg.Vectors.dense(7.0, 8.0, 9.0)));
+ JavaRDD<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> javaRddTuple = sc.parallelize(list);
+
+ JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleMllibVectorRow());
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
+ List<StructField> fields = new ArrayList<StructField>();
+ fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
+ fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true));
+ StructType schema = DataTypes.createStructType(fields);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
+
+ MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX);
+
+ Script script = pydml("print('sum: ' + sum(M))").in("M", dataFrame, mm);
setExpectedStdOut("sum: 45.0");
ml.execute(script);
}
@@ -755,7 +805,55 @@ public class MLContextTest extends AutomatedTestBase {
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR);
- Script script = dml("print('sum: ' + sum(M))").in("M", dataFrame, mm);
+ Script script = pydml("print('sum: ' + sum(M))").in("M", dataFrame, mm);
+ setExpectedStdOut("sum: 45.0");
+ ml.execute(script);
+ }
+
+ @Test
+ public void testDataFrameSumDMLMllibVectorWithNoIDColumn() {
+ System.out.println("MLContextTest - DataFrame sum DML, mllib vector with no ID column");
+
+ List<org.apache.spark.mllib.linalg.Vector> list = new ArrayList<org.apache.spark.mllib.linalg.Vector>();
+ list.add(org.apache.spark.mllib.linalg.Vectors.dense(1.0, 2.0, 3.0));
+ list.add(org.apache.spark.mllib.linalg.Vectors.dense(4.0, 5.0, 6.0));
+ list.add(org.apache.spark.mllib.linalg.Vectors.dense(7.0, 8.0, 9.0));
+ JavaRDD<org.apache.spark.mllib.linalg.Vector> javaRddVector = sc.parallelize(list);
+
+ JavaRDD<Row> javaRddRow = javaRddVector.map(new MllibVectorRow());
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
+ List<StructField> fields = new ArrayList<StructField>();
+ fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true));
+ StructType schema = DataTypes.createStructType(fields);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
+
+ MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR);
+
+ Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame, mm);
+ setExpectedStdOut("sum: 45.0");
+ ml.execute(script);
+ }
+
+ @Test
+ public void testDataFrameSumPYDMLMllibVectorWithNoIDColumn() {
+ System.out.println("MLContextTest - DataFrame sum PYDML, mllib vector with no ID column");
+
+ List<org.apache.spark.mllib.linalg.Vector> list = new ArrayList<org.apache.spark.mllib.linalg.Vector>();
+ list.add(org.apache.spark.mllib.linalg.Vectors.dense(1.0, 2.0, 3.0));
+ list.add(org.apache.spark.mllib.linalg.Vectors.dense(4.0, 5.0, 6.0));
+ list.add(org.apache.spark.mllib.linalg.Vectors.dense(7.0, 8.0, 9.0));
+ JavaRDD<org.apache.spark.mllib.linalg.Vector> javaRddVector = sc.parallelize(list);
+
+ JavaRDD<Row> javaRddRow = javaRddVector.map(new MllibVectorRow());
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
+ List<StructField> fields = new ArrayList<StructField>();
+ fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true));
+ StructType schema = DataTypes.createStructType(fields);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
+
+ MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR);
+
+ Script script = pydml("print('sum: ' + sum(M))").in("M", dataFrame, mm);
setExpectedStdOut("sum: 45.0");
ml.execute(script);
}
@@ -771,6 +869,17 @@ public class MLContextTest extends AutomatedTestBase {
}
}
+ static class DoubleMllibVectorRow implements Function<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>, Row> {
+ private static final long serialVersionUID = -3121178154451876165L;
+
+ @Override
+ public Row call(Tuple2<Double, org.apache.spark.mllib.linalg.Vector> tup) throws Exception {
+ Double doub = tup._1();
+ org.apache.spark.mllib.linalg.Vector vect = tup._2();
+ return RowFactory.create(doub, vect);
+ }
+ }
+
static class VectorRow implements Function<Vector, Row> {
private static final long serialVersionUID = 7077761802433569068L;
@@ -780,6 +889,15 @@ public class MLContextTest extends AutomatedTestBase {
}
}
+ static class MllibVectorRow implements Function<org.apache.spark.mllib.linalg.Vector, Row> {
+ private static final long serialVersionUID = -408929813562996706L;
+
+ @Override
+ public Row call(org.apache.spark.mllib.linalg.Vector vect) throws Exception {
+ return RowFactory.create(vect);
+ }
+ }
+
static class CommaSeparatedValueStringToRow implements Function<String, Row> {
private static final long serialVersionUID = -7871020122671747808L;