You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2017/02/16 23:14:47 UTC
incubator-systemml git commit: [SYSTEMML-1280] Restore and deprecate
SQLContext methods
Repository: incubator-systemml
Updated Branches:
refs/heads/master bbc77e71e -> e7510a03b
[SYSTEMML-1280] Restore and deprecate SQLContext methods
Restore Java SQLContext method signatures (migrated to SparkSession by
SYSTEMML-1194) in case any users are using the old SystemML methods and
are unable to use SparkSessions.
Applies to old MLContext, MLMatrix, MLOutput, FrameRDDConverterUtils,
RDDConverterUtils, and RDDConverterUtilsExt.
Closes #396.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e7510a03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e7510a03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e7510a03
Branch: refs/heads/master
Commit: e7510a03b95929b34e2fe6632c640851816716ea
Parents: bbc77e7
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Thu Feb 16 15:12:36 2017 -0800
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Thu Feb 16 15:12:36 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/sysml/api/MLContext.java | 18 +++++-
.../java/org/apache/sysml/api/MLMatrix.java | 20 +++++-
.../java/org/apache/sysml/api/MLOutput.java | 66 +++++++++++++++++++-
.../spark/utils/FrameRDDConverterUtils.java | 9 +++
.../spark/utils/RDDConverterUtils.java | 9 +++
.../spark/utils/RDDConverterUtilsExt.java | 17 ++++-
6 files changed, 133 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java
index a128c37..a5dc859 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -37,6 +37,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.api.jmlc.JMLCUtils;
@@ -1593,5 +1594,20 @@ public class MLContext {
JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = out.getBinaryBlockedRDD("output");
MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
return MLMatrix.createMLMatrix(this, sparkSession, blocks, mcOut);
- }
+ }
+
+ /**
+  * Experimental API: Might be discontinued in future release.
+  * <p>
+  * Legacy overload kept for callers that still hold a SQLContext: it takes the
+  * SparkSession from the given SQLContext and delegates to the
+  * SparkSession-based {@code read} method.
+  *
+  * @param sqlContext the SQL Context
+  * @param filePath the file path
+  * @param format the format
+  * @return the MLMatrix
+  * @throws IOException if IOException occurs
+  * @throws DMLException if DMLException occurs
+  * @throws ParseException if ParseException occurs
+  * @deprecated Use {@link #read(SparkSession, String, String)} instead.
+  */
+ @Deprecated
+ public MLMatrix read(SQLContext sqlContext, String filePath, String format) throws IOException, DMLException, ParseException {
+     SparkSession sparkSession = sqlContext.sparkSession();
+     return read(sparkSession, filePath, format);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLMatrix.java b/src/main/java/org/apache/sysml/api/MLMatrix.java
index 873e831..45f631f 100644
--- a/src/main/java/org/apache/sysml/api/MLMatrix.java
+++ b/src/main/java/org/apache/sysml/api/MLMatrix.java
@@ -25,6 +25,7 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
@@ -76,11 +77,21 @@ public class MLMatrix extends Dataset<Row> {
this.ml = ml;
}
+ /**
+  * Legacy constructor that builds the matrix from a SQLContext and a logical plan.
+  * The SQLContext and plan are handed to the Dataset superclass constructor together
+  * with the encoder returned by {@code RowEncoder.apply(null)}.
+  *
+  * @param sqlContext the SQL Context
+  * @param logicalPlan the logical plan
+  * @param ml the MLContext stored in this matrix
+  * @deprecated Prefer the {@code SparkSession}-based constructors.
+  */
+ @Deprecated
+ protected MLMatrix(SQLContext sqlContext, LogicalPlan logicalPlan, MLContext ml) {
+     super(sqlContext, logicalPlan, RowEncoder.apply(null));
+     this.ml = ml;
+ }
+
/**
 * Creates an MLMatrix from a SparkSession and a query execution.
 * Both are passed to the Dataset superclass constructor together with the
 * encoder returned by {@code RowEncoder.apply(null)}.
 *
 * @param sparkSession the Spark Session
 * @param queryExecution the query execution
 * @param ml the MLContext stored in this matrix's {@code ml} field
 */
protected MLMatrix(SparkSession sparkSession, QueryExecution queryExecution, MLContext ml) {
super(sparkSession, queryExecution, RowEncoder.apply(null));
this.ml = ml;
}
-
+
+ /**
+  * Legacy constructor that builds the matrix from a SQLContext and a query execution.
+  * The SparkSession is taken from the SQLContext before calling the Dataset
+  * superclass constructor.
+  *
+  * @param sqlContext the SQL Context
+  * @param queryExecution the query execution
+  * @param ml the MLContext stored in this matrix
+  * @deprecated Prefer the {@code SparkSession}-based constructors.
+  */
+ @Deprecated
+ protected MLMatrix(SQLContext sqlContext, QueryExecution queryExecution, MLContext ml) {
+     super(sqlContext.sparkSession(), queryExecution, RowEncoder.apply(null));
+     this.ml = ml;
+ }
+
// Only used internally to set a new MLMatrix after one of matrix operations.
// Not to be used externally.
protected MLMatrix(Dataset<Row> df, MatrixCharacteristics mc, MLContext ml) throws DMLRuntimeException {
@@ -110,7 +121,12 @@ public class MLMatrix extends Dataset<Row> {
StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
return new MLMatrix(sparkSession.createDataFrame(rows.toJavaRDD(), schema), mc, ml);
}
-
+
+ /**
+  * SQLContext flavour of {@code createMLMatrix}: forwards to the SparkSession-based
+  * overload using the session obtained from the given SQLContext.
+  *
+  * @param ml the MLContext
+  * @param sqlContext the SQL Context
+  * @param blocks the binary-block RDD
+  * @param mc the matrix characteristics
+  * @return the MLMatrix created by the SparkSession-based overload
+  * @throws DMLRuntimeException if DMLRuntimeException occurs
+  */
+ static MLMatrix createMLMatrix(MLContext ml, SQLContext sqlContext, JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) throws DMLRuntimeException {
+     return createMLMatrix(ml, sqlContext.sparkSession(), blocks, mc);
+ }
+
/**
* Convenient method to write a MLMatrix.
*
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
index ca90fc9..a16eccd 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -26,6 +26,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -107,7 +108,24 @@ public class MLOutput {
}
throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
}
-
+
+ /**
+  * Note, the output DataFrame has an additional column ID.
+  * An easy way to get DataFrame without ID is by df.drop("__INDEX")
+  * <p>
+  * Legacy overload: takes the SparkSession from the given SQLContext and delegates
+  * to the SparkSession-based {@code getDF}.
+  *
+  * @param sqlContext the SQL Context
+  * @param varName the variable name
+  * @return the DataFrame
+  * @throws DMLRuntimeException if the SQLContext is null or DMLRuntimeException occurs
+  * @deprecated Use {@link #getDF(SparkSession, String)} instead.
+  */
+ @Deprecated
+ public Dataset<Row> getDF(SQLContext sqlContext, String varName) throws DMLRuntimeException {
+     if (sqlContext == null) {
+         throw new DMLRuntimeException("SQLContext is not created");
+     }
+     SparkSession sparkSession = sqlContext.sparkSession();
+     return getDF(sparkSession, varName);
+ }
+
/**
* Obtain the DataFrame
*
@@ -134,7 +152,24 @@ public class MLOutput {
}
}
-
+
+ /**
+  * Obtain the DataFrame
+  * <p>
+  * Legacy overload: takes the SparkSession from the given SQLContext and delegates
+  * to the SparkSession-based {@code getDF}.
+  *
+  * @param sqlContext the SQL Context
+  * @param varName the variable name
+  * @param outputVector if true, returns DataFrame with two columns: ID and org.apache.spark.ml.linalg.Vector
+  * @return the DataFrame
+  * @throws DMLRuntimeException if the SQLContext is null or DMLRuntimeException occurs
+  * @deprecated Use {@link #getDF(SparkSession, String, boolean)} instead.
+  */
+ @Deprecated
+ public Dataset<Row> getDF(SQLContext sqlContext, String varName, boolean outputVector) throws DMLRuntimeException {
+     if (sqlContext == null) {
+         throw new DMLRuntimeException("SQLContext is not created");
+     }
+     SparkSession sparkSession = sqlContext.sparkSession();
+     return getDF(sparkSession, varName, outputVector);
+ }
+
/**
* This method improves the performance of MLPipeline wrappers.
*
@@ -153,6 +188,25 @@ public class MLOutput {
JavaPairRDD<MatrixIndexes,MatrixBlock> binaryBlockRDD = getBinaryBlockedRDD(varName);
return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockRDD, mc, true);
}
+
+ /**
+  * This method improves the performance of MLPipeline wrappers.
+  * <p>
+  * Legacy overload: takes the SparkSession from the given SQLContext and delegates
+  * to the SparkSession-based {@code getDF}.
+  *
+  * @param sqlContext the SQL Context
+  * @param varName the variable name
+  * @param mc the matrix characteristics
+  * @return the DataFrame
+  * @throws DMLRuntimeException if the SQLContext is null or DMLRuntimeException occurs
+  * @deprecated Use {@link #getDF(SparkSession, String, MatrixCharacteristics)} instead.
+  */
+ @Deprecated
+ public Dataset<Row> getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc)
+     throws DMLRuntimeException
+ {
+     if (sqlContext == null) {
+         throw new DMLRuntimeException("SQLContext is not created");
+     }
+     SparkSession sparkSession = sqlContext.sparkSession();
+     return getDF(sparkSession, varName, mc);
+ }
public JavaRDD<String> getStringRDD(String varName, String format) throws DMLRuntimeException {
if(format.equals("text")) {
@@ -202,4 +256,12 @@ public class MLOutput {
}
throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
}
+
+ /**
+  * Obtain the MLMatrix for an output variable using a legacy SQLContext.
+  * Takes the SparkSession from the given SQLContext and delegates to the
+  * SparkSession-based {@code getMLMatrix}.
+  *
+  * @param ml the MLContext
+  * @param sqlContext the SQL Context
+  * @param varName the variable name
+  * @return the MLMatrix
+  * @throws DMLRuntimeException if the SQLContext is null or DMLRuntimeException occurs
+  * @deprecated Use {@link #getMLMatrix(MLContext, SparkSession, String)} instead.
+  */
+ @Deprecated
+ public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException {
+     if (sqlContext == null) {
+         throw new DMLRuntimeException("SQLContext is not created");
+     }
+     SparkSession sparkSession = sqlContext.sparkSession();
+     return getMLMatrix(ml, sparkSession, varName);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 3d5df56..013a1a8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -42,6 +42,7 @@ import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
@@ -285,6 +286,14 @@ public class FrameRDDConverterUtils
return sparkSession.createDataFrame(rowRDD, dfSchema);
}
+ /**
+  * SQLContext flavour of the frame binary-block to DataFrame conversion.
+  * Forwards to the SparkSession-based overload using the session obtained
+  * from the given SQLContext.
+  *
+  * @param sqlContext the SQL Context
+  * @param in the frame blocks
+  * @param mc the matrix characteristics
+  * @param schema the frame schema
+  * @return the DataFrame produced by the SparkSession-based overload
+  * @deprecated Use the SparkSession-based overload instead.
+  */
+ @Deprecated
+ public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlContext, JavaPairRDD<Long,FrameBlock> in,
+     MatrixCharacteristics mc, ValueType[] schema)
+ {
+     return binaryBlockToDataFrame(sqlContext.sparkSession(), in, mc, schema);
+ }
+
/**
* This function will convert Frame schema into DataFrame schema
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 6b6c61d..e847471 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -42,6 +42,7 @@ import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
@@ -286,6 +287,14 @@ public class RDDConverterUtils
return sparkSession.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
}
+ /**
+  * SQLContext flavour of the matrix binary-block to DataFrame conversion.
+  * Forwards to the SparkSession-based overload using the session obtained
+  * from the given SQLContext.
+  *
+  * @param sqlContext the SQL Context
+  * @param in the matrix blocks
+  * @param mc the matrix characteristics
+  * @param toVector flag passed through unchanged to the SparkSession-based overload
+  * @return the DataFrame produced by the SparkSession-based overload
+  * @deprecated Use the SparkSession-based overload instead.
+  */
+ @Deprecated
+ public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlContext,
+     JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector)
+ {
+     return binaryBlockToDataFrame(sqlContext.sparkSession(), in, mc, toVector);
+ }
+
public static JavaPairRDD<LongWritable, Text> stringToSerializableText(JavaPairRDD<Long,String> in)
{
return in.mapToPair(new TextToSerTextFunction());
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index f4a02dd..973db64 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -236,7 +236,22 @@ public class RDDConverterUtilsExt
JavaRDD<Row> newRows = df.rdd().toJavaRDD().zipWithIndex().map(new AddRowID());
return sparkSession.createDataFrame(newRows, new StructType(newSchema));
}
-
+
+ /**
+  * Add element indices as new column to DataFrame.
+  * This SQLContext variant forwards to the SparkSession-based overload using
+  * the session obtained from the given SQLContext.
+  *
+  * @param df input data frame
+  * @param sqlContext the SQL Context
+  * @param nameOfCol name of index column
+  * @return new data frame
+  *
+  * @deprecated This will be removed in SystemML 1.0.
+  */
+ @Deprecated
+ public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext sqlContext, String nameOfCol) {
+     return addIDToDataFrame(df, sqlContext.sparkSession(), nameOfCol);
+ }
private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock>