Posted to commits@systemml.apache.org by de...@apache.org on 2017/02/15 19:29:42 UTC
[1/2] incubator-systemml git commit: [SYSTEMML-1194] Replace SQLContext with SparkSession
Repository: incubator-systemml
Updated Branches:
refs/heads/master 201238fd3 -> 80ab57bda
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java
index e15f8dd..abea5be 100644
--- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java
@@ -53,7 +53,7 @@ import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -513,13 +513,13 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES);
@@ -539,13 +539,13 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES);
@@ -565,14 +565,14 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES_WITH_INDEX);
@@ -592,14 +592,14 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES_WITH_INDEX);
@@ -619,14 +619,14 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES_WITH_INDEX);
@@ -646,14 +646,14 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES_WITH_INDEX);
@@ -673,12 +673,12 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX);
@@ -698,12 +698,12 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX);
@@ -723,11 +723,11 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<Vector> javaRddVector = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddVector.map(new VectorRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR);
@@ -747,11 +747,11 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<Vector> javaRddVector = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddVector.map(new VectorRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR);
@@ -1559,13 +1559,13 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
BinaryBlockMatrix binaryBlockMatrix = new BinaryBlockMatrix(dataFrame);
Script script = dml("avg = avg(M);").in("M", binaryBlockMatrix).out("avg");
@@ -1584,13 +1584,13 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
BinaryBlockMatrix binaryBlockMatrix = new BinaryBlockMatrix(dataFrame);
Script script = pydml("avg = avg(M)").in("M", binaryBlockMatrix).out("avg");
@@ -1853,13 +1853,13 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(3, 3, 9);
@@ -1879,13 +1879,13 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
MatrixMetadata mm = new MatrixMetadata(3, 3, 9);
@@ -2069,13 +2069,13 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame);
setExpectedStdOut("sum: 27.0");
@@ -2093,13 +2093,13 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
Script script = pydml("print('sum: ' + sum(M))").in("M", dataFrame);
setExpectedStdOut("sum: 27.0");
@@ -2117,14 +2117,14 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame);
setExpectedStdOut("sum: 27.0");
@@ -2142,14 +2142,14 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
Script script = pydml("print('sum: ' + sum(M))").in("M", dataFrame);
setExpectedStdOut("sum: 27.0");
@@ -2167,12 +2167,12 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame);
setExpectedStdOut("sum: 45.0");
@@ -2190,12 +2190,12 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
Script script = dml("print('sum: ' + sum(M))").in("M", dataFrame);
setExpectedStdOut("sum: 45.0");
@@ -2213,11 +2213,11 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<Vector> javaRddVector = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddVector.map(new VectorRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame);
setExpectedStdOut("sum: 45.0");
@@ -2235,11 +2235,11 @@ public class MLContextTest extends AutomatedTestBase {
JavaRDD<Vector> javaRddVector = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddVector.map(new VectorRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", new VectorUDT(), true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> dataFrame = sqlContext.createDataFrame(javaRddRow, schema);
+ Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema);
Script script = dml("print('sum: ' + sum(M))").in("M", dataFrame);
setExpectedStdOut("sum: 45.0");
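[Editorial note: the substitution applied throughout the tests above binds a SparkSession to the test's existing JavaSparkContext instead of constructing a deprecated SQLContext over it. A minimal standalone sketch of that pattern follows; the class and method names are illustrative, not part of the commit.]

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SessionFromTestContext {
    // Mirrors the tests above: sc.sc() exposes the underlying SparkContext and
    // getOrCreate() binds (or reuses) a session on top of it, replacing the
    // old `new SQLContext(sc)` call one-for-one.
    public static Dataset<Row> rowsToDataFrame(JavaSparkContext sc, List<Row> rows) {
        SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true));
        fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true));
        fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true));
        StructType schema = DataTypes.createStructType(fields);
        // createDataFrame carries over unchanged from SQLContext to SparkSession.
        return sparkSession.createDataFrame(rows, schema);
    }
}

Because getOrCreate() returns the session already attached to the running context, repeating this in every test method stays cheap.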
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/scala/org/apache/sysml/api/ml/LogisticRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/src/test/scala/org/apache/sysml/api/ml/LogisticRegressionSuite.scala b/src/test/scala/org/apache/sysml/api/ml/LogisticRegressionSuite.scala
index 555d0a2..689bf82 100644
--- a/src/test/scala/org/apache/sysml/api/ml/LogisticRegressionSuite.scala
+++ b/src/test/scala/org/apache/sysml/api/ml/LogisticRegressionSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.sql._
import scala.reflect.runtime.universe._
case class LabeledDocument[T:TypeTag](id: Long, text: String, label: Double)
@@ -40,9 +41,9 @@ class LogisticRegressionSuite extends FunSuite with WrapperSparkContext with Mat
test("run logistic regression with default") {
//Make sure system ml home set when run wrapper
- val newsqlContext = new org.apache.spark.sql.SQLContext(sc);
+ val newSparkSession = SparkSession.builder().master("local").appName("TestLocal").getOrCreate();
- import newsqlContext.implicits._
+ import newSparkSession.implicits._
val training = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)),
@@ -62,8 +63,8 @@ class LogisticRegressionSuite extends FunSuite with WrapperSparkContext with Mat
test("test logistic regression with mlpipeline"){
//Make sure system ml home set when run wrapper
- val newsqlContext = new org.apache.spark.sql.SQLContext(sc);
- import newsqlContext.implicits._
+ val newSparkSession = SparkSession.builder().master("local").appName("TestLocal").getOrCreate();
+ import newSparkSession.implicits._
val training = sc.parallelize(Seq(
LabeledDocument(0L, "a b c d e spark", 1.0),
LabeledDocument(1L, "b d", 2.0),
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/scala/org/apache/sysml/api/ml/WrapperSparkContext.scala
----------------------------------------------------------------------
diff --git a/src/test/scala/org/apache/sysml/api/ml/WrapperSparkContext.scala b/src/test/scala/org/apache/sysml/api/ml/WrapperSparkContext.scala
index 205a1a9..0bd6f27 100644
--- a/src/test/scala/org/apache/sysml/api/ml/WrapperSparkContext.scala
+++ b/src/test/scala/org/apache/sysml/api/ml/WrapperSparkContext.scala
@@ -21,27 +21,21 @@ package org.apache.sysml.api.ml
import org.scalatest.{ BeforeAndAfterAll, Suite }
import org.apache.spark.{ SparkConf, SparkContext }
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
trait WrapperSparkContext extends BeforeAndAfterAll { self: Suite =>
@transient var sc: SparkContext = _
- @transient var sqlContext: SQLContext = _
+ @transient var sparkSession: SparkSession = _
override def beforeAll() {
super.beforeAll()
- val conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("MLlibUnitTest")
- sc = new SparkContext(conf)
- //SQLContext.clearActive()
- sqlContext = new SQLContext(sc)
- //SQLContext.setActive(sqlContext)
+ sparkSession = SparkSession.builder().master("local[2]").appName("MLlibUnitTest").getOrCreate();
+ sc = sparkSession.sparkContext;
}
override def afterAll() {
try {
- sqlContext = null
- //SQLContext.clearActive()
+ sparkSession = null
if (sc != null) {
sc.stop()
}
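[Editorial note: the WrapperSparkContext change above inverts the old setup order: the session is created first and the SparkContext is derived from it, so a single stop() tears both down. A rough Java equivalent of the Scala trait, under the same assumptions; names mirror the trait but the class itself is illustrative.]

import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;

public class SessionFirstHarness {
    private SparkSession sparkSession;
    private SparkContext sc;

    // beforeAll(): build the session first; hand its context to code that
    // still expects a SparkContext.
    public void start() {
        sparkSession = SparkSession.builder()
            .master("local[2]").appName("MLlibUnitTest").getOrCreate();
        sc = sparkSession.sparkContext();
    }

    // afterAll(): stopping the context shuts the session down with it.
    public void stop() {
        sparkSession = null;
        if (sc != null) {
            sc.stop();
            sc = null;
        }
    }
}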
[2/2] incubator-systemml git commit: [SYSTEMML-1194] Replace SQLContext with SparkSession
Posted by de...@apache.org.
[SYSTEMML-1194] Replace SQLContext with SparkSession
SQLContext constructors have been deprecated in Spark 2.
Replace SQLContext references with SparkSession.
Closes #360.
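[Editorial note: for reference, a sketch of the one-for-one substitution this commit applies everywhere, assuming a local master; variable and class names are illustrative.]

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SqlContextToSparkSession {
    public static void main(String[] args) {
        // Before (deprecated in Spark 2):
        //   SQLContext sqlContext = new SQLContext(sc);
        //   Dataset<Row> df = sqlContext.createDataFrame(rows, schema);
        // After: getOrCreate() reuses any session already active in this JVM.
        SparkSession spark = SparkSession.builder()
            .master("local").appName("SQLContextMigration").getOrCreate();

        StructType schema = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("C1", DataTypes.DoubleType, true)));
        List<Row> rows = Arrays.asList(RowFactory.create(1.0), RowFactory.create(2.0));

        // createDataFrame moved unchanged from SQLContext to SparkSession.
        Dataset<Row> df = spark.createDataFrame(rows, schema);
        df.show();
        spark.stop();
    }
}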
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/80ab57bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/80ab57bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/80ab57bd
Branch: refs/heads/master
Commit: 80ab57bda803111e566284aed31ca241ad73edee
Parents: 201238f
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Wed Feb 15 11:28:28 2017 -0800
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Wed Feb 15 11:28:28 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/sysml/api/MLContext.java | 14 +--
.../java/org/apache/sysml/api/MLMatrix.java | 28 +++---
.../java/org/apache/sysml/api/MLOutput.java | 50 ++++++-----
.../api/mlcontext/MLContextConversionUtil.java | 9 +-
.../org/apache/sysml/api/python/SystemML.py | 34 ++------
.../spark/utils/FrameRDDConverterUtils.java | 13 ++-
.../spark/utils/RDDConverterUtils.java | 13 ++-
.../spark/utils/RDDConverterUtilsExt.java | 44 ++++++++--
src/main/python/systemml/converters.py | 4 +-
src/main/python/systemml/defmatrix.py | 8 +-
src/main/python/systemml/mllearn/estimators.py | 76 ++++++++---------
src/main/python/tests/test_mllearn_df.py | 12 +--
src/main/python/tests/test_mllearn_numpy.py | 20 ++---
.../sysml/api/ml/BaseSystemMLClassifier.scala | 4 +-
.../sysml/api/ml/BaseSystemMLRegressor.scala | 2 +-
.../sysml/api/ml/LogisticRegression.scala | 10 +--
.../test/integration/AutomatedTestBase.java | 4 +-
.../conversion/RDDConverterUtilsExtTest.java | 20 ++---
.../functions/frame/FrameConverterTest.java | 9 +-
.../DataFrameMatrixConversionTest.java | 8 +-
.../DataFrameRowFrameConversionTest.java | 13 +--
.../DataFrameVectorFrameConversionTest.java | 24 ++----
.../mlcontext/DataFrameVectorScriptTest.java | 24 ++----
.../functions/mlcontext/FrameTest.java | 10 +--
.../mlcontext/MLContextFrameTest.java | 27 +++---
.../integration/mlcontext/MLContextTest.java | 90 ++++++++++----------
.../sysml/api/ml/LogisticRegressionSuite.scala | 9 +-
.../sysml/api/ml/WrapperSparkContext.scala | 16 ++--
28 files changed, 294 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java
index 520e51e..a128c37 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -37,7 +37,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.api.jmlc.JMLCUtils;
import org.apache.sysml.api.mlcontext.ScriptType;
@@ -101,9 +101,9 @@ import org.apache.sysml.utils.Statistics;
* <p>
* Create input DataFrame from CSV file and potentially perform some feature transformation
* <pre><code>
- * scala> val W = sqlContext.load("com.databricks.spark.csv", Map("path" -> "W.csv", "header" -> "false"))
- * scala> val H = sqlContext.load("com.databricks.spark.csv", Map("path" -> "H.csv", "header" -> "false"))
- * scala> val V = sqlContext.load("com.databricks.spark.csv", Map("path" -> "V.csv", "header" -> "false"))
+ * scala> val W = sparkSession.load("com.databricks.spark.csv", Map("path" -> "W.csv", "header" -> "false"))
+ * scala> val H = sparkSession.load("com.databricks.spark.csv", Map("path" -> "H.csv", "header" -> "false"))
+ * scala> val V = sparkSession.load("com.databricks.spark.csv", Map("path" -> "V.csv", "header" -> "false"))
* </code></pre>
* <p>
* Create MLContext
@@ -1578,7 +1578,7 @@ public class MLContext {
// TODO: Add additional create to provide sep, missing values, etc. for CSV
/**
* Experimental API: Might be discontinued in future release
- * @param sqlContext the SQLContext
+ * @param sparkSession the Spark Session
* @param filePath the file path
* @param format the format
* @return the MLMatrix
@@ -1586,12 +1586,12 @@ public class MLContext {
* @throws DMLException if DMLException occurs
* @throws ParseException if ParseException occurs
*/
- public MLMatrix read(SQLContext sqlContext, String filePath, String format) throws IOException, DMLException, ParseException {
+ public MLMatrix read(SparkSession sparkSession, String filePath, String format) throws IOException, DMLException, ParseException {
this.reset();
this.registerOutput("output");
MLOutput out = this.executeScript("output = read(\"" + filePath + "\", format=\"" + format + "\"); " + MLMatrix.writeStmt);
JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = out.getBinaryBlockedRDD("output");
MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
- return MLMatrix.createMLMatrix(this, sqlContext, blocks, mcOut);
+ return MLMatrix.createMLMatrix(this, sparkSession, blocks, mcOut);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/MLMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLMatrix.java b/src/main/java/org/apache/sysml/api/MLMatrix.java
index a8afa76..873e831 100644
--- a/src/main/java/org/apache/sysml/api/MLMatrix.java
+++ b/src/main/java/org/apache/sysml/api/MLMatrix.java
@@ -25,10 +25,10 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.execution.QueryExecution;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.types.StructType;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -57,8 +57,8 @@ import scala.Tuple2;
import org.apache.sysml.api.{MLContext, MLMatrix}
val ml = new MLContext(sc)
- val mat1 = ml.read(sqlContext, "V_small.csv", "csv")
- val mat2 = ml.read(sqlContext, "W_small.mtx", "binary")
+ val mat1 = ml.read(sparkSession, "V_small.csv", "csv")
+ val mat2 = ml.read(sparkSession, "W_small.mtx", "binary")
val result = mat1.transpose() %*% mat2
result.write("Result_small.mtx", "text")
@@ -71,20 +71,20 @@ public class MLMatrix extends Dataset<Row> {
protected MatrixCharacteristics mc = null;
protected MLContext ml = null;
- protected MLMatrix(SQLContext sqlContext, LogicalPlan logicalPlan, MLContext ml) {
- super(sqlContext, logicalPlan, RowEncoder.apply(null));
+ protected MLMatrix(SparkSession sparkSession, LogicalPlan logicalPlan, MLContext ml) {
+ super(sparkSession, logicalPlan, RowEncoder.apply(null));
this.ml = ml;
}
- protected MLMatrix(SQLContext sqlContext, QueryExecution queryExecution, MLContext ml) {
- super(sqlContext.sparkSession(), queryExecution, RowEncoder.apply(null));
+ protected MLMatrix(SparkSession sparkSession, QueryExecution queryExecution, MLContext ml) {
+ super(sparkSession, queryExecution, RowEncoder.apply(null));
this.ml = ml;
}
// Only used internally to set a new MLMatrix after one of matrix operations.
// Not to be used externally.
protected MLMatrix(Dataset<Row> df, MatrixCharacteristics mc, MLContext ml) throws DMLRuntimeException {
- super(df.sqlContext(), df.logicalPlan(), RowEncoder.apply(null));
+ super(df.sparkSession(), df.logicalPlan(), RowEncoder.apply(null));
this.mc = mc;
this.ml = ml;
}
@@ -105,10 +105,10 @@ public class MLMatrix extends Dataset<Row> {
// }
// ------------------------------------------------------------------------------------------------
- static MLMatrix createMLMatrix(MLContext ml, SQLContext sqlContext, JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) throws DMLRuntimeException {
+ static MLMatrix createMLMatrix(MLContext ml, SparkSession sparkSession, JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) throws DMLRuntimeException {
RDD<Row> rows = blocks.map(new GetMLBlock()).rdd();
StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
- return new MLMatrix(sqlContext.createDataFrame(rows.toJavaRDD(), schema), mc, ml);
+ return new MLMatrix(sparkSession.createDataFrame(rows.toJavaRDD(), schema), mc, ml);
}
/**
@@ -233,7 +233,7 @@ public class MLMatrix extends Dataset<Row> {
RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new GetMLBlock()).rdd();
StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
- return new MLMatrix(this.sqlContext().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml);
+ return new MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml);
}
private MLMatrix scalarBinaryOp(Double scalar, String op, boolean isScalarLeft) throws IOException, DMLException {
@@ -244,7 +244,7 @@ public class MLMatrix extends Dataset<Row> {
RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new GetMLBlock()).rdd();
StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
- return new MLMatrix(this.sqlContext().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml);
+ return new MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml);
}
// ---------------------------------------------------
@@ -349,7 +349,7 @@ public class MLMatrix extends Dataset<Row> {
RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new GetMLBlock()).rdd();
StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
- return new MLMatrix(this.sqlContext().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml);
+ return new MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml);
}
// TODO: For 'scalar op matrix' operations: Do implicit conversions
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
index 0fcbfdf..ca90fc9 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
@@ -90,65 +90,68 @@ public class MLOutput {
/**
* Note, the output DataFrame has an additional column ID.
* An easy way to get DataFrame without ID is by df.drop("__INDEX")
- * @param sqlContext the SQLContext
+ *
+ * @param sparkSession the Spark Session
* @param varName the variable name
* @return the DataFrame
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- public Dataset<Row> getDF(SQLContext sqlContext, String varName) throws DMLRuntimeException {
- if(sqlContext == null) {
- throw new DMLRuntimeException("SQLContext is not created.");
+ public Dataset<Row> getDF(SparkSession sparkSession, String varName) throws DMLRuntimeException {
+ if(sparkSession == null) {
+ throw new DMLRuntimeException("SparkSession is not created.");
}
JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName);
if(rdd != null) {
MatrixCharacteristics mc = _outMetadata.get(varName);
- return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, rdd, mc, false);
+ return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, rdd, mc, false);
}
throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
}
/**
* Obtain the DataFrame
- * @param sqlContext the SQLContext
+ *
+ * @param sparkSession the Spark Session
* @param varName the variable name
* @param outputVector if true, returns DataFrame with two column: ID and org.apache.spark.ml.linalg.Vector
* @return the DataFrame
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- public Dataset<Row> getDF(SQLContext sqlContext, String varName, boolean outputVector) throws DMLRuntimeException {
- if(sqlContext == null) {
- throw new DMLRuntimeException("SQLContext is not created.");
+ public Dataset<Row> getDF(SparkSession sparkSession, String varName, boolean outputVector) throws DMLRuntimeException {
+ if(sparkSession == null) {
+ throw new DMLRuntimeException("SparkSession is not created.");
}
if(outputVector) {
JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName);
if(rdd != null) {
MatrixCharacteristics mc = _outMetadata.get(varName);
- return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, rdd, mc, true);
+ return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, rdd, mc, true);
}
throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
}
else {
- return getDF(sqlContext, varName);
+ return getDF(sparkSession, varName);
}
}
/**
* This methods improves the performance of MLPipeline wrappers.
- * @param sqlContext the SQLContext
+ *
+ * @param sparkSession the Spark Session
* @param varName the variable name
* @param mc the matrix characteristics
* @return the DataFrame
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- public Dataset<Row> getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc)
+ public Dataset<Row> getDF(SparkSession sparkSession, String varName, MatrixCharacteristics mc)
throws DMLRuntimeException
{
- if(sqlContext == null)
- throw new DMLRuntimeException("SQLContext is not created.");
-
+ if(sparkSession == null) {
+ throw new DMLRuntimeException("SparkSession is not created.");
+ }
JavaPairRDD<MatrixIndexes,MatrixBlock> binaryBlockRDD = getBinaryBlockedRDD(varName);
- return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, binaryBlockRDD, mc, true);
+ return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockRDD, mc, true);
}
public JavaRDD<String> getStringRDD(String varName, String format) throws DMLRuntimeException {
@@ -180,12 +183,13 @@ public class MLOutput {
public Dataset<Row> getDataFrameRDD(String varName, JavaSparkContext jsc) throws DMLRuntimeException {
JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName);
MatrixCharacteristics mcIn = getMatrixCharacteristics(varName);
- return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryRDD, mcIn, null);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate();
+ return FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryRDD, mcIn, null);
}
- public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException {
- if(sqlContext == null) {
- throw new DMLRuntimeException("SQLContext is not created.");
+ public MLMatrix getMLMatrix(MLContext ml, SparkSession sparkSession, String varName) throws DMLRuntimeException {
+ if(sparkSession == null) {
+ throw new DMLRuntimeException("SparkSession is not created.");
}
else if(ml == null) {
throw new DMLRuntimeException("MLContext is not created.");
@@ -194,7 +198,7 @@ public class MLOutput {
if(rdd != null) {
MatrixCharacteristics mc = getMatrixCharacteristics(varName);
StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
- return new MLMatrix(sqlContext.createDataFrame(rdd.map(new GetMLBlock()).rdd(), schema), mc, ml);
+ return new MLMatrix(sparkSession.createDataFrame(rdd.map(new GetMLBlock()).rdd(), schema), mc, ml);
}
throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index 5414e4d..0225ea8 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -37,7 +37,7 @@ import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.sysml.api.MLContextProxy;
@@ -1243,9 +1243,9 @@ public class MLContextConversionUtil {
MatrixCharacteristics mc = matrixObject.getMatrixCharacteristics();
SparkContext sc = ((MLContext) MLContextProxy.getActiveMLContextForAPI()).getSparkContext();
- SQLContext sqlctx = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc).getOrCreate();
- return RDDConverterUtils.binaryBlockToDataFrame(sqlctx, binaryBlockMatrix, mc, isVectorDF);
+ return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockMatrix, mc, isVectorDF);
}
catch (DMLRuntimeException e) {
throw new MLContextException("DMLRuntimeException while converting matrix object to DataFrame", e);
@@ -1270,7 +1270,8 @@ public class MLContextConversionUtil {
MatrixCharacteristics mc = frameObject.getMatrixCharacteristics();
JavaSparkContext jsc = MLContextUtil.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());
- return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryBlockFrame, mc, frameObject.getSchema());
+ SparkSession sparkSession = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate();
+ return FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockFrame, mc, frameObject.getSchema());
}
catch (DMLRuntimeException e) {
throw new MLContextException("DMLRuntimeException while converting frame object to DataFrame", e);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/python/SystemML.py
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/python/SystemML.py b/src/main/java/org/apache/sysml/api/python/SystemML.py
index 3b8ae96..b22c570 100644
--- a/src/main/java/org/apache/sysml/api/python/SystemML.py
+++ b/src/main/java/org/apache/sysml/api/python/SystemML.py
@@ -25,7 +25,7 @@ from py4j.protocol import Py4JJavaError, Py4JError
import traceback
import os
from pyspark.context import SparkContext
-from pyspark.sql import DataFrame, SQLContext
+from pyspark.sql import DataFrame, SparkSession
from pyspark.rdd import RDD
@@ -59,7 +59,7 @@ class MLContext(object):
setForcedSparkExecType = (args[1] if len(args) > 1 else False)
self.sc = sc
self.ml = sc._jvm.org.apache.sysml.api.MLContext(sc._jsc, monitorPerformance, setForcedSparkExecType)
- self.sqlCtx = SQLContext(sc)
+ self.sparkSession = SparkSession.builder.getOrCreate()
except Py4JError:
traceback.print_exc()
@@ -212,41 +212,21 @@ class MLOutput(object):
def getBinaryBlockedRDD(self, varName):
raise Exception('Not supported in Python MLContext')
- #try:
- # rdd = RDD(self.jmlOut.getBinaryBlockedRDD(varName), self.sc)
- # return rdd
- #except Py4JJavaError:
- # traceback.print_exc()
def getMatrixCharacteristics(self, varName):
raise Exception('Not supported in Python MLContext')
- #try:
- # chars = self.jmlOut.getMatrixCharacteristics(varName)
- # return chars
- #except Py4JJavaError:
- # traceback.print_exc()
- def getDF(self, sqlContext, varName):
+ def getDF(self, sparkSession, varName):
try:
- jdf = self.jmlOut.getDF(sqlContext._ssql_ctx, varName)
- df = DataFrame(jdf, sqlContext)
+ jdf = self.jmlOut.getDF(sparkSession, varName)
+ df = DataFrame(jdf, sparkSession)
return df
except Py4JJavaError:
traceback.print_exc()
-
- def getMLMatrix(self, sqlContext, varName):
+
+ def getMLMatrix(self, sparkSession, varName):
raise Exception('Not supported in Python MLContext')
- #try:
- # mlm = self.jmlOut.getMLMatrix(sqlContext._scala_SQLContext, varName)
- # return mlm
- #except Py4JJavaError:
- # traceback.print_exc()
def getStringRDD(self, varName, format):
raise Exception('Not supported in Python MLContext')
- #try:
- # rdd = RDD(self.jmlOut.getStringRDD(varName, format), self.sc)
- # return rdd
- #except Py4JJavaError:
- # traceback.print_exc()
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index ae3b686..3d5df56 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -22,9 +22,9 @@ package org.apache.sysml.runtime.instructions.spark.utils;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,14 +42,11 @@ import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-
-import scala.Tuple2;
-
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
@@ -70,6 +67,8 @@ import org.apache.sysml.runtime.util.DataConverter;
import org.apache.sysml.runtime.util.FastStringTokenizer;
import org.apache.sysml.runtime.util.UtilFunctions;
+import scala.Tuple2;
+
public class FrameRDDConverterUtils
@@ -267,7 +266,7 @@ public class FrameRDDConverterUtils
new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID, colVect));
}
- public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<Long,FrameBlock> in,
+ public static Dataset<Row> binaryBlockToDataFrame(SparkSession sparkSession, JavaPairRDD<Long,FrameBlock> in,
MatrixCharacteristics mc, ValueType[] schema)
{
if( !mc.colsKnown() )
@@ -283,7 +282,7 @@ public class FrameRDDConverterUtils
StructType dfSchema = convertFrameSchemaToDFSchema(schema, true);
//rdd to data frame conversion
- return sqlctx.createDataFrame(rowRDD, dfSchema);
+ return sparkSession.createDataFrame(rowRDD, dfSchema);
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 1310b80..4d95b95 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -34,21 +34,18 @@ import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.ml.feature.LabeledPoint;
import org.apache.spark.ml.linalg.DenseVector;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
-import org.apache.spark.ml.feature.LabeledPoint;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.util.LongAccumulator;
-
-import scala.Tuple2;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -68,6 +65,8 @@ import org.apache.sysml.runtime.util.DataConverter;
import org.apache.sysml.runtime.util.FastStringTokenizer;
import org.apache.sysml.runtime.util.UtilFunctions;
+import scala.Tuple2;
+
public class RDDConverterUtils
{
public static final String DF_ID_COLUMN = "__INDEX";
@@ -262,7 +261,7 @@ public class RDDConverterUtils
return out;
}
- public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlctx,
+ public static Dataset<Row> binaryBlockToDataFrame(SparkSession sparkSession,
JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector)
{
if( !mc.colsKnown() )
@@ -284,7 +283,7 @@ public class RDDConverterUtils
}
//rdd to data frame conversion
- return sqlctx.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
+ return sparkSession.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
}
public static JavaPairRDD<LongWritable, Text> stringToSerializableText(JavaPairRDD<Long,String> in)
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index e3b4d0c..f4a02dd 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkContext;
-import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -44,6 +43,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -221,11 +221,11 @@ public class RDDConverterUtilsExt
* Add element indices as new column to DataFrame
*
* @param df input data frame
- * @param sqlContext SQL context
+ * @param sparkSession the Spark Session
* @param nameOfCol name of index column
* @return new data frame
*/
- public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext sqlContext, String nameOfCol) {
+ public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SparkSession sparkSession, String nameOfCol) {
StructField[] oldSchema = df.schema().fields();
StructField[] newSchema = new StructField[oldSchema.length + 1];
for(int i = 0; i < oldSchema.length; i++) {
@@ -234,7 +234,7 @@ public class RDDConverterUtilsExt
newSchema[oldSchema.length] = DataTypes.createStructField(nameOfCol, DataTypes.DoubleType, false);
// JavaRDD<Row> newRows = df.rdd().toJavaRDD().map(new AddRowID());
JavaRDD<Row> newRows = df.rdd().toJavaRDD().zipWithIndex().map(new AddRowID());
- return sqlContext.createDataFrame(newRows, new StructType(newSchema));
+ return sparkSession.createDataFrame(newRows, new StructType(newSchema));
}
@@ -378,9 +378,42 @@ public class RDDConverterUtilsExt
* @return dataframe of ml.linalg.Vector rows
* @throws DMLRuntimeException
* if DMLRuntimeException occurs
+ *
+ * @deprecated This will be removed in SystemML 1.0. Please migrate to {@code
+ * RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(SparkSession, Dataset<Row>) }
*/
+ @Deprecated
public static Dataset<Row> stringDataFrameToVectorDataFrame(SQLContext sqlContext, Dataset<Row> inputDF)
throws DMLRuntimeException {
+ SparkSession sparkSession = sqlContext.sparkSession();
+ return stringDataFrameToVectorDataFrame(sparkSession, inputDF);
+ }
+
+ /**
+ * Convert a dataframe of comma-separated string rows to a dataframe of
+ * ml.linalg.Vector rows.
+ *
+ * <p>
+ * Example input rows:<br>
+ *
+ * <code>
+ * ((1.2, 4.3, 3.4))<br>
+ * (1.2, 3.4, 2.2)<br>
+ * [[1.2, 34.3, 1.2, 1.25]]<br>
+ * [1.2, 3.4]<br>
+ * </code>
+ *
+ * @param sparkSession
+ * Spark Session
+ * @param inputDF
+ * dataframe of comma-separated row strings to convert to
+ * dataframe of ml.linalg.Vector rows
+ * @return dataframe of ml.linalg.Vector rows
+ * @throws DMLRuntimeException
+ * if DMLRuntimeException occurs
+ */
+ public static Dataset<Row> stringDataFrameToVectorDataFrame(SparkSession sparkSession, Dataset<Row> inputDF)
+ throws DMLRuntimeException {
StructField[] oldSchema = inputDF.schema().fields();
StructField[] newSchema = new StructField[oldSchema.length];
@@ -444,8 +477,7 @@ public class RDDConverterUtilsExt
// output DF
JavaRDD<Row> newRows = inputDF.rdd().toJavaRDD().zipWithIndex().map(new StringToVector());
- Dataset<Row> outDF = sqlContext.createDataFrame(newRows.rdd(), DataTypes.createStructType(newSchema));
-
+ Dataset<Row> outDF = sparkSession.createDataFrame(newRows.rdd(), DataTypes.createStructType(newSchema));
return outDF;
}
}
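[Editorial note: the RDDConverterUtilsExt hunk above shows the compatibility pattern used for public API: rather than deleting the SQLContext overload outright, the commit deprecates it and forwards to the SparkSession variant through sqlContext.sparkSession(). A stripped-down sketch of that shim; the method body is a placeholder, not the commit's conversion logic.]

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

public class DeprecationShim {
    /** New primary signature, as in the converter above. */
    public static Dataset<Row> toVectorDataFrame(SparkSession sparkSession, Dataset<Row> inputDF) {
        // real conversion logic elided; identity stands in for it
        return inputDF;
    }

    /** Old signature kept for one release; every SQLContext carries its session. */
    @Deprecated
    public static Dataset<Row> toVectorDataFrame(SQLContext sqlContext, Dataset<Row> inputDF) {
        return toVectorDataFrame(sqlContext.sparkSession(), inputDF);
    }
}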
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/systemml/converters.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/converters.py b/src/main/python/systemml/converters.py
index c7ca8ef..2d32508 100644
--- a/src/main/python/systemml/converters.py
+++ b/src/main/python/systemml/converters.py
@@ -36,7 +36,7 @@ def getNumCols(numPyArr):
return numPyArr.shape[1]
-def convertToLabeledDF(sqlCtx, X, y=None):
+def convertToLabeledDF(sparkSession, X, y=None):
from pyspark.ml.feature import VectorAssembler
if y is not None:
pd1 = pd.DataFrame(X)
@@ -49,7 +49,7 @@ def convertToLabeledDF(sqlCtx, X, y=None):
inputColumns = ['C' + str(i) for i in pdf.columns]
outputColumns = inputColumns
assembler = VectorAssembler(inputCols=inputColumns, outputCol='features')
- out = assembler.transform(sqlCtx.createDataFrame(pdf, outputColumns))
+ out = assembler.transform(sparkSession.createDataFrame(pdf, outputColumns))
if y is not None:
return out.select('features', 'label')
else:
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/systemml/defmatrix.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/defmatrix.py b/src/main/python/systemml/defmatrix.py
index 5f973f6..3e13bf2 100644
--- a/src/main/python/systemml/defmatrix.py
+++ b/src/main/python/systemml/defmatrix.py
@@ -28,7 +28,7 @@ try:
import py4j.java_gateway
from py4j.java_gateway import JavaObject
from pyspark import SparkContext
- from pyspark.sql import DataFrame, SQLContext
+ from pyspark.sql import DataFrame, SparkSession
import pyspark.mllib.common
except ImportError:
raise ImportError('Unable to import `pyspark`. Hint: Make sure you are running with PySpark.')
@@ -46,7 +46,7 @@ def setSparkContext(sc):
SparkContext
"""
matrix.sc = sc
- matrix.sqlContext = SQLContext(sc)
+ matrix.sparkSession = SparkSession.builder.getOrCreate()
matrix.ml = MLContext(matrix.sc)
@@ -290,7 +290,7 @@ def solve(A, b):
>>> import numpy as np
>>> from sklearn import datasets
>>> import systemml as sml
- >>> from pyspark.sql import SQLContext
+ >>> from pyspark.sql import SparkSession
>>> diabetes = datasets.load_diabetes()
>>> diabetes_X = diabetes.data[:, np.newaxis, 2]
>>> X_train = diabetes_X[:-20]
@@ -523,7 +523,7 @@ class matrix(object):
if isinstance(self.eval_data, Matrix):
self.eval_data = self.eval_data.toDF()
return self.eval_data
- self.eval_data = matrix.sqlContext.createDataFrame(self.toPandas())
+ self.eval_data = matrix.sparkSession.createDataFrame(self.toPandas())
return self.eval_data
def save(self, file, format='csv'):
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/systemml/mllearn/estimators.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index d4ece89..c4eaf3d 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -37,8 +37,8 @@ import math
from ..converters import *
from ..classloader import *
-def assemble(sqlCtx, pdf, inputCols, outputCol):
- tmpDF = sqlCtx.createDataFrame(pdf, list(pdf.columns))
+def assemble(sparkSession, pdf, inputCols, outputCol):
+ tmpDF = sparkSession.createDataFrame(pdf, list(pdf.columns))
assembler = VectorAssembler(inputCols=list(inputCols), outputCol=outputCol)
return assembler.transform(tmpDF)
@@ -129,7 +129,7 @@ class BaseSystemMLEstimator(Estimator):
raise Exception('Number of rows of X and y should match')
colNames = pdfX.columns
pdfX[self.label_col] = pdfY[pdfY.columns[0]]
- df = assemble(self.sqlCtx, pdfX, colNames, self.features_col).select(self.features_col, self.label_col)
+ df = assemble(self.sparkSession, pdfX, colNames, self.features_col).select(self.features_col, self.label_col)
self.fit_df(df)
else:
numColsy = getNumCols(y)
@@ -157,9 +157,9 @@ class BaseSystemMLEstimator(Estimator):
if isinstance(X, SUPPORTED_TYPES):
if self.transferUsingDF:
pdfX = convertToPandasDF(X)
- df = assemble(self.sqlCtx, pdfX, pdfX.columns, self.features_col).select(self.features_col)
+ df = assemble(self.sparkSession, pdfX, pdfX.columns, self.features_col).select(self.features_col)
retjDF = self.model.transform(df._jdf)
- retDF = DataFrame(retjDF, self.sqlCtx)
+ retDF = DataFrame(retjDF, self.sparkSession)
retPDF = retDF.sort('__INDEX').select('prediction').toPandas()
if isinstance(X, np.ndarray):
return self.decode(retPDF.as_matrix().flatten())
@@ -182,7 +182,7 @@ class BaseSystemMLEstimator(Estimator):
assembler = VectorAssembler(inputCols=X.columns, outputCol=self.features_col)
df = assembler.transform(X)
retjDF = self.model.transform(df._jdf)
- retDF = DataFrame(retjDF, self.sqlCtx)
+ retDF = DataFrame(retjDF, self.sparkSession)
# Return DF
return retDF.sort('__INDEX')
else:
@@ -245,8 +245,8 @@ class LogisticRegression(BaseSystemMLClassifier):
>>> from sklearn import datasets, neighbors
>>> from systemml.mllearn import LogisticRegression
- >>> from pyspark.sql import SQLContext
- >>> sqlCtx = SQLContext(sc)
+ >>> from pyspark.sql import SparkSession
+ >>> sparkSession = SparkSession.builder.getOrCreate()
>>> digits = datasets.load_digits()
>>> X_digits = digits.data
>>> y_digits = digits.target + 1
@@ -255,7 +255,7 @@ class LogisticRegression(BaseSystemMLClassifier):
>>> y_train = y_digits[:int(.9 * n_samples)]
>>> X_test = X_digits[int(.9 * n_samples):]
>>> y_test = y_digits[int(.9 * n_samples):]
- >>> logistic = LogisticRegression(sqlCtx)
+ >>> logistic = LogisticRegression(sparkSession)
>>> print('LogisticRegression score: %f' % logistic.fit(X_train, y_train).score(X_test, y_test))
MLPipeline way
@@ -263,9 +263,9 @@ class LogisticRegression(BaseSystemMLClassifier):
>>> from pyspark.ml import Pipeline
>>> from systemml.mllearn import LogisticRegression
>>> from pyspark.ml.feature import HashingTF, Tokenizer
- >>> from pyspark.sql import SQLContext
- >>> sqlCtx = SQLContext(sc)
- >>> training = sqlCtx.createDataFrame([
+ >>> from pyspark.sql import SparkSession
+ >>> sparkSession = SparkSession.builder.getOrCreate()
+ >>> training = sparkSession.createDataFrame([
>>> (0L, "a b c d e spark", 1.0),
>>> (1L, "b d", 2.0),
>>> (2L, "spark f g h", 1.0),
@@ -281,10 +281,10 @@ class LogisticRegression(BaseSystemMLClassifier):
>>> ], ["id", "text", "label"])
>>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
>>> hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20)
- >>> lr = LogisticRegression(sqlCtx)
+ >>> lr = LogisticRegression(sparkSession)
>>> pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
>>> model = pipeline.fit(training)
- >>> test = sqlCtx.createDataFrame([
+ >>> test = sparkSession.createDataFrame([
>>> (12L, "spark i j k"),
>>> (13L, "l m n"),
>>> (14L, "mapreduce spark"),
@@ -294,13 +294,13 @@ class LogisticRegression(BaseSystemMLClassifier):
"""
- def __init__(self, sqlCtx, penalty='l2', fit_intercept=True, max_iter=100, max_inner_iter=0, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False):
+ def __init__(self, sparkSession, penalty='l2', fit_intercept=True, max_iter=100, max_inner_iter=0, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False):
"""
Performs both binomial and multinomial logistic regression.
Parameters
----------
- sqlCtx: PySpark SQLContext
+ sparkSession: PySpark SparkSession
penalty: Only 'l2' supported
fit_intercept: Specifies whether to add intercept or not (default: True)
max_iter: Maximum number of outer (Fisher scoring) iterations (default: 100)
@@ -309,8 +309,8 @@ class LogisticRegression(BaseSystemMLClassifier):
C: 1/regularization parameter (default: 1.0)
solver: Only 'newton-cg' solver supported
"""
- self.sqlCtx = sqlCtx
- self.sc = sqlCtx._sc
+ self.sparkSession = sparkSession
+ self.sc = sparkSession._sc
createJavaObject(self.sc, 'dummy')
self.uid = "logReg"
self.estimator = self.sc._jvm.org.apache.sysml.api.ml.LogisticRegression(self.uid, self.sc._jsc.sc())
@@ -340,7 +340,7 @@ class LinearRegression(BaseSystemMLRegressor):
>>> import numpy as np
>>> from sklearn import datasets
>>> from systemml.mllearn import LinearRegression
- >>> from pyspark.sql import SQLContext
+ >>> from pyspark.sql import SparkSession
>>> # Load the diabetes dataset
>>> diabetes = datasets.load_diabetes()
>>> # Use only one feature
@@ -352,7 +352,7 @@ class LinearRegression(BaseSystemMLRegressor):
>>> diabetes_y_train = diabetes.target[:-20]
>>> diabetes_y_test = diabetes.target[-20:]
>>> # Create linear regression object
- >>> regr = LinearRegression(sqlCtx, solver='newton-cg')
+ >>> regr = LinearRegression(sparkSession, solver='newton-cg')
>>> # Train the model using the training sets
>>> regr.fit(diabetes_X_train, diabetes_y_train)
>>> # The mean square error
@@ -361,13 +361,13 @@ class LinearRegression(BaseSystemMLRegressor):
"""
- def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False):
+ def __init__(self, sparkSession, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False):
"""
Performs linear regression to model the relationship between one numerical response variable and one or more explanatory (feature) variables.
Parameters
----------
- sqlCtx: PySpark SQLContext
+ sparkSession: PySpark SparkSession
fit_intercept: Specifies whether to add intercept or not (default: True)
max_iter: Maximum number of conjugate gradient iterations, or 0 if no maximum limit provided (default: 100)
tol: Tolerance used in the convergence criterion (default: 0.000001)
@@ -377,8 +377,8 @@ class LinearRegression(BaseSystemMLRegressor):
'direct-solve' solver is more efficient when the number of features is relatively small (m < 1000) and
input matrix X is either tall or fairly dense; otherwise 'newton-cg' solver is more efficient.
"""
- self.sqlCtx = sqlCtx
- self.sc = sqlCtx._sc
+ self.sparkSession = sparkSession
+ self.sc = sparkSession._sc
createJavaObject(self.sc, 'dummy')
self.uid = "lr"
if solver == 'newton-cg' or solver == 'direct-solve':
@@ -405,8 +405,8 @@ class SVM(BaseSystemMLClassifier):
>>> from sklearn import datasets, neighbors
>>> from systemml.mllearn import SVM
- >>> from pyspark.sql import SQLContext
- >>> sqlCtx = SQLContext(sc)
+ >>> from pyspark.sql import SparkSession
+ >>> sparkSession = SparkSession.builder.getOrCreate()
>>> digits = datasets.load_digits()
>>> X_digits = digits.data
>>> y_digits = digits.target
@@ -415,27 +415,27 @@ class SVM(BaseSystemMLClassifier):
>>> y_train = y_digits[:int(.9 * n_samples)]
>>> X_test = X_digits[int(.9 * n_samples):]
>>> y_test = y_digits[int(.9 * n_samples):]
- >>> svm = SVM(sqlCtx, is_multi_class=True)
+ >>> svm = SVM(sparkSession, is_multi_class=True)
>>> print('SVM score: %f' % svm.fit(X_train, y_train).score(X_test, y_test))
"""
- def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=False, transferUsingDF=False):
+ def __init__(self, sparkSession, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=False, transferUsingDF=False):
"""
Performs both binary-class and multiclass SVM (Support Vector Machines).
Parameters
----------
- sqlCtx: PySpark SQLContext
+ sparkSession: PySpark SparkSession
fit_intercept: Specifies whether to add intercept or not (default: True)
max_iter: Maximum number of iterations (default: 100)
tol: Tolerance used in the convergence criterion (default: 0.000001)
C: 1/regularization parameter (default: 1.0)
is_multi_class: Specifies whether to use binary-class SVM or multi-class SVM algorithm (default: False)
"""
- self.sqlCtx = sqlCtx
- self.sc = sqlCtx._sc
+ self.sparkSession = sparkSession
+ self.sc = sparkSession._sc
self.uid = "svm"
createJavaObject(self.sc, 'dummy')
self.estimator = self.sc._jvm.org.apache.sysml.api.ml.SVM(self.uid, self.sc._jsc.sc(), is_multi_class)
@@ -461,8 +461,8 @@ class NaiveBayes(BaseSystemMLClassifier):
>>> from sklearn.feature_extraction.text import TfidfVectorizer
>>> from systemml.mllearn import NaiveBayes
>>> from sklearn import metrics
- >>> from pyspark.sql import SQLContext
- >>> sqlCtx = SQLContext(sc)
+ >>> from pyspark.sql import SparkSession
+ >>> sparkSession = SparkSession.builder.getOrCreate()
>>> categories = ['alt.atheism', 'talk.religion.misc', 'comp.graphics', 'sci.space']
>>> newsgroups_train = fetch_20newsgroups(subset='train', categories=categories)
>>> newsgroups_test = fetch_20newsgroups(subset='test', categories=categories)
@@ -470,24 +470,24 @@ class NaiveBayes(BaseSystemMLClassifier):
>>> # Both vectors and vectors_test are SciPy CSR matrix
>>> vectors = vectorizer.fit_transform(newsgroups_train.data)
>>> vectors_test = vectorizer.transform(newsgroups_test.data)
- >>> nb = NaiveBayes(sqlCtx)
+ >>> nb = NaiveBayes(sparkSession)
>>> nb.fit(vectors, newsgroups_train.target)
>>> pred = nb.predict(vectors_test)
>>> metrics.f1_score(newsgroups_test.target, pred, average='weighted')
"""
- def __init__(self, sqlCtx, laplace=1.0, transferUsingDF=False):
+ def __init__(self, sparkSession, laplace=1.0, transferUsingDF=False):
"""
Performs Naive Bayes.
Parameters
----------
- sqlCtx: PySpark SQLContext
+ sparkSession: PySpark SparkSession
laplace: Laplace smoothing specified by the user to avoid creation of 0 probabilities (default: 1.0)
"""
- self.sqlCtx = sqlCtx
- self.sc = sqlCtx._sc
+ self.sparkSession = sparkSession
+ self.sc = sparkSession._sc
self.uid = "nb"
createJavaObject(self.sc, 'dummy')
self.estimator = self.sc._jvm.org.apache.sysml.api.ml.NaiveBayes(self.uid, self.sc._jsc.sc())
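For orientation: the sparkSession handle drives the scikit-learn style front end, while each wrapper still binds its estimator to the underlying SparkContext on the JVM. A sketch of the equivalent JVM-side construction that the wrapper performs via py4j (variable names hypothetical; sc is a JavaSparkContext):

    // JVM-side equivalent of self.sc._jvm.org.apache.sysml.api.ml.LogisticRegression(self.uid, self.sc._jsc.sc())
    org.apache.sysml.api.ml.LogisticRegression estimator =
        new org.apache.sysml.api.ml.LogisticRegression("logReg", sc.sc());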
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/tests/test_mllearn_df.py
----------------------------------------------------------------------
diff --git a/src/main/python/tests/test_mllearn_df.py b/src/main/python/tests/test_mllearn_df.py
index 0d6a4b4..da49953 100644
--- a/src/main/python/tests/test_mllearn_df.py
+++ b/src/main/python/tests/test_mllearn_df.py
@@ -36,7 +36,7 @@ import numpy as np
from pyspark.context import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
from sklearn import datasets, metrics, neighbors
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
@@ -44,7 +44,7 @@ from sklearn.feature_extraction.text import TfidfVectorizer
from systemml.mllearn import LinearRegression, LogisticRegression, NaiveBayes, SVM
sc = SparkContext()
-sqlCtx = SQLContext(sc)
+sparkSession = SparkSession.builder.getOrCreate()
# Currently not integrated with JUnit test
# ~/spark-1.6.1-scala-2.11/bin/spark-submit --master local[*] --driver-class-path SystemML.jar test.py
@@ -60,7 +60,7 @@ class TestMLLearn(unittest.TestCase):
X_test = X_digits[int(.9 * n_samples):]
y_test = y_digits[int(.9 * n_samples):]
# Convert to DataFrame for i/o: current way to transfer data
- logistic = LogisticRegression(sqlCtx, transferUsingDF=True)
+ logistic = LogisticRegression(sparkSession, transferUsingDF=True)
score = logistic.fit(X_train, y_train).score(X_test, y_test)
self.failUnless(score > 0.9)
@@ -71,7 +71,7 @@ class TestMLLearn(unittest.TestCase):
diabetes_X_test = diabetes_X[-20:]
diabetes_y_train = diabetes.target[:-20]
diabetes_y_test = diabetes.target[-20:]
- regr = LinearRegression(sqlCtx, transferUsingDF=True)
+ regr = LinearRegression(sparkSession, transferUsingDF=True)
regr.fit(diabetes_X_train, diabetes_y_train)
score = regr.score(diabetes_X_test, diabetes_y_test)
self.failUnless(score > 0.4) # TODO: Improve r2-score (maybe I am using it incorrectly)
@@ -85,7 +85,7 @@ class TestMLLearn(unittest.TestCase):
y_train = y_digits[:int(.9 * n_samples)]
X_test = X_digits[int(.9 * n_samples):]
y_test = y_digits[int(.9 * n_samples):]
- svm = SVM(sqlCtx, is_multi_class=True, transferUsingDF=True)
+ svm = SVM(sparkSession, is_multi_class=True, transferUsingDF=True)
score = svm.fit(X_train, y_train).score(X_test, y_test)
self.failUnless(score > 0.9)
@@ -97,7 +97,7 @@ class TestMLLearn(unittest.TestCase):
# # Both vectors and vectors_test are SciPy CSR matrix
# vectors = vectorizer.fit_transform(newsgroups_train.data)
# vectors_test = vectorizer.transform(newsgroups_test.data)
- # nb = NaiveBayes(sqlCtx)
+ # nb = NaiveBayes(sparkSession)
# nb.fit(vectors, newsgroups_train.target)
# pred = nb.predict(vectors_test)
# score = metrics.f1_score(newsgroups_test.target, pred, average='weighted')
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/tests/test_mllearn_numpy.py
----------------------------------------------------------------------
diff --git a/src/main/python/tests/test_mllearn_numpy.py b/src/main/python/tests/test_mllearn_numpy.py
index d030837..925554f 100644
--- a/src/main/python/tests/test_mllearn_numpy.py
+++ b/src/main/python/tests/test_mllearn_numpy.py
@@ -36,7 +36,7 @@ import numpy as np
from pyspark.context import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
from sklearn import datasets, metrics, neighbors
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
@@ -44,7 +44,7 @@ from sklearn.feature_extraction.text import TfidfVectorizer
from systemml.mllearn import LinearRegression, LogisticRegression, NaiveBayes, SVM
sc = SparkContext()
-sqlCtx = SQLContext(sc)
+sparkSession = SparkSession.builder.getOrCreate()
# Currently not integrated with JUnit test
# ~/spark-1.6.1-scala-2.11/bin/spark-submit --master local[*] --driver-class-path SystemML.jar test.py
@@ -58,12 +58,12 @@ class TestMLLearn(unittest.TestCase):
y_train = y_digits[:int(.9 * n_samples)]
X_test = X_digits[int(.9 * n_samples):]
y_test = y_digits[int(.9 * n_samples):]
- logistic = LogisticRegression(sqlCtx)
+ logistic = LogisticRegression(sparkSession)
score = logistic.fit(X_train, y_train).score(X_test, y_test)
self.failUnless(score > 0.9)
def test_logistic_mlpipeline(self):
- training = sqlCtx.createDataFrame([
+ training = sparkSession.createDataFrame([
("a b c d e spark", 1.0),
("b d", 2.0),
("spark f g h", 1.0),
@@ -79,10 +79,10 @@ class TestMLLearn(unittest.TestCase):
], ["text", "label"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20)
- lr = LogisticRegression(sqlCtx)
+ lr = LogisticRegression(sparkSession)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
- test = sqlCtx.createDataFrame([
+ test = sparkSession.createDataFrame([
("spark i j k", 1.0),
("l m n", 2.0),
("mapreduce spark", 1.0),
@@ -101,7 +101,7 @@ class TestMLLearn(unittest.TestCase):
diabetes_X_test = diabetes_X[-20:]
diabetes_y_train = diabetes.target[:-20]
diabetes_y_test = diabetes.target[-20:]
- regr = LinearRegression(sqlCtx)
+ regr = LinearRegression(sparkSession)
regr.fit(diabetes_X_train, diabetes_y_train)
score = regr.score(diabetes_X_test, diabetes_y_test)
self.failUnless(score > 0.4) # TODO: Improve r2-score (maybe I am using it incorrectly)
@@ -115,7 +115,7 @@ class TestMLLearn(unittest.TestCase):
y_train = y_digits[:int(.9 * n_samples)]
X_test = X_digits[int(.9 * n_samples):]
y_test = y_digits[int(.9 * n_samples):]
- svm = SVM(sqlCtx, is_multi_class=True)
+ svm = SVM(sparkSession, is_multi_class=True)
score = svm.fit(X_train, y_train).score(X_test, y_test)
self.failUnless(score > 0.9)
@@ -128,7 +128,7 @@ class TestMLLearn(unittest.TestCase):
y_train = y_digits[:int(.9 * n_samples)]
X_test = X_digits[int(.9 * n_samples):]
y_test = y_digits[int(.9 * n_samples):]
- nb = NaiveBayes(sqlCtx)
+ nb = NaiveBayes(sparkSession)
score = nb.fit(X_train, y_train).score(X_test, y_test)
self.failUnless(score > 0.8)
@@ -140,7 +140,7 @@ class TestMLLearn(unittest.TestCase):
# # Both vectors and vectors_test are SciPy CSR matrix
# vectors = vectorizer.fit_transform(newsgroups_train.data)
# vectors_test = vectorizer.transform(newsgroups_test.data)
- # nb = NaiveBayes(sqlCtx)
+ # nb = NaiveBayes(sparkSession)
# nb.fit(vectors, newsgroups_train.target)
# pred = nb.predict(vectors_test)
# score = metrics.f1_score(newsgroups_test.target, pred, average='weighted')
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
index 1145463..fb9697d 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
@@ -152,11 +152,11 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
if(outputProb) {
val prob = modelPredict.getDataFrame(probVar, true).withColumnRenamed("C1", "probability").select(RDDConverterUtils.DF_ID_COLUMN, "probability")
- val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, RDDConverterUtils.DF_ID_COLUMN)
+ val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN)
return PredictionUtils.joinUsingID(dataset, PredictionUtils.joinUsingID(prob, predictedDF))
}
else {
- val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, RDDConverterUtils.DF_ID_COLUMN)
+ val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN)
return PredictionUtils.joinUsingID(dataset, predictedDF)
}
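The model traits now pull the session straight off the input dataframe. The resulting call pattern, as a minimal Java sketch (df is any Dataset<Row>; DF_ID_COLUMN is SystemML's reserved "__INDEX" id column that the Python layer sorts on):

    // Append SystemML's row-id column using the dataframe's own session,
    // rather than wrapping the SparkContext in a new SQLContext.
    Dataset<Row> withID = RDDConverterUtilsExt.addIDToDataFrame(
        df, df.sparkSession(), RDDConverterUtils.DF_ID_COLUMN);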
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
index c47fb3c..08154bb 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
@@ -80,7 +80,7 @@ trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel {
val Xin_bin = new BinaryBlockMatrix(Xin, mcXin)
val modelPredict = ml.execute(script._1.in(script._2, Xin_bin))
val predictedDF = modelPredict.getDataFrame(predictionVar).select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction")
- val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, RDDConverterUtils.DF_ID_COLUMN)
+ val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN)
return PredictionUtils.joinUsingID(dataset, predictedDF)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
index c0e3f35..ce89502 100644
--- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
@@ -111,16 +111,16 @@ class LogisticRegressionModel(override val uid: String)(
*/
object LogisticRegressionExample {
import org.apache.spark.{ SparkConf, SparkContext }
+ import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
def main(args: Array[String]) = {
- val sparkConf: SparkConf = new SparkConf();
- val sc: SparkContext = new SparkContext("local", "TestLocal", sparkConf);
- val sqlContext = new org.apache.spark.sql.SQLContext(sc);
+ val sparkSession = SparkSession.builder().master("local").appName("TestLocal").getOrCreate();
+ val sc: SparkContext = sparkSession.sparkContext;
- import sqlContext.implicits._
+ import sparkSession.implicits._
val training = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)),
@@ -130,7 +130,7 @@ object LogisticRegressionExample {
LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3))))
val lr = new LogisticRegression("log", sc)
val lrmodel = lr.fit(training.toDF)
- // lrmodel.mloutput.getDF(sqlContext, "B_out").show()
+ // lrmodel.mloutput.getDF(sparkSession, "B_out").show()
val testing = sc.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
index 5b25c71..e5ed921 100644
--- a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
@@ -719,13 +719,11 @@ public abstract class AutomatedTestBase
TestUtils.writeTestScalar(baseDirectory + EXPECTED_DIR + cacheDir + name, value);
expectedFiles.add(baseDirectory + EXPECTED_DIR + cacheDir + name);
}
-
- @SuppressWarnings("deprecation")
+
protected static HashMap<CellIndex, Double> readDMLMatrixFromHDFS(String fileName) {
return TestUtils.readDMLMatrixFromHDFS(baseDirectory + OUTPUT_DIR + fileName);
}
- @SuppressWarnings("deprecation")
public HashMap<CellIndex, Double> readRMatrixFromFS(String fileName) {
System.out.println("R script out: " + baseDirectory + EXPECTED_DIR + cacheDir + fileName);
return TestUtils.readRMatrixFromFS(baseDirectory + EXPECTED_DIR + cacheDir + fileName);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java b/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java
index 7a69423..8ca1d8d 100644
--- a/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java
@@ -32,7 +32,7 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -85,12 +85,12 @@ public class RDDConverterUtilsExtTest extends AutomatedTestBase {
list.add("[1.2, 3.4]");
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> inDF = sqlContext.createDataFrame(javaRddRow, schema);
- Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sqlContext, inDF);
+ Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema);
+ Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF);
List<String> expectedResults = new ArrayList<String>();
expectedResults.add("[[1.2,4.3,3.4]]");
@@ -111,12 +111,12 @@ public class RDDConverterUtilsExtTest extends AutomatedTestBase {
list.add(null);
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> inDF = sqlContext.createDataFrame(javaRddRow, schema);
- Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sqlContext, inDF);
+ Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema);
+ Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF);
List<String> expectedResults = new ArrayList<String>();
expectedResults.add("[[1.2,3.4]]");
@@ -134,12 +134,12 @@ public class RDDConverterUtilsExtTest extends AutomatedTestBase {
list.add("[cheeseburger,fries]");
JavaRDD<String> javaRddString = sc.parallelize(list);
JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
- Dataset<Row> inDF = sqlContext.createDataFrame(javaRddRow, schema);
- Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sqlContext, inDF);
+ Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema);
+ Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF);
// trigger evaluation to throw exception
outDF.collectAsList();
}
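Because the same construction recurs in every test touched here, the old-to-new equivalence in one place, assuming an existing JavaSparkContext sc:

    // Spark 1.x style (deprecated): SQLContext sqlContext = new SQLContext(sc);
    // Spark 2.x style: bind the session to the same underlying SparkContext;
    // getOrCreate() returns the active session for that context if one exists.
    SparkSession sparkSession = SparkSession.builder()
        .sparkContext(sc.sc())   // unwrap JavaSparkContext to SparkContext
        .getOrCreate();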
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index d5edf01..f626813 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -32,7 +32,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
@@ -532,10 +532,10 @@ public class FrameConverterTest extends AutomatedTestBase
OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
//Create DataFrame
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema, false);
JavaRDD<Row> rowRDD = FrameRDDConverterUtils.csvToRowRDD(sc, fnameIn, separator, lschema);
- Dataset<Row> df = sqlContext.createDataFrame(rowRDD, dfSchema);
+ Dataset<Row> df = sparkSession.createDataFrame(rowRDD, dfSchema);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
.dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/)
@@ -549,7 +549,8 @@ public class FrameConverterTest extends AutomatedTestBase
JavaPairRDD<Long, FrameBlock> rddIn = sc
.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class)
.mapToPair(new LongWritableFrameToLongFrameFunction());
- Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(sc), rddIn, mc, lschema);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
+ Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, rddIn, mc, lschema);
//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
index d3fdc2a..199100e 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
@@ -23,8 +23,7 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.junit.Test;
+import org.apache.spark.sql.SparkSession;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.conf.ConfigurationManager;
@@ -38,6 +37,7 @@ import org.apache.sysml.runtime.util.DataConverter;
import org.apache.sysml.test.integration.AutomatedTestBase;
import org.apache.sysml.test.integration.TestConfiguration;
import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
public class DataFrameMatrixConversionTest extends AutomatedTestBase
@@ -191,13 +191,13 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
//setup spark context
sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
JavaSparkContext sc = sec.getSparkContext();
- SQLContext sqlctx = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
//get binary block input rdd
JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
//matrix - dataframe - matrix conversion
- Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
+ Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(sparkSession, in, mc1, vector);
df = ( rows==rows3 ) ? df.repartition(rows) : df;
JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
index 0e826a3..09628e5 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
@@ -23,8 +23,7 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.junit.Test;
+import org.apache.spark.sql.SparkSession;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.conf.ConfigurationManager;
@@ -40,6 +39,7 @@ import org.apache.sysml.runtime.util.UtilFunctions;
import org.apache.sysml.test.integration.AutomatedTestBase;
import org.apache.sysml.test.integration.TestConfiguration;
import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
public class DataFrameRowFrameConversionTest extends AutomatedTestBase
@@ -216,15 +216,16 @@ public class DataFrameRowFrameConversionTest extends AutomatedTestBase
//setup spark context
sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
JavaSparkContext sc = sec.getSparkContext();
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
+
sc.getConf().set("spark.memory.offHeap.enabled", "false");
- SQLContext sqlctx = new SQLContext(sc);
- sqlctx.setConf("spark.sql.codegen.wholeStage", "false");
-
+ sparkSession.conf().set("spark.sql.codegen.wholeStage", "false");
+
//get binary block input rdd
JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);
//frame - dataframe - frame conversion
- Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema);
+ Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, in, mc1, schema);
JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true);
//get output frame block
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
index c6d2251..4a73376 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
@@ -30,12 +30,11 @@ import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import org.junit.Test;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.conf.ConfigurationManager;
@@ -53,6 +52,7 @@ import org.apache.sysml.runtime.util.UtilFunctions;
import org.apache.sysml.test.integration.AutomatedTestBase;
import org.apache.sysml.test.integration.TestConfiguration;
import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
public class DataFrameVectorFrameConversionTest extends AutomatedTestBase
@@ -268,10 +268,10 @@ public class DataFrameVectorFrameConversionTest extends AutomatedTestBase
//setup spark context
sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
JavaSparkContext sc = sec.getSparkContext();
- SQLContext sqlctx = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
//create input data frame
- Dataset<Row> df = createDataFrame(sqlctx, mbA, containsID, schema);
+ Dataset<Row> df = createDataFrame(sparkSession, mbA, containsID, schema);
//dataframe - frame conversion
JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, containsID);
@@ -294,17 +294,9 @@ public class DataFrameVectorFrameConversionTest extends AutomatedTestBase
DMLScript.rtplatform = oldPlatform;
}
}
-
- /**
- *
- * @param sqlctx
- * @param mb
- * @param schema
- * @return
- * @throws DMLRuntimeException
- */
+
@SuppressWarnings("resource")
- private Dataset<Row> createDataFrame(SQLContext sqlctx, MatrixBlock mb, boolean containsID, ValueType[] schema)
+ private Dataset<Row> createDataFrame(SparkSession sparkSession, MatrixBlock mb, boolean containsID, ValueType[] schema)
throws DMLRuntimeException
{
//create in-memory list of rows
@@ -350,8 +342,8 @@ public class DataFrameVectorFrameConversionTest extends AutomatedTestBase
StructType dfSchema = DataTypes.createStructType(fields);
//create rdd and data frame
- JavaSparkContext sc = new JavaSparkContext(sqlctx.sparkContext());
+ JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<Row> rowRDD = sc.parallelize(list);
- return sqlctx.createDataFrame(rowRDD, dfSchema);
+ return sparkSession.createDataFrame(rowRDD, dfSchema);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
index 14ed4b7..92677b8 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
@@ -32,12 +32,11 @@ import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import org.junit.Test;
import org.apache.sysml.api.mlcontext.FrameFormat;
import org.apache.sysml.api.mlcontext.FrameMetadata;
import org.apache.sysml.api.mlcontext.MLContext;
@@ -55,6 +54,7 @@ import org.apache.sysml.runtime.util.UtilFunctions;
import org.apache.sysml.test.integration.AutomatedTestBase;
import org.apache.sysml.test.integration.TestConfiguration;
import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
public class DataFrameVectorScriptTest extends AutomatedTestBase
@@ -269,10 +269,10 @@ public class DataFrameVectorScriptTest extends AutomatedTestBase
SparkConf conf = SparkExecutionContext.createSystemMLSparkConf()
.setAppName("MLContextFrameTest").setMaster("local");
sc = new JavaSparkContext(conf);
- SQLContext sqlctx = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
//create input data frame
- Dataset<Row> df = createDataFrame(sqlctx, mbA, containsID, schema);
+ Dataset<Row> df = createDataFrame(sparkSession, mbA, containsID, schema);
// Create full frame metadata, and empty frame metadata
FrameMetadata meta = new FrameMetadata(containsID ? FrameFormat.DF_WITH_INDEX :
@@ -315,17 +315,9 @@ public class DataFrameVectorScriptTest extends AutomatedTestBase
ml.close();
}
}
-
- /**
- *
- * @param sqlctx
- * @param mb
- * @param schema
- * @return
- * @throws DMLRuntimeException
- */
+
@SuppressWarnings("resource")
- private Dataset<Row> createDataFrame(SQLContext sqlctx, MatrixBlock mb, boolean containsID, ValueType[] schema)
+ private Dataset<Row> createDataFrame(SparkSession sparkSession, MatrixBlock mb, boolean containsID, ValueType[] schema)
throws DMLRuntimeException
{
//create in-memory list of rows
@@ -371,8 +363,8 @@ public class DataFrameVectorScriptTest extends AutomatedTestBase
StructType dfSchema = DataTypes.createStructType(fields);
//create rdd and data frame
- JavaSparkContext sc = new JavaSparkContext(sqlctx.sparkContext());
+ JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<Row> rowRDD = sc.parallelize(list);
- return sqlctx.createDataFrame(rowRDD, dfSchema);
+ return sparkSession.createDataFrame(rowRDD, dfSchema);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
index e6a947f..d485c48 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.sysml.api.DMLException;
import org.apache.sysml.api.DMLScript;
@@ -68,7 +68,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
-@SuppressWarnings("deprecation")
public class FrameTest extends AutomatedTestBase
{
private final static String TEST_DIR = "functions/frame/";
@@ -238,15 +237,16 @@ public class FrameTest extends AutomatedTestBase
if(bFromDataFrame)
{
//Create DataFrame for input A
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema, false);
+
JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(sc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, schema);
- dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA);
+ dfA = sparkSession.createDataFrame(rowRDDA, dfSchemaA);
//Create DataFrame for input B
StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaB, false);
JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(sc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, schemaB);
- dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB);
+ dfB = sparkSession.createDataFrame(rowRDDB, dfSchemaB);
}
try
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
index 0b3fac4..6dd74d3 100644
--- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
@@ -34,7 +34,7 @@ import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -53,7 +53,6 @@ import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
import org.apache.sysml.test.integration.AutomatedTestBase;
import org.apache.sysml.test.integration.mlcontext.MLContextTest.CommaSeparatedValueStringToDoubleArrayRow;
-
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -239,11 +238,11 @@ public class MLContextFrameTest extends AutomatedTestBase {
JavaRDD<Row> javaRddRowB = FrameRDDConverterUtils.csvToRowRDD(sc, javaRDDB, CSV_DELIM, schemaB);
// Create DataFrame
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaA, false);
- Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, dfSchemaA);
+ Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, dfSchemaA);
StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaB, false);
- Dataset<Row> dataFrameB = sqlContext.createDataFrame(javaRddRowB, dfSchemaB);
+ Dataset<Row> dataFrameB = sparkSession.createDataFrame(javaRddRowB, dfSchemaB);
if (script_type == SCRIPT_TYPE.DML)
script = dml("A[2:3,2:4]=B;C=A[2:3,2:3]").in("A", dataFrameA, fmA).in("B", dataFrameB, fmB).out("A")
.out("C");
@@ -493,18 +492,18 @@ public class MLContextFrameTest extends AutomatedTestBase {
JavaRDD<Row> javaRddRowA = FrameRDDConverterUtils.csvToRowRDD(sc, javaRddStringA, CSV_DELIM, schema);
JavaRDD<Row> javaRddRowB = javaRddStringB.map(new CommaSeparatedValueStringToDoubleArrayRow());
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fieldsA = new ArrayList<StructField>();
fieldsA.add(DataTypes.createStructField("1", DataTypes.StringType, true));
fieldsA.add(DataTypes.createStructField("2", DataTypes.DoubleType, true));
StructType schemaA = DataTypes.createStructType(fieldsA);
- Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA);
+ Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA);
List<StructField> fieldsB = new ArrayList<StructField>();
fieldsB.add(DataTypes.createStructField("1", DataTypes.DoubleType, true));
StructType schemaB = DataTypes.createStructType(fieldsB);
- Dataset<Row> dataFrameB = sqlContext.createDataFrame(javaRddRowB, schemaB);
+ Dataset<Row> dataFrameB = sparkSession.createDataFrame(javaRddRowB, schemaB);
String dmlString = "[tA, tAM] = transformencode (target = A, spec = \"{ids: true ,recode: [ 1, 2 ]}\");\n"
+ "C = tA %*% B;\n" + "M = s * C;";
@@ -530,14 +529,14 @@ public class MLContextFrameTest extends AutomatedTestBase {
JavaRDD<Row> javaRddRowA = sc.parallelize(Arrays.asList(rowsA));
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fieldsA = new ArrayList<StructField>();
fieldsA.add(DataTypes.createStructField("myID", DataTypes.StringType, true));
fieldsA.add(DataTypes.createStructField("FeatureName", DataTypes.StringType, true));
fieldsA.add(DataTypes.createStructField("FeatureValue", DataTypes.IntegerType, true));
StructType schemaA = DataTypes.createStructType(fieldsA);
- Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA);
+ Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA);
String dmlString = "[tA, tAM] = transformencode (target = A, spec = \"{ids: false ,recode: [ myID, FeatureName ]}\");";
@@ -572,14 +571,14 @@ public class MLContextFrameTest extends AutomatedTestBase {
JavaRDD<Row> javaRddRowA = sc.parallelize(Arrays.asList(rowsA));
- SQLContext sqlContext = new SQLContext(sc);
+ SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
List<StructField> fieldsA = new ArrayList<StructField>();
fieldsA.add(DataTypes.createStructField("featureName", DataTypes.StringType, true));
fieldsA.add(DataTypes.createStructField("featureValue", DataTypes.IntegerType, true));
fieldsA.add(DataTypes.createStructField("id", DataTypes.StringType, true));
StructType schemaA = DataTypes.createStructType(fieldsA);
- Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA);
+ Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA);
String dmlString = "[tA, tAM] = transformencode (target = A, spec = \"{ids: false ,recode: [ featureName, id ]}\");";
@@ -622,7 +621,7 @@ public class MLContextFrameTest extends AutomatedTestBase {
// JavaRDD<Row> javaRddRowA = javaRddStringA.map(new
// CommaSeparatedValueStringToRow());
//
- // SQLContext sqlContext = new SQLContext(sc);
+ // SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
//
// List<StructField> fieldsA = new ArrayList<StructField>();
// fieldsA.add(DataTypes.createStructField("1", DataTypes.StringType,
@@ -630,7 +629,7 @@ public class MLContextFrameTest extends AutomatedTestBase {
// fieldsA.add(DataTypes.createStructField("2", DataTypes.StringType,
// true));
// StructType schemaA = DataTypes.createStructType(fieldsA);
- // DataFrame dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA);
+ // DataFrame dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA);
//
// String dmlString = "[tA, tAM] = transformencode (target = A, spec =
// \"{ids: true ,recode: [ 1, 2 ]}\");\n";