You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2017/02/16 23:14:47 UTC

incubator-systemml git commit: [SYSTEMML-1280] Restore and deprecate SQLContext methods

Repository: incubator-systemml
Updated Branches:
  refs/heads/master bbc77e71e -> e7510a03b


[SYSTEMML-1280] Restore and deprecate SQLContext methods

Restore Java SQLContext method signatures (migrated to SparkSession by
SYSTEMML-1194) in case any users are using the old SystemML methods and
are unable to use SparkSessions.
Applies to old MLContext, MLMatrix, MLOutput, FrameRDDConverterUtils,
RDDConverterUtils, and RDDConverterUtilsExt.

Closes #396.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e7510a03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e7510a03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e7510a03

Branch: refs/heads/master
Commit: e7510a03b95929b34e2fe6632c640851816716ea
Parents: bbc77e7
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Thu Feb 16 15:12:36 2017 -0800
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Thu Feb 16 15:12:36 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLContext.java    | 18 +++++-
 .../java/org/apache/sysml/api/MLMatrix.java     | 20 +++++-
 .../java/org/apache/sysml/api/MLOutput.java     | 66 +++++++++++++++++++-
 .../spark/utils/FrameRDDConverterUtils.java     |  9 +++
 .../spark/utils/RDDConverterUtils.java          |  9 +++
 .../spark/utils/RDDConverterUtilsExt.java       | 17 ++++-
 6 files changed, 133 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java
index a128c37..a5dc859 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -37,6 +37,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.api.jmlc.JMLCUtils;
@@ -1593,5 +1594,20 @@ public class MLContext {
 		JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = out.getBinaryBlockedRDD("output");
 		MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
 		return MLMatrix.createMLMatrix(this, sparkSession, blocks, mcOut);
-	}	
+	}
+
+	/**
+	 * Experimental API: Might be discontinued in a future release
+	 * @param sqlContext the SQL Context
+	 * @param filePath the file path
+	 * @param format the format
+	 * @return the MLMatrix
+	 * @throws IOException if IOException occurs
+	 * @throws DMLException if DMLException occurs
+	 * @throws ParseException if ParseException occurs
+	 */
+	public MLMatrix read(SQLContext sqlContext, String filePath, String format) throws IOException, DMLException, ParseException {
+		SparkSession sparkSession = sqlContext.sparkSession();
+		return read(sparkSession, filePath, format);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLMatrix.java b/src/main/java/org/apache/sysml/api/MLMatrix.java
index 873e831..45f631f 100644
--- a/src/main/java/org/apache/sysml/api/MLMatrix.java
+++ b/src/main/java/org/apache/sysml/api/MLMatrix.java
@@ -25,6 +25,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
@@ -76,11 +77,21 @@ public class MLMatrix extends Dataset<Row> {
 		this.ml = ml;
 	}
 
+	protected MLMatrix(SQLContext sqlContext, LogicalPlan logicalPlan, MLContext ml) {
+		super(sqlContext, logicalPlan, RowEncoder.apply(null));
+		this.ml = ml;
+	}
+
 	protected MLMatrix(SparkSession sparkSession, QueryExecution queryExecution, MLContext ml) {
 		super(sparkSession, queryExecution, RowEncoder.apply(null));
 		this.ml = ml;
 	}
-	
+
+	protected MLMatrix(SQLContext sqlContext, QueryExecution queryExecution, MLContext ml) {
+		super(sqlContext.sparkSession(), queryExecution, RowEncoder.apply(null));
+		this.ml = ml;
+	}
+
 	// Only used internally to set a new MLMatrix after one of matrix operations.
 	// Not to be used externally.
 	protected MLMatrix(Dataset<Row> df, MatrixCharacteristics mc, MLContext ml) throws DMLRuntimeException {
@@ -110,7 +121,12 @@ public class MLMatrix extends Dataset<Row> {
 		StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
 		return new MLMatrix(sparkSession.createDataFrame(rows.toJavaRDD(), schema), mc, ml);
 	}
-	
+
+	static MLMatrix createMLMatrix(MLContext ml, SQLContext sqlContext, JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) throws DMLRuntimeException {
+		SparkSession sparkSession = sqlContext.sparkSession();
+		return createMLMatrix(ml, sparkSession, blocks, mc);
+	}
+
 	/**
 	 * Convenient method to write a MLMatrix.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
index ca90fc9..a16eccd 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -26,6 +26,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -107,7 +108,24 @@ public class MLOutput {
 		}
 		throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
 	}
-	
+
+	/**
+	 * Note, the output DataFrame has an additional column ID.
+	 * An easy way to get DataFrame without ID is by df.drop("__INDEX")
+	 * 
+	 * @param sqlContext the SQL Context
+	 * @param varName the variable name
+	 * @return the DataFrame
+	 * @throws DMLRuntimeException if DMLRuntimeException occurs
+	 */
+	public Dataset<Row> getDF(SQLContext sqlContext, String varName) throws DMLRuntimeException {
+		if (sqlContext == null) {
+			throw new DMLRuntimeException("SQLContext is not created");
+		}
+		SparkSession sparkSession = sqlContext.sparkSession();
+		return getDF(sparkSession, varName);
+	}
+
 	/**
 	 * Obtain the DataFrame
 	 * 
@@ -134,7 +152,24 @@ public class MLOutput {
 		}
 		
 	}
-	
+
+	/**
+	 * Obtain the DataFrame
+	 * 
+	 * @param sqlContext the SQL Context
+	 * @param varName the variable name
+	 * @param outputVector if true, returns DataFrame with two columns: ID and org.apache.spark.ml.linalg.Vector
+	 * @return the DataFrame
+	 * @throws DMLRuntimeException if DMLRuntimeException occurs
+	 */
+	public Dataset<Row> getDF(SQLContext sqlContext, String varName, boolean outputVector) throws DMLRuntimeException {
+		if (sqlContext == null) {
+			throw new DMLRuntimeException("SQLContext is not created");
+		}
+		SparkSession sparkSession = sqlContext.sparkSession();
+		return getDF(sparkSession, varName, outputVector);
+	}
+
 	/**
 	 * This methods improves the performance of MLPipeline wrappers.
 	 * 
@@ -153,6 +188,25 @@ public class MLOutput {
 		JavaPairRDD<MatrixIndexes,MatrixBlock> binaryBlockRDD = getBinaryBlockedRDD(varName);
 		return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockRDD, mc, true);
 	}
+
+	/**
+	 * This method improves the performance of MLPipeline wrappers.
+	 * 
+	 * @param sqlContext the SQL Context
+	 * @param varName the variable name
+	 * @param mc the matrix characteristics
+	 * @return the DataFrame
+	 * @throws DMLRuntimeException if DMLRuntimeException occurs
+	 */
+	public Dataset<Row> getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc) 
+		throws DMLRuntimeException 
+	{
+		if (sqlContext == null) {
+			throw new DMLRuntimeException("SQLContext is not created");
+		}
+		SparkSession sparkSession = sqlContext.sparkSession();
+		return getDF(sparkSession, varName, mc);
+	}
 	
 	public JavaRDD<String> getStringRDD(String varName, String format) throws DMLRuntimeException {
 		if(format.equals("text")) {
@@ -202,4 +256,12 @@ public class MLOutput {
 		}
 		throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
 	}
+	
+	public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException {
+		if (sqlContext == null) {
+			throw new DMLRuntimeException("SQLContext is not created");
+		}
+		SparkSession sparkSession = sqlContext.sparkSession();
+		return getMLMatrix(ml, sparkSession, varName);
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 3d5df56..013a1a8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -42,6 +42,7 @@ import org.apache.spark.ml.linalg.VectorUDT;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
@@ -285,6 +286,14 @@ public class FrameRDDConverterUtils
 		return sparkSession.createDataFrame(rowRDD, dfSchema);
 	}
 	
+	@Deprecated
+	public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlContext, JavaPairRDD<Long,FrameBlock> in, 
+			MatrixCharacteristics mc, ValueType[] schema)
+	{
+		SparkSession sparkSession = sqlContext.sparkSession();
+		return binaryBlockToDataFrame(sparkSession, in, mc, schema);
+	}
+	
 	
 	/**
 	 * This function will convert Frame schema into DataFrame schema 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 6b6c61d..e847471 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -42,6 +42,7 @@ import org.apache.spark.ml.linalg.Vectors;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
@@ -286,6 +287,14 @@ public class RDDConverterUtils
 		return sparkSession.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
 	}
 
+	@Deprecated
+	public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlContext,
+			JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector)  
+	{
+		SparkSession sparkSession = sqlContext.sparkSession();
+		return binaryBlockToDataFrame(sparkSession, in, mc, toVector);
+	}
+
 	public static JavaPairRDD<LongWritable, Text> stringToSerializableText(JavaPairRDD<Long,String> in)
 	{
 		return in.mapToPair(new TextToSerTextFunction());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index f4a02dd..973db64 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -236,7 +236,22 @@ public class RDDConverterUtilsExt
 		JavaRDD<Row> newRows = df.rdd().toJavaRDD().zipWithIndex().map(new AddRowID());
 		return sparkSession.createDataFrame(newRows, new StructType(newSchema));
 	}
-	
+
+	/**
+	 * Add element indices as new column to DataFrame
+	 * 
+	 * @param df input data frame
+	 * @param sqlContext the SQL Context
+	 * @param nameOfCol name of index column
+	 * @return new data frame
+	 * 
+	 * @deprecated This will be removed in SystemML 1.0.
+	 */
+	@Deprecated
+	public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext sqlContext, String nameOfCol) {
+		SparkSession sparkSession = sqlContext.sparkSession();
+		return addIDToDataFrame(df, sparkSession, nameOfCol);
+	}
 	
 	
 	private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock>