You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2017/01/27 05:02:02 UTC
spark git commit: [SPARK-18821][SPARKR] Bisecting k-means wrapper in SparkR
Repository: spark
Updated Branches:
refs/heads/master 1191fe267 -> c0ba28430
[SPARK-18821][SPARKR] Bisecting k-means wrapper in SparkR
## What changes were proposed in this pull request?
Add an R wrapper for bisecting k-means.
As JIRA is down, I will update the title to link to the corresponding JIRA later.
## How was this patch tested?
Add new unit tests.
Author: wm624@hotmail.com <wm...@hotmail.com>
Closes #16566 from wangmiao1981/bk.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0ba2843
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0ba2843
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0ba2843
Branch: refs/heads/master
Commit: c0ba284300e494354f5bb205a10a12ac7daa2b5e
Parents: 1191fe2
Author: wm624@hotmail.com <wm...@hotmail.com>
Authored: Thu Jan 26 21:01:59 2017 -0800
Committer: Felix Cheung <fe...@apache.org>
Committed: Thu Jan 26 21:01:59 2017 -0800
----------------------------------------------------------------------
R/pkg/NAMESPACE | 3 +-
R/pkg/R/generics.R | 5 +
R/pkg/R/mllib_clustering.R | 149 +++++++++++++++++++
R/pkg/R/mllib_utils.R | 10 +-
.../inst/tests/testthat/test_mllib_clustering.R | 40 +++++
.../spark/ml/r/BisectingKMeansWrapper.scala | 143 ++++++++++++++++++
.../scala/org/apache/spark/ml/r/RWrappers.scala | 2 +
7 files changed, 347 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 0cd9cb8..caa1c3b 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -47,7 +47,8 @@ exportMethods("glm",
"spark.kstest",
"spark.logit",
"spark.randomForest",
- "spark.gbt")
+ "spark.gbt",
+ "spark.bisectingKmeans")
# Job group lifecycle management methods
export("setJobGroup",
http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 499c7b2..433c166 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1338,6 +1338,11 @@ setGeneric("rbind", signature = "...")
#' @export
setGeneric("spark.als", function(data, ...) { standardGeneric("spark.als") })
+# S4 generic for bisecting k-means; it dispatches on the (data, formula) signature and the
+# SparkDataFrame/formula method is registered in mllib_clustering.R.
+#' @rdname spark.bisectingKmeans
+#' @export
+setGeneric("spark.bisectingKmeans",
+           function(data, formula, ...) standardGeneric("spark.bisectingKmeans"))
+
#' @rdname spark.gaussianMixture
#' @export
setGeneric("spark.gaussianMixture",
http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/R/pkg/R/mllib_clustering.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_clustering.R b/R/pkg/R/mllib_clustering.R
index fa40f9d..05bbab6 100644
--- a/R/pkg/R/mllib_clustering.R
+++ b/R/pkg/R/mllib_clustering.R
@@ -17,6 +17,13 @@
# mllib_clustering.R: Provides methods for MLlib clustering algorithms integration
+# Thin S4 handle: the single slot holds the Java reference returned by
+# BisectingKMeansWrapper.fit (or restored by read.ml).
+#' S4 class that represents a BisectingKMeansModel
+#'
+#' @param jobj a Java object reference to the backing Scala BisectingKMeansModel
+#' @export
+#' @note BisectingKMeansModel since 2.2.0
+setClass("BisectingKMeansModel", slots = c(jobj = "jobj"))
+
#' S4 class that represents a GaussianMixtureModel
#'
#' @param jobj a Java object reference to the backing Scala GaussianMixtureModel
@@ -38,6 +45,148 @@ setClass("KMeansModel", representation(jobj = "jobj"))
#' @note LDAModel since 2.1.0
setClass("LDAModel", representation(jobj = "jobj"))
+#' Bisecting K-Means Clustering Model
+#'
+#' Fits a bisecting k-means clustering model against a Spark DataFrame.
+#' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make
+#' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models.
+#'
+#' @param data a SparkDataFrame for training.
+#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
+#' operators are supported, including '~', '.', ':', '+', and '-'.
+#' Note that the response variable of formula is empty in spark.bisectingKmeans.
+#' @param k the desired number of leaf clusters. Must be > 1.
+#' The actual number could be smaller if there are no divisible leaf clusters.
+#' @param maxIter maximum iteration number.
+#' @param seed the random seed.
+#' @param minDivisibleClusterSize The minimum number of points (if greater than or equal to 1.0)
+#' or the minimum proportion of points (if less than 1.0) of a divisible cluster.
+#' Note that it is an expert parameter. The default value should be good enough
+#' for most cases.
+#' @param ... additional argument(s) passed to the method.
+#' @return \code{spark.bisectingKmeans} returns a fitted bisecting k-means model.
+#' @rdname spark.bisectingKmeans
+#' @aliases spark.bisectingKmeans,SparkDataFrame,formula-method
+#' @name spark.bisectingKmeans
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(iris)
+#' model <- spark.bisectingKmeans(df, Sepal_Length ~ Sepal_Width, k = 4)
+#' summary(model)
+#'
+#' # get fitted result from a bisecting k-means model
+#' fitted.model <- fitted(model, "centers")
+#' showDF(fitted.model)
+#'
+#' # fitted values on training data
+#' fitted <- predict(model, df)
+#' head(select(fitted, "Sepal_Length", "prediction"))
+#'
+#' # save fitted model to input path
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#'
+#' # can also read back the saved model and print
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.bisectingKmeans since 2.2.0
+#' @seealso \link{predict}, \link{read.ml}, \link{write.ml}
+setMethod("spark.bisectingKmeans", signature(data = "SparkDataFrame", formula = "formula"),
+          function(data, formula, k = 4, maxIter = 20, seed = NULL, minDivisibleClusterSize = 1.0) {
+            # The JVM side takes the formula as one single-line string.
+            formulaString <- paste0(deparse(formula), collapse = "")
+            # NULL is forwarded unchanged; the Scala fit only sets a seed when it receives a
+            # non-empty string.
+            seedString <- if (is.null(seed)) NULL else as.character(as.integer(seed))
+            wrapper <- callJStatic("org.apache.spark.ml.r.BisectingKMeansWrapper", "fit",
+                                   data@sdf, formulaString, as.integer(k), as.integer(maxIter),
+                                   seedString, as.numeric(minDivisibleClusterSize))
+            new("BisectingKMeansModel", jobj = wrapper)
+          })
+
+# Get the summary of a bisecting k-means model
+
+#' @param object a fitted bisecting k-means model.
+#' @return \code{summary} returns summary information of the fitted model, which is a list.
+#'         The list includes the model's \code{k} (the \code{k} the model was configured with),
+#'         \code{coefficients} (model cluster centers, one row per leaf cluster; there can be
+#'         fewer than \code{k} rows when some clusters were not divisible),
+#'         \code{size} (number of data points in each cluster), \code{cluster}
+#'         (a SparkDataFrame with the cluster assignment, column \code{prediction}, of each
+#'         training row; cluster is NULL if is.loaded is TRUE),
+#'         and \code{is.loaded} (whether the model is loaded from a saved file).
+#' @rdname spark.bisectingKmeans
+#' @export
+#' @note summary(BisectingKMeansModel) since 2.2.0
+setMethod("summary", signature(object = "BisectingKMeansModel"),
+          function(object) {
+            jobj <- object@jobj
+            is.loaded <- callJMethod(jobj, "isLoaded")
+            features <- callJMethod(jobj, "features")
+            coefficients <- callJMethod(jobj, "coefficients")
+            k <- callJMethod(jobj, "k")
+            size <- callJMethod(jobj, "size")
+            # The JVM returns all centers flattened one after another. Bisecting k-means may
+            # end with fewer leaf clusters than k, so shape the matrix from the data itself
+            # instead of assuming exactly k centers.
+            numCenters <- length(coefficients) / length(features)
+            coefficients <- matrix(unlist(coefficients), nrow = numCenters, byrow = TRUE)
+            colnames(coefficients) <- unlist(features)
+            rownames(coefficients) <- seq_len(numCenters)
+            # A model restored with read.ml carries no training-time cluster assignments.
+            cluster <- if (is.loaded) {
+              NULL
+            } else {
+              dataFrame(callJMethod(jobj, "cluster"))
+            }
+            list(k = k, coefficients = coefficients, size = size,
+                 cluster = cluster, is.loaded = is.loaded)
+          })
+
+# Predicted values based on a bisecting k-means model (delegates to the shared predict_internal)
+
+#' @param newData a SparkDataFrame to assign to clusters with the fitted model.
+#' @return \code{predict} returns the predicted values based on a bisecting k-means model.
+#' @rdname spark.bisectingKmeans
+#' @export
+#' @note predict(BisectingKMeansModel) since 2.2.0
+setMethod("predict", signature(object = "BisectingKMeansModel"),
+          function(object, newData) predict_internal(object, newData))
+
+#' Get fitted result from a bisecting k-means model
+#'
+#' Get fitted result from a bisecting k-means model.
+#' Note: A saved-loaded model does not support this method.
+#'
+#' @param method type of fitted results, \code{"centers"} for cluster centers
+#'        or \code{"classes"} for assigned classes.
+#' @return \code{fitted} returns a SparkDataFrame containing fitted values.
+#' @rdname spark.bisectingKmeans
+#' @export
+#' @note fitted since 2.2.0
+setMethod("fitted", signature(object = "BisectingKMeansModel"),
+          function(object, method = c("centers", "classes")) {
+            method <- match.arg(method)
+            jobj <- object@jobj
+            # Wrappers restored with read.ml are flagged isLoaded and are rejected here.
+            if (callJMethod(jobj, "isLoaded")) {
+              stop("Saved-loaded bisecting k-means model does not support 'fitted' method")
+            }
+            # NOTE(review): on the JVM side "centers" returns the training predictions with the
+            # features column dropped rather than the center vectors; confirm the roxygen wording.
+            dataFrame(callJMethod(jobj, "fitted", method))
+          })
+
+# Save fitted MLlib model to the input path (delegates to the shared write_internal)
+
+#' @param path the directory where the model is saved.
+#' @param overwrite overwrites or not if the output path already exists. Default is FALSE
+#'                  which means throw exception if the output path exists.
+#'
+#' @rdname spark.bisectingKmeans
+#' @export
+#' @note write.ml(BisectingKMeansModel, character) since 2.2.0
+setMethod("write.ml", signature(object = "BisectingKMeansModel", path = "character"),
+          function(object, path, overwrite = FALSE) write_internal(object, path, overwrite))
+
#' Multivariate Gaussian Mixture Model (GMM)
#'
#' Fits multivariate gaussian mixture model against a Spark DataFrame, similarly to R's
http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/R/pkg/R/mllib_utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R
index 720ee41..29c4473 100644
--- a/R/pkg/R/mllib_utils.R
+++ b/R/pkg/R/mllib_utils.R
@@ -32,8 +32,8 @@
#' @rdname write.ml
#' @name write.ml
#' @export
-#' @seealso \link{spark.glm}, \link{glm},
-#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg},
+#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
+#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
#' @seealso \link{spark.kmeans},
#' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
#' @seealso \link{spark.randomForest}, \link{spark.survreg},
@@ -47,8 +47,8 @@ NULL
#' @rdname predict
#' @name predict
#' @export
-#' @seealso \link{spark.glm}, \link{glm},
-#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg},
+#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
+#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
#' @seealso \link{spark.kmeans},
#' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
#' @seealso \link{spark.randomForest}, \link{spark.survreg}
@@ -113,6 +113,8 @@ read.ml <- function(path) {
new("GBTRegressionModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GBTClassifierWrapper")) {
new("GBTClassificationModel", jobj = jobj)
+ } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.BisectingKMeansWrapper")) {
+ new("BisectingKMeansModel", jobj = jobj)
} else {
stop("Unsupported model: ", jobj)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/R/pkg/inst/tests/testthat/test_mllib_clustering.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_mllib_clustering.R b/R/pkg/inst/tests/testthat/test_mllib_clustering.R
index 9de8362..aad834b 100644
--- a/R/pkg/inst/tests/testthat/test_mllib_clustering.R
+++ b/R/pkg/inst/tests/testthat/test_mllib_clustering.R
@@ -27,6 +27,46 @@ absoluteSparkPath <- function(x) {
file.path(sparkHome, x)
}
+test_that("spark.bisectingKmeans", {
+  # Cluster on the four numeric iris measurements only.
+  newIris <- iris
+  newIris$Species <- NULL
+  training <- suppressWarnings(createDataFrame(newIris))
+
+  model <- spark.bisectingKmeans(data = training, ~ .)
+  sample <- take(select(predict(model, training), "prediction"), 1)
+  expect_equal(typeof(sample$prediction), "integer")
+  expect_equal(sample$prediction, 1)
+
+  # Test fitted works on Bisecting KMeans
+  fitted.model <- fitted(model)
+  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction),
+               c(0, 1, 2, 3))
+
+  # Test summary works on Bisecting KMeans
+  summary.model <- summary(model)
+  cluster <- summary.model$cluster
+  k <- summary.model$k
+  expect_equal(k, 4)
+  # One row of centers per cluster, one column per feature.
+  expect_equal(dim(summary.model$coefficients), c(4, 4))
+  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction),
+               c(0, 1, 2, 3))
+
+  # Test model save/load
+  modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp")
+  write.ml(model, modelPath)
+  expect_error(write.ml(model, modelPath))
+  write.ml(model, modelPath, overwrite = TRUE)
+  model2 <- read.ml(modelPath)
+  summary2 <- summary(model2)
+  expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
+  expect_equal(summary.model$coefficients, summary2$coefficients)
+  expect_false(summary.model$is.loaded)
+  expect_true(summary2$is.loaded)
+
+  unlink(modelPath)
+})
+
test_that("spark.gaussianMixture", {
# R code to reproduce the result.
# nolint start
http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
new file mode 100644
index 0000000..71712c1
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.r
+
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.ml.{Pipeline, PipelineModel}
+import org.apache.spark.ml.attribute.AttributeGroup
+import org.apache.spark.ml.clustering.{BisectingKMeans, BisectingKMeansModel}
+import org.apache.spark.ml.feature.RFormula
+import org.apache.spark.ml.util._
+import org.apache.spark.sql.{DataFrame, Dataset}
+
+/**
+ * SparkR-facing wrapper around a fitted RFormula + BisectingKMeans pipeline.
+ *
+ * @param pipeline fitted pipeline whose last stage is the BisectingKMeansModel
+ * @param features names of the features produced by the RFormula stage
+ * @param size number of training points per cluster (from the training summary at fit
+ *             time, or from the saved R metadata when loaded)
+ * @param isLoaded true when this wrapper was restored from disk instead of freshly fitted
+ */
+private[r] class BisectingKMeansWrapper private (
+    val pipeline: PipelineModel,
+    val features: Array[String],
+    val size: Array[Long],
+    val isLoaded: Boolean = false) extends MLWritable {
+
+  /** The fitted clustering stage, always the last stage of `pipeline`. */
+  private val fittedModel: BisectingKMeansModel =
+    pipeline.stages.last.asInstanceOf[BisectingKMeansModel]
+
+  /** All cluster centers flattened one after another, to be reshaped on the R side. */
+  lazy val coefficients: Array[Double] =
+    fittedModel.clusterCenters.flatMap(center => center.toArray)
+
+  /** The `k` parameter the model was configured with. */
+  lazy val k: Int = fittedModel.getK
+
+  // Reads the training summary; the R side checks `isLoaded` and never asks for this on a
+  // model restored from disk (presumably no training summary exists then -- TODO confirm).
+  lazy val cluster: DataFrame = fittedModel.summary.cluster
+
+  /**
+   * Training-time results for R's `fitted`.
+   *
+   * @param method "centers" (summary predictions without the features column) or
+   *               "classes" (the summary's cluster assignments)
+   * @throws UnsupportedOperationException for any other method name
+   */
+  def fitted(method: String): DataFrame = method match {
+    case "centers" => fittedModel.summary.predictions.drop(fittedModel.getFeaturesCol)
+    case "classes" => fittedModel.summary.cluster
+    case _ =>
+      throw new UnsupportedOperationException(
+        s"Method (centers or classes) required but $method found.")
+  }
+
+  /** Runs the whole pipeline on `dataset` and drops the assembled features column. */
+  def transform(dataset: Dataset[_]): DataFrame = {
+    val scored = pipeline.transform(dataset)
+    scored.drop(fittedModel.getFeaturesCol)
+  }
+
+  override def write: MLWriter = new BisectingKMeansWrapper.BisectingKMeansWrapperWriter(this)
+}
+
+private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapper] {
+
+  /**
+   * Fits an RFormula + BisectingKMeans pipeline on behalf of SparkR.
+   *
+   * @param data training data
+   * @param formula R model formula string (the R caller leaves the response empty)
+   * @param k requested number of leaf clusters
+   * @param maxIter maximum number of iterations
+   * @param seed random seed as a decimal string; null or empty leaves the estimator's seed unset
+   * @param minDivisibleClusterSize minimum number (>= 1.0) or proportion (< 1.0) of points
+   *                                in a divisible cluster
+   * @return a wrapper holding the fitted pipeline, feature names and training cluster sizes
+   */
+  def fit(
+      data: DataFrame,
+      formula: String,
+      k: Int,
+      maxIter: Int,
+      seed: String,
+      minDivisibleClusterSize: Double
+    ): BisectingKMeansWrapper = {
+
+    val rFormula = new RFormula().setFormula(formula).setFeaturesCol("features")
+    // Shared SparkR check of `data` against the formula; must run before fitting.
+    RWrapperUtils.checkDataColumns(rFormula, data)
+    val rFormulaModel = rFormula.fit(data)
+
+    // Recover readable feature names from the ML attributes of the assembled features column.
+    val featuresField = rFormulaModel.transform(data).schema(rFormulaModel.getFeaturesCol)
+    val features = AttributeGroup.fromStructField(featuresField).attributes.get.map(_.name.get)
+
+    val estimator = new BisectingKMeans()
+      .setK(k)
+      .setMaxIter(maxIter)
+      .setMinDivisibleClusterSize(minDivisibleClusterSize)
+      .setFeaturesCol(rFormula.getFeaturesCol)
+    Option(seed).filter(_.nonEmpty).foreach(s => estimator.setSeed(s.toInt))
+
+    val pipeline = new Pipeline().setStages(Array(rFormulaModel, estimator)).fit(data)
+
+    val fittedModel = pipeline.stages.last.asInstanceOf[BisectingKMeansModel]
+    new BisectingKMeansWrapper(pipeline, features, fittedModel.summary.clusterSizes)
+  }
+
+  override def read: MLReader[BisectingKMeansWrapper] = new BisectingKMeansWrapperReader
+
+  override def load(path: String): BisectingKMeansWrapper = super.load(path)
+
+  /** Persists R metadata (class, features, size) under `rMetadata` and the pipeline under `pipeline`. */
+  class BisectingKMeansWrapperWriter(instance: BisectingKMeansWrapper) extends MLWriter {
+
+    override protected def saveImpl(path: String): Unit = {
+      val metadata = ("class" -> instance.getClass.getName) ~
+        ("features" -> instance.features.toSeq) ~
+        ("size" -> instance.size.toSeq)
+      val metadataJson: String = compact(render(metadata))
+
+      sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(new Path(path, "rMetadata").toString)
+      instance.pipeline.save(new Path(path, "pipeline").toString)
+    }
+  }
+
+  /** Restores a wrapper written by [[BisectingKMeansWrapperWriter]] and marks it as loaded. */
+  class BisectingKMeansWrapperReader extends MLReader[BisectingKMeansWrapper] {
+
+    override def load(path: String): BisectingKMeansWrapper = {
+      implicit val format = DefaultFormats
+      val pipeline = PipelineModel.load(new Path(path, "pipeline").toString)
+
+      val metadata = parse(sc.textFile(new Path(path, "rMetadata").toString, 1).first())
+      val features = (metadata \ "features").extract[Array[String]]
+      val size = (metadata \ "size").extract[Array[Long]]
+      new BisectingKMeansWrapper(pipeline, features, size, isLoaded = true)
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index b59fe29..c441792 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -64,6 +64,8 @@ private[r] object RWrappers extends MLReader[Object] {
GBTRegressorWrapper.load(path)
case "org.apache.spark.ml.r.GBTClassifierWrapper" =>
GBTClassifierWrapper.load(path)
+ case "org.apache.spark.ml.r.BisectingKMeansWrapper" =>
+ BisectingKMeansWrapper.load(path)
case _ =>
throw new SparkException(s"SparkR read.ml does not support load $className")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org