You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2017/01/27 05:02:02 UTC

spark git commit: [SPARK-18821][SPARKR] Bisecting k-means wrapper in SparkR

Repository: spark
Updated Branches:
  refs/heads/master 1191fe267 -> c0ba28430


[SPARK-18821][SPARKR] Bisecting k-means wrapper in SparkR

## What changes were proposed in this pull request?

Add an R wrapper for bisecting k-means.

As JIRA is down, I will update the title to link to the corresponding JIRA issue later.

## How was this patch tested?

Add new unit tests.

Author: wm624@hotmail.com <wm...@hotmail.com>

Closes #16566 from wangmiao1981/bk.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0ba2843
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0ba2843
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0ba2843

Branch: refs/heads/master
Commit: c0ba284300e494354f5bb205a10a12ac7daa2b5e
Parents: 1191fe2
Author: wm624@hotmail.com <wm...@hotmail.com>
Authored: Thu Jan 26 21:01:59 2017 -0800
Committer: Felix Cheung <fe...@apache.org>
Committed: Thu Jan 26 21:01:59 2017 -0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                                 |   3 +-
 R/pkg/R/generics.R                              |   5 +
 R/pkg/R/mllib_clustering.R                      | 149 +++++++++++++++++++
 R/pkg/R/mllib_utils.R                           |  10 +-
 .../inst/tests/testthat/test_mllib_clustering.R |  40 +++++
 .../spark/ml/r/BisectingKMeansWrapper.scala     | 143 ++++++++++++++++++
 .../scala/org/apache/spark/ml/r/RWrappers.scala |   2 +
 7 files changed, 347 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 0cd9cb8..caa1c3b 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -47,7 +47,8 @@ exportMethods("glm",
               "spark.kstest",
               "spark.logit",
               "spark.randomForest",
-              "spark.gbt")
+              "spark.gbt",
+              "spark.bisectingKmeans")
 
 # Job group lifecycle management methods
 export("setJobGroup",

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 499c7b2..433c166 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1338,6 +1338,11 @@ setGeneric("rbind", signature = "...")
 #' @export
 setGeneric("spark.als", function(data, ...) { standardGeneric("spark.als") })
 
+#' @rdname spark.bisectingKmeans
+#' @export
+setGeneric("spark.bisectingKmeans",
+           function(data, formula, ...) { standardGeneric("spark.bisectingKmeans") })
+
 #' @rdname spark.gaussianMixture
 #' @export
 setGeneric("spark.gaussianMixture",

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/R/pkg/R/mllib_clustering.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_clustering.R b/R/pkg/R/mllib_clustering.R
index fa40f9d..05bbab6 100644
--- a/R/pkg/R/mllib_clustering.R
+++ b/R/pkg/R/mllib_clustering.R
@@ -17,6 +17,13 @@
 
 # mllib_clustering.R: Provides methods for MLlib clustering algorithms integration
 
+#' S4 class that represents a BisectingKMeansModel
+#'
+#' @param jobj a Java object reference to the backing Scala BisectingKMeansModel
+#' @export
+#' @note BisectingKMeansModel since 2.2.0
+setClass("BisectingKMeansModel", representation(jobj = "jobj"))
+
 #' S4 class that represents a GaussianMixtureModel
 #'
 #' @param jobj a Java object reference to the backing Scala GaussianMixtureModel
@@ -38,6 +45,148 @@ setClass("KMeansModel", representation(jobj = "jobj"))
 #' @note LDAModel since 2.1.0
 setClass("LDAModel", representation(jobj = "jobj"))
 
+#' Bisecting K-Means Clustering Model
+#'
+#' Fits a bisecting k-means clustering model against a Spark DataFrame.
+#' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make
+#' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models.
+#'
+#' @param data a SparkDataFrame for training.
+#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
+#'                operators are supported, including '~', '.', ':', '+', and '-'.
+#'                Note that the response variable of formula is empty in spark.bisectingKmeans.
+#' @param k the desired number of leaf clusters. Must be > 1.
+#'          The actual number could be smaller if there are no divisible leaf clusters.
+#' @param maxIter maximum iteration number.
+#' @param seed the random seed.
+#' @param minDivisibleClusterSize The minimum number of points (if greater than or equal to 1.0)
+#'                                or the minimum proportion of points (if less than 1.0) of a divisible cluster.
+#'                                Note that it is an expert parameter. The default value should be good enough
+#'                                for most cases.
+#' @param ... additional argument(s) passed to the method.
+#' @return \code{spark.bisectingKmeans} returns a fitted bisecting k-means model.
+#' @rdname spark.bisectingKmeans
+#' @aliases spark.bisectingKmeans,SparkDataFrame,formula-method
+#' @name spark.bisectingKmeans
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(iris)
+#' model <- spark.bisectingKmeans(df, Sepal_Length ~ Sepal_Width, k = 4)
+#' summary(model)
+#'
+#' # get fitted result from a bisecting k-means model
+#' fitted.model <- fitted(model, "centers")
+#' showDF(fitted.model)
+#'
+#' # fitted values on training data
+#' fitted <- predict(model, df)
+#' head(select(fitted, "Sepal_Length", "prediction"))
+#'
+#' # save fitted model to input path
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#'
+#' # can also read back the saved model and print
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.bisectingKmeans since 2.2.0
+#' @seealso \link{predict}, \link{read.ml}, \link{write.ml}
+setMethod("spark.bisectingKmeans", signature(data = "SparkDataFrame", formula = "formula"),
+          function(data, formula, k = 4, maxIter = 20, seed = NULL, minDivisibleClusterSize = 1.0) {
+            formula <- paste0(deparse(formula), collapse = "")
+            if (!is.null(seed)) {
+              seed <- as.character(as.integer(seed))
+            }
+            jobj <- callJStatic("org.apache.spark.ml.r.BisectingKMeansWrapper", "fit",
+                                data@sdf, formula, as.integer(k), as.integer(maxIter),
+                                seed, as.numeric(minDivisibleClusterSize))
+            new("BisectingKMeansModel", jobj = jobj)
+          })
+
+#  Get the summary of a bisecting k-means model
+
+#' @param object a fitted bisecting k-means model.
+#' @return \code{summary} returns summary information of the fitted model, which is a list.
+#'         The list includes the model's \code{k} (number of cluster centers),
+#'         \code{coefficients} (model cluster centers),
+#'         \code{size} (number of data points in each cluster), \code{cluster}
+#'         (predicted cluster of each training data point; cluster is NULL if is.loaded is TRUE),
+#'         and \code{is.loaded} (whether the model is loaded from a saved file).
+#' @rdname spark.bisectingKmeans
+#' @export
+#' @note summary(BisectingKMeansModel) since 2.2.0
+setMethod("summary", signature(object = "BisectingKMeansModel"),
+          function(object) {
+            jobj <- object@jobj
+            is.loaded <- callJMethod(jobj, "isLoaded")
+            features <- callJMethod(jobj, "features")
+            coefficients <- callJMethod(jobj, "coefficients")
+            k <- callJMethod(jobj, "k")
+            size <- callJMethod(jobj, "size")
+            coefficients <- t(matrix(coefficients, ncol = k))
+            colnames(coefficients) <- unlist(features)
+            rownames(coefficients) <- 1:k
+            cluster <- if (is.loaded) {
+              NULL
+            } else {
+              dataFrame(callJMethod(jobj, "cluster"))
+            }
+            list(k = k, coefficients = coefficients, size = size,
+            cluster = cluster, is.loaded = is.loaded)
+          })
+
+#  Predicted values based on a bisecting k-means model
+
+#' @param newData a SparkDataFrame for testing.
+#' @return \code{predict} returns the predicted values based on a bisecting k-means model.
+#' @rdname spark.bisectingKmeans
+#' @export
+#' @note predict(BisectingKMeansModel) since 2.2.0
+setMethod("predict", signature(object = "BisectingKMeansModel"),
+          function(object, newData) {
+            predict_internal(object, newData)
+          })
+
+#' Get fitted result from a bisecting k-means model
+#'
+#' Get fitted result from a bisecting k-means model.
+#' Note: A saved-loaded model does not support this method.
+#'
+#' @param method type of fitted results, \code{"centers"} for cluster centers
+#'        or \code{"classes"} for assigned classes.
+#' @return \code{fitted} returns a SparkDataFrame containing fitted values.
+#' @rdname spark.bisectingKmeans
+#' @export
+#' @note fitted since 2.2.0
+setMethod("fitted", signature(object = "BisectingKMeansModel"),
+          function(object, method = c("centers", "classes")) {
+            method <- match.arg(method)
+            jobj <- object@jobj
+            is.loaded <- callJMethod(jobj, "isLoaded")
+            if (is.loaded) {
+              stop("Saved-loaded bisecting k-means model does not support 'fitted' method")
+            } else {
+              dataFrame(callJMethod(jobj, "fitted", method))
+            }
+          })
+
+#  Save fitted MLlib model to the input path
+
+#' @param path the directory where the model is saved.
+#' @param overwrite whether to overwrite the output path if it already exists. Default is FALSE,
+#'                  which means an exception is thrown if the output path exists.
+#'
+#' @rdname spark.bisectingKmeans
+#' @export
+#' @note write.ml(BisectingKMeansModel, character) since 2.2.0
+setMethod("write.ml", signature(object = "BisectingKMeansModel", path = "character"),
+          function(object, path, overwrite = FALSE) {
+            write_internal(object, path, overwrite)
+          })
+
 #' Multivariate Gaussian Mixture Model (GMM)
 #'
 #' Fits multivariate gaussian mixture model against a Spark DataFrame, similarly to R's

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/R/pkg/R/mllib_utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R
index 720ee41..29c4473 100644
--- a/R/pkg/R/mllib_utils.R
+++ b/R/pkg/R/mllib_utils.R
@@ -32,8 +32,8 @@
 #' @rdname write.ml
 #' @name write.ml
 #' @export
-#' @seealso \link{spark.glm}, \link{glm},
-#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg},
+#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
+#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
 #' @seealso \link{spark.kmeans},
 #' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
 #' @seealso \link{spark.randomForest}, \link{spark.survreg},
@@ -47,8 +47,8 @@ NULL
 #' @rdname predict
 #' @name predict
 #' @export
-#' @seealso \link{spark.glm}, \link{glm},
-#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg},
+#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
+#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
 #' @seealso \link{spark.kmeans},
 #' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
 #' @seealso \link{spark.randomForest}, \link{spark.survreg}
@@ -113,6 +113,8 @@ read.ml <- function(path) {
     new("GBTRegressionModel", jobj = jobj)
   } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GBTClassifierWrapper")) {
     new("GBTClassificationModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.BisectingKMeansWrapper")) {
+    new("BisectingKMeansModel", jobj = jobj)
   } else {
     stop("Unsupported model: ", jobj)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/R/pkg/inst/tests/testthat/test_mllib_clustering.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_mllib_clustering.R b/R/pkg/inst/tests/testthat/test_mllib_clustering.R
index 9de8362..aad834b 100644
--- a/R/pkg/inst/tests/testthat/test_mllib_clustering.R
+++ b/R/pkg/inst/tests/testthat/test_mllib_clustering.R
@@ -27,6 +27,46 @@ absoluteSparkPath <- function(x) {
   file.path(sparkHome, x)
 }
 
+test_that("spark.bisectingKmeans", {
+  newIris <- iris
+  newIris$Species <- NULL
+  training <- suppressWarnings(createDataFrame(newIris))
+
+  take(training, 1)
+
+  model <- spark.bisectingKmeans(data = training, ~ .)
+  sample <- take(select(predict(model, training), "prediction"), 1)
+  expect_equal(typeof(sample$prediction), "integer")
+  expect_equal(sample$prediction, 1)
+
+  # Test fitted works on Bisecting KMeans
+  fitted.model <- fitted(model)
+  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction),
+               c(0, 1, 2, 3))
+
+  # Test summary works on Bisecting KMeans
+  summary.model <- summary(model)
+  cluster <- summary.model$cluster
+  k <- summary.model$k
+  expect_equal(k, 4)
+  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction),
+               c(0, 1, 2, 3))
+
+  # Test model save/load
+  modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp")
+  write.ml(model, modelPath)
+  expect_error(write.ml(model, modelPath))
+  write.ml(model, modelPath, overwrite = TRUE)
+  model2 <- read.ml(modelPath)
+  summary2 <- summary(model2)
+  expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
+  expect_equal(summary.model$coefficients, summary2$coefficients)
+  expect_true(!summary.model$is.loaded)
+  expect_true(summary2$is.loaded)
+
+  unlink(modelPath)
+})
+
 test_that("spark.gaussianMixture", {
   # R code to reproduce the result.
   # nolint start

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
new file mode 100644
index 0000000..71712c1
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.r
+
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.ml.{Pipeline, PipelineModel}
+import org.apache.spark.ml.attribute.AttributeGroup
+import org.apache.spark.ml.clustering.{BisectingKMeans, BisectingKMeansModel}
+import org.apache.spark.ml.feature.RFormula
+import org.apache.spark.ml.util._
+import org.apache.spark.sql.{DataFrame, Dataset}
+
+private[r] class BisectingKMeansWrapper private (
+    val pipeline: PipelineModel,
+    val features: Array[String],
+    val size: Array[Long],
+    val isLoaded: Boolean = false) extends MLWritable {
+  private val bisectingKmeansModel: BisectingKMeansModel =
+    pipeline.stages.last.asInstanceOf[BisectingKMeansModel]
+
+  lazy val coefficients: Array[Double] = bisectingKmeansModel.clusterCenters.flatMap(_.toArray)
+
+  lazy val k: Int = bisectingKmeansModel.getK
+
+  // R's summary() does not call this for a loaded model; it checks isLoaded and uses NULL instead
+  lazy val cluster: DataFrame = bisectingKmeansModel.summary.cluster
+
+  def fitted(method: String): DataFrame = {
+    if (method == "centers") {
+      bisectingKmeansModel.summary.predictions.drop(bisectingKmeansModel.getFeaturesCol)
+    } else if (method == "classes") {
+      bisectingKmeansModel.summary.cluster
+    } else {
+      throw new UnsupportedOperationException(
+        s"Method (centers or classes) required but $method found.")
+    }
+  }
+
+  def transform(dataset: Dataset[_]): DataFrame = {
+    pipeline.transform(dataset).drop(bisectingKmeansModel.getFeaturesCol)
+  }
+
+  override def write: MLWriter = new BisectingKMeansWrapper.BisectingKMeansWrapperWriter(this)
+}
+
+private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapper] {
+
+  def fit(
+      data: DataFrame,
+      formula: String,
+      k: Int,
+      maxIter: Int,
+      seed: String,
+      minDivisibleClusterSize: Double
+      ): BisectingKMeansWrapper = {
+
+    val rFormula = new RFormula()
+      .setFormula(formula)
+      .setFeaturesCol("features")
+    RWrapperUtils.checkDataColumns(rFormula, data)
+    val rFormulaModel = rFormula.fit(data)
+
+    // get feature names from output schema
+    val schema = rFormulaModel.transform(data).schema
+    val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
+      .attributes.get
+    val features = featureAttrs.map(_.name.get)
+
+    val bisectingKmeans = new BisectingKMeans()
+      .setK(k)
+      .setMaxIter(maxIter)
+      .setMinDivisibleClusterSize(minDivisibleClusterSize)
+      .setFeaturesCol(rFormula.getFeaturesCol)
+
+    if (seed != null && seed.length > 0) bisectingKmeans.setSeed(seed.toInt)
+
+    val pipeline = new Pipeline()
+      .setStages(Array(rFormulaModel, bisectingKmeans))
+      .fit(data)
+
+    val bisectingKmeansModel: BisectingKMeansModel =
+      pipeline.stages.last.asInstanceOf[BisectingKMeansModel]
+    val size: Array[Long] = bisectingKmeansModel.summary.clusterSizes
+
+    new BisectingKMeansWrapper(pipeline, features, size)
+  }
+
+  override def read: MLReader[BisectingKMeansWrapper] = new BisectingKMeansWrapperReader
+
+  override def load(path: String): BisectingKMeansWrapper = super.load(path)
+
+  class BisectingKMeansWrapperWriter(instance: BisectingKMeansWrapper) extends MLWriter {
+
+    override protected def saveImpl(path: String): Unit = {
+      val rMetadataPath = new Path(path, "rMetadata").toString
+      val pipelinePath = new Path(path, "pipeline").toString
+
+      val rMetadata = ("class" -> instance.getClass.getName) ~
+        ("features" -> instance.features.toSeq) ~
+        ("size" -> instance.size.toSeq)
+      val rMetadataJson: String = compact(render(rMetadata))
+
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      instance.pipeline.save(pipelinePath)
+    }
+  }
+
+  class BisectingKMeansWrapperReader extends MLReader[BisectingKMeansWrapper] {
+
+    override def load(path: String): BisectingKMeansWrapper = {
+      implicit val format = DefaultFormats
+      val rMetadataPath = new Path(path, "rMetadata").toString
+      val pipelinePath = new Path(path, "pipeline").toString
+      val pipeline = PipelineModel.load(pipelinePath)
+
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadata = parse(rMetadataStr)
+      val features = (rMetadata \ "features").extract[Array[String]]
+      val size = (rMetadata \ "size").extract[Array[Long]]
+      new BisectingKMeansWrapper(pipeline, features, size, isLoaded = true)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ba2843/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index b59fe29..c441792 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -64,6 +64,8 @@ private[r] object RWrappers extends MLReader[Object] {
         GBTRegressorWrapper.load(path)
       case "org.apache.spark.ml.r.GBTClassifierWrapper" =>
         GBTClassifierWrapper.load(path)
+      case "org.apache.spark.ml.r.BisectingKMeansWrapper" =>
+        BisectingKMeansWrapper.load(path)
       case _ =>
         throw new SparkException(s"SparkR read.ml does not support load $className")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org