You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2015/12/08 08:37:27 UTC
spark git commit: [SPARK-12160][MLLIB] Use SQLContext.getOrCreate in MLlib - 1.5 backport
Repository: spark
Updated Branches:
refs/heads/branch-1.5 3868ab644 -> 2f30927a5
[SPARK-12160][MLLIB] Use SQLContext.getOrCreate in MLlib - 1.5 backport
This backports [https://github.com/apache/spark/pull/10161] to Spark 1.5, with the difference that ChiSqSelector does not require modification.
Switched from using the SQLContext constructor to using SQLContext.getOrCreate, mainly in model save/load methods.
This covers all instances in spark.mllib. There were no uses of the constructor in spark.ml.
CC: yhuai mengxr
Author: Joseph K. Bradley <jo...@databricks.com>
Closes #10183 from jkbradley/sqlcontext-backport1.5.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f30927a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f30927a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f30927a
Branch: refs/heads/branch-1.5
Commit: 2f30927a5f40f2862e777bfe97282ddcfc0a063a
Parents: 3868ab6
Author: Joseph K. Bradley <jo...@databricks.com>
Authored: Mon Dec 7 23:37:23 2015 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Mon Dec 7 23:37:23 2015 -0800
----------------------------------------------------------------------
.../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 6 +++---
.../org/apache/spark/mllib/classification/NaiveBayes.scala | 8 ++++----
.../mllib/classification/impl/GLMClassificationModel.scala | 4 ++--
.../apache/spark/mllib/clustering/GaussianMixtureModel.scala | 4 ++--
.../org/apache/spark/mllib/clustering/KMeansModel.scala | 4 ++--
.../spark/mllib/clustering/PowerIterationClustering.scala | 4 ++--
.../main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 4 ++--
.../mllib/recommendation/MatrixFactorizationModel.scala | 4 ++--
.../apache/spark/mllib/regression/IsotonicRegression.scala | 4 ++--
.../spark/mllib/regression/impl/GLMRegressionModel.scala | 4 ++--
.../apache/spark/mllib/tree/model/DecisionTreeModel.scala | 4 ++--
.../apache/spark/mllib/tree/model/treeEnsembleModels.scala | 4 ++--
12 files changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index f585aac..06e13b7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1149,7 +1149,7 @@ private[python] class PythonMLLibAPI extends Serializable {
def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = {
// We use DataFrames for serialization of IndexedRows to Python,
// so return a DataFrame.
- val sqlContext = new SQLContext(indexedRowMatrix.rows.sparkContext)
+ val sqlContext = SQLContext.getOrCreate(indexedRowMatrix.rows.sparkContext)
sqlContext.createDataFrame(indexedRowMatrix.rows)
}
@@ -1159,7 +1159,7 @@ private[python] class PythonMLLibAPI extends Serializable {
def getMatrixEntries(coordinateMatrix: CoordinateMatrix): DataFrame = {
// We use DataFrames for serialization of MatrixEntry entries to
// Python, so return a DataFrame.
- val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext)
+ val sqlContext = SQLContext.getOrCreate(coordinateMatrix.entries.sparkContext)
sqlContext.createDataFrame(coordinateMatrix.entries)
}
@@ -1169,7 +1169,7 @@ private[python] class PythonMLLibAPI extends Serializable {
def getMatrixBlocks(blockMatrix: BlockMatrix): DataFrame = {
// We use DataFrames for serialization of sub-matrix blocks to
// Python, so return a DataFrame.
- val sqlContext = new SQLContext(blockMatrix.blocks.sparkContext)
+ val sqlContext = SQLContext.getOrCreate(blockMatrix.blocks.sparkContext)
sqlContext.createDataFrame(blockMatrix.blocks)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index a956084..aef9ef2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -192,7 +192,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
modelType: String)
def save(sc: SparkContext, path: String, data: Data): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
// Create JSON metadata.
@@ -208,7 +208,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
@Since("1.3.0")
def load(sc: SparkContext, path: String): NaiveBayesModel = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
// Load Parquet data.
val dataRDD = sqlContext.read.parquet(dataPath(path))
// Check schema explicitly since erasure makes it hard to use match-case for checking.
@@ -239,7 +239,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
theta: Array[Array[Double]])
def save(sc: SparkContext, path: String, data: Data): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
// Create JSON metadata.
@@ -254,7 +254,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
}
def load(sc: SparkContext, path: String): NaiveBayesModel = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
// Load Parquet data.
val dataRDD = sqlContext.read.parquet(dataPath(path))
// Check schema explicitly since erasure makes it hard to use match-case for checking.
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
index fe09f6b..2910c02 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
@@ -51,7 +51,7 @@ private[classification] object GLMClassificationModel {
weights: Vector,
intercept: Double,
threshold: Option[Double]): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
// Create JSON metadata.
@@ -74,7 +74,7 @@ private[classification] object GLMClassificationModel {
*/
def loadData(sc: SparkContext, path: String, modelClass: String): Data = {
val datapath = Loader.dataPath(path)
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
val dataRDD = sqlContext.read.parquet(datapath)
val dataArray = dataRDD.select("weights", "intercept", "threshold").take(1)
assert(dataArray.size == 1, s"Unable to load $modelClass data from: $datapath")
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index 7f6163e..7bef086 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -149,7 +149,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
weights: Array[Double],
gaussians: Array[MultivariateGaussian]): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
// Create JSON metadata.
@@ -166,7 +166,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
def load(sc: SparkContext, path: String): GaussianMixtureModel = {
val dataPath = Loader.dataPath(path)
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
val dataFrame = sqlContext.read.parquet(dataPath)
val dataArray = dataFrame.select("weight", "mu", "sigma").collect()
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 45021f4..a40148d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -124,7 +124,7 @@ object KMeansModel extends Loader[KMeansModel] {
val thisClassName = "org.apache.spark.mllib.clustering.KMeansModel"
def save(sc: SparkContext, model: KMeansModel, path: String): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
@@ -137,7 +137,7 @@ object KMeansModel extends Loader[KMeansModel] {
def load(sc: SparkContext, path: String): KMeansModel = {
implicit val formats = DefaultFormats
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
assert(className == thisClassName)
assert(formatVersion == thisFormatVersion)
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 6c76e26..a6657ed 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -73,7 +73,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
@Since("1.4.0")
def save(sc: SparkContext, model: PowerIterationClusteringModel, path: String): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
val metadata = compact(render(
@@ -87,7 +87,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
@Since("1.4.0")
def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
implicit val formats = DefaultFormats
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
assert(className == thisClassName)
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index c226e3c..131a862 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -588,7 +588,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
def load(sc: SparkContext, path: String): Word2VecModel = {
val dataPath = Loader.dataPath(path)
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
val dataFrame = sqlContext.read.parquet(dataPath)
val dataArray = dataFrame.select("word", "vector").collect()
@@ -602,7 +602,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
def save(sc: SparkContext, path: String, model: Map[String, Array[Float]]): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
val vectorSize = model.values.head.size
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 46562eb..0dc4048 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -353,7 +353,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
*/
def save(model: MatrixFactorizationModel, path: String): Unit = {
val sc = model.userFeatures.sparkContext
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
@@ -364,7 +364,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
implicit val formats = DefaultFormats
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
val (className, formatVersion, metadata) = loadMetadata(sc, path)
assert(className == thisClassName)
assert(formatVersion == thisFormatVersion)
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 877d31b..9b8c860 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -188,7 +188,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
boundaries: Array[Double],
predictions: Array[Double],
isotonic: Boolean): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
@@ -201,7 +201,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
}
def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
val dataRDD = sqlContext.read.parquet(dataPath(path))
checkSchema[Data](dataRDD.schema)
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
index 317d3a5..02af281 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
@@ -47,7 +47,7 @@ private[regression] object GLMRegressionModel {
modelClass: String,
weights: Vector,
intercept: Double): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
// Create JSON metadata.
@@ -71,7 +71,7 @@ private[regression] object GLMRegressionModel {
*/
def loadData(sc: SparkContext, path: String, modelClass: String, numFeatures: Int): Data = {
val datapath = Loader.dataPath(path)
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
val dataRDD = sqlContext.read.parquet(datapath)
val dataArray = dataRDD.select("weights", "intercept").take(1)
assert(dataArray.size == 1, s"Unable to load $modelClass data from: $datapath")
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index e1bf23f..1faef1c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -203,7 +203,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
}
def save(sc: SparkContext, path: String, model: DecisionTreeModel): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
// SPARK-6120: We do a hacky check here so users understand why save() is failing
@@ -244,7 +244,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
def load(sc: SparkContext, path: String, algo: String, numNodes: Int): DecisionTreeModel = {
val datapath = Loader.dataPath(path)
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
// Load Parquet data.
val dataRDD = sqlContext.read.parquet(datapath)
// Check schema explicitly since erasure makes it hard to use match-case for checking.
http://git-wip-us.apache.org/repos/asf/spark/blob/2f30927a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index df5b8fe..3cbe634 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -413,7 +413,7 @@ private[tree] object TreeEnsembleModel extends Logging {
case class EnsembleNodeData(treeId: Int, node: NodeData)
def save(sc: SparkContext, path: String, model: TreeEnsembleModel, className: String): Unit = {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
// SPARK-6120: We do a hacky check here so users understand why save() is failing
@@ -473,7 +473,7 @@ private[tree] object TreeEnsembleModel extends Logging {
path: String,
treeAlgo: String): Array[DecisionTreeModel] = {
val datapath = Loader.dataPath(path)
- val sqlContext = new SQLContext(sc)
+ val sqlContext = SQLContext.getOrCreate(sc)
val nodes = sqlContext.read.parquet(datapath).map(NodeData.apply)
val trees = constructTrees(nodes)
trees.map(new DecisionTreeModel(_, Algo.fromString(treeAlgo)))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org