Posted to commits@spark.apache.org by fe...@apache.org on 2017/02/15 09:15:53 UTC

spark git commit: [SPARK-19456][SPARKR] Add LinearSVC R API

Repository: spark
Updated Branches:
  refs/heads/master 447b2b530 -> 3973403d5


[SPARK-19456][SPARKR] Add LinearSVC R API

## What changes were proposed in this pull request?

The Linear SVM classifier was newly added to ML and a Python API has been added. This JIRA adds the R-side API.

Marked as WIP, as I am designing unit tests.
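
For reference, a minimal usage sketch of the proposed API (adapted from the roxygen examples and tests in this patch; it assumes a running SparkR session, and the model path is illustrative):

```r
library(SparkR)
sparkR.session()

# Use two of the iris species to form a binary classification problem
df <- createDataFrame(iris)
training <- df[df$Species %in% c("versicolor", "virginica"), ]

# Fit a binary linear SVM; regParam and maxIter are optional tuning parameters
model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10)
summary(model)

# Predict on the training data, then save and reload the model
fitted <- predict(model, training)
modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp")
write.ml(model, modelPath)
savedModel <- read.ml(modelPath)
```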

## How was this patch tested?

New unit tests for spark.svmLinear (model summary, prediction with string and numeric labels, and model save/load) are added in R/pkg/inst/tests/testthat/test_mllib_classification.R.

Author: wm624@hotmail.com <wm...@hotmail.com>

Closes #16800 from wangmiao1981/svc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3973403d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3973403d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3973403d

Branch: refs/heads/master
Commit: 3973403d5d90a48e3a995159680239ba5240e30c
Parents: 447b2b5
Author: wm624@hotmail.com <wm...@hotmail.com>
Authored: Wed Feb 15 01:15:50 2017 -0800
Committer: Felix Cheung <fe...@apache.org>
Committed: Wed Feb 15 01:15:50 2017 -0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                                 |   3 +-
 R/pkg/R/generics.R                              |   4 +
 R/pkg/R/mllib_classification.R                  | 132 ++++++++++++++++
 R/pkg/R/mllib_utils.R                           |   9 +-
 .../tests/testthat/test_mllib_classification.R  |  44 ++++++
 .../apache/spark/ml/r/LinearSVCWrapper.scala    | 152 +++++++++++++++++++
 .../scala/org/apache/spark/ml/r/RWrappers.scala |   2 +
 7 files changed, 342 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3973403d/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 625c797..8b26500 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -65,7 +65,8 @@ exportMethods("glm",
               "spark.logit",
               "spark.randomForest",
               "spark.gbt",
-              "spark.bisectingKmeans")
+              "spark.bisectingKmeans",
+              "spark.svmLinear")
 
 # Job group lifecycle management methods
 export("setJobGroup",

http://git-wip-us.apache.org/repos/asf/spark/blob/3973403d/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index d78b1a1..0d9a996 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1401,6 +1401,10 @@ setGeneric("spark.randomForest",
 #' @export
 setGeneric("spark.survreg", function(data, formula) { standardGeneric("spark.survreg") })
 
+#' @rdname spark.svmLinear
+#' @export
+setGeneric("spark.svmLinear", function(data, formula, ...) { standardGeneric("spark.svmLinear") })
+
 #' @rdname spark.lda
 #' @export
 setGeneric("spark.posterior", function(object, newData) { standardGeneric("spark.posterior") })

http://git-wip-us.apache.org/repos/asf/spark/blob/3973403d/R/pkg/R/mllib_classification.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_classification.R b/R/pkg/R/mllib_classification.R
index 552cbe4..fa0d795 100644
--- a/R/pkg/R/mllib_classification.R
+++ b/R/pkg/R/mllib_classification.R
@@ -18,6 +18,13 @@
 # mllib_regression.R: Provides methods for MLlib classification algorithms
 #                     (except for tree-based algorithms) integration
 
+#' S4 class that represents a LinearSVCModel
+#'
+#' @param jobj a Java object reference to the backing Scala LinearSVCModel
+#' @export
+#' @note LinearSVCModel since 2.2.0
+setClass("LinearSVCModel", representation(jobj = "jobj"))
+
 #' S4 class that represents an LogisticRegressionModel
 #'
 #' @param jobj a Java object reference to the backing Scala LogisticRegressionModel
@@ -39,6 +46,131 @@ setClass("MultilayerPerceptronClassificationModel", representation(jobj = "jobj"
 #' @note NaiveBayesModel since 2.0.0
 setClass("NaiveBayesModel", representation(jobj = "jobj"))
 
+#' Linear SVM Model
+#'
+#' Fits a linear SVM model against a SparkDataFrame. It is a binary classifier, similar to svm in the glmnet package.
+#' Users can print, make predictions on the produced model, and save the model to the input path.
+#'
+#' @param data SparkDataFrame for training.
+#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
+#'                operators are supported, including '~', '.', ':', '+', and '-'.
+#' @param regParam The regularization parameter.
+#' @param maxIter Maximum iteration number.
+#' @param tol Convergence tolerance of iterations.
+#' @param standardization Whether to standardize the training features before fitting the model. The coefficients
+#'                        of models will always be returned on the original scale, so it will be transparent for
+#'                        users. Note that with or without standardization, the models should always converge
+#'                        to the same solution when no regularization is applied.
+#' @param threshold The threshold in binary classification, in range [0, 1].
+#' @param weightCol The weight column name.
+#' @param aggregationDepth The depth for treeAggregate (greater than or equal to 2). If the dimensions of features
+#'                         or the number of partitions are large, this parameter could be adjusted to a larger
+#'                         value. This is an expert parameter; the default value should be good for most cases.
+#' @param ... additional arguments passed to the method.
+#' @return \code{spark.svmLinear} returns a fitted linear SVM model.
+#' @rdname spark.svmLinear
+#' @aliases spark.svmLinear,SparkDataFrame,formula-method
+#' @name spark.svmLinear
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(iris)
+#' training <- df[df$Species %in% c("versicolor", "virginica"), ]
+#' model <- spark.svmLinear(training, Species ~ ., regParam = 0.5)
+#' summary <- summary(model)
+#'
+#' # fitted values on training data
+#' fitted <- predict(model, training)
+#'
+#' # save fitted model to input path
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#'
+#' # can also read back the saved model and predict
+#' # Note that summary does not work on a loaded model
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.svmLinear since 2.2.0
+setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formula"),
+          function(data, formula, regParam = 0.0, maxIter = 100, tol = 1E-6, standardization = TRUE,
+                   threshold = 0.0, weightCol = NULL, aggregationDepth = 2) {
+            formula <- paste(deparse(formula), collapse = "")
+
+            if (!is.null(weightCol) && weightCol == "") {
+              weightCol <- NULL
+            } else if (!is.null(weightCol)) {
+              weightCol <- as.character(weightCol)
+            }
+
+            jobj <- callJStatic("org.apache.spark.ml.r.LinearSVCWrapper", "fit",
+                                data@sdf, formula, as.numeric(regParam), as.integer(maxIter),
+                                as.numeric(tol), as.logical(standardization), as.numeric(threshold),
+                                weightCol, as.integer(aggregationDepth))
+            new("LinearSVCModel", jobj = jobj)
+          })
+
+#  Predicted values based on a LinearSVCModel
+
+#' @param newData a SparkDataFrame for testing.
+#' @return \code{predict} returns the predicted values based on a LinearSVCModel.
+#' @rdname spark.svmLinear
+#' @aliases predict,LinearSVCModel,SparkDataFrame-method
+#' @export
+#' @note predict(LinearSVCModel) since 2.2.0
+setMethod("predict", signature(object = "LinearSVCModel"),
+          function(object, newData) {
+            predict_internal(object, newData)
+          })
+
+#  Get the summary of a LinearSVCModel
+
+#' @param object a LinearSVCModel fitted by \code{spark.svmLinear}.
+#' @return \code{summary} returns summary information of the fitted model, which is a list.
+#'         The list includes \code{coefficients} (coefficients of the fitted model),
+#'         \code{intercept} (intercept of the fitted model), \code{numClasses} (number of classes),
+#'         \code{numFeatures} (number of features).
+#' @rdname spark.svmLinear
+#' @aliases summary,LinearSVCModel-method
+#' @export
+#' @note summary(LinearSVCModel) since 2.2.0
+setMethod("summary", signature(object = "LinearSVCModel"),
+          function(object) {
+            jobj <- object@jobj
+            features <- callJMethod(jobj, "features")
+            labels <- callJMethod(jobj, "labels")
+            coefficients <- callJMethod(jobj, "coefficients")
+            nCol <- length(coefficients) / length(features)
+            coefficients <- matrix(unlist(coefficients), ncol = nCol)
+            intercept <- callJMethod(jobj, "intercept")
+            numClasses <- callJMethod(jobj, "numClasses")
+            numFeatures <- callJMethod(jobj, "numFeatures")
+            if (nCol == 1) {
+              colnames(coefficients) <- c("Estimate")
+            } else {
+              colnames(coefficients) <- unlist(labels)
+            }
+            rownames(coefficients) <- unlist(features)
+            list(coefficients = coefficients, intercept = intercept,
+                 numClasses = numClasses, numFeatures = numFeatures)
+          })
+
+#  Save fitted LinearSVCModel to the input path
+
+#' @param path The directory where the model is saved.
+#' @param overwrite Whether to overwrite if the output path already exists. Default is FALSE,
+#'                  which means an exception is thrown if the output path exists.
+#'
+#' @rdname spark.svmLinear
+#' @aliases write.ml,LinearSVCModel,character-method
+#' @export
+#' @note write.ml(LinearSVCModel, character) since 2.2.0
+setMethod("write.ml", signature(object = "LinearSVCModel", path = "character"),
+          function(object, path, overwrite = FALSE) {
+            write_internal(object, path, overwrite)
+          })
+
 #' Logistic Regression Model
 #'
 #' Fits an logistic regression model against a SparkDataFrame. It supports "binomial": Binary logistic regression

http://git-wip-us.apache.org/repos/asf/spark/blob/3973403d/R/pkg/R/mllib_utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R
index 29c4473..04a0a6f 100644
--- a/R/pkg/R/mllib_utils.R
+++ b/R/pkg/R/mllib_utils.R
@@ -35,8 +35,9 @@
 #' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
 #' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
 #' @seealso \link{spark.kmeans},
-#' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
-#' @seealso \link{spark.randomForest}, \link{spark.survreg},
+#' @seealso \link{spark.lda}, \link{spark.logit},
+#' @seealso \link{spark.mlp}, \link{spark.naiveBayes},
+#' @seealso \link{spark.randomForest}, \link{spark.survreg}, \link{spark.svmLinear},
 #' @seealso \link{read.ml}
 NULL
 
@@ -51,7 +52,7 @@ NULL
 #' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
 #' @seealso \link{spark.kmeans},
 #' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
-#' @seealso \link{spark.randomForest}, \link{spark.survreg}
+#' @seealso \link{spark.randomForest}, \link{spark.survreg}, \link{spark.svmLinear}
 NULL
 
 write_internal <- function(object, path, overwrite = FALSE) {
@@ -115,6 +116,8 @@ read.ml <- function(path) {
     new("GBTClassificationModel", jobj = jobj)
   } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.BisectingKMeansWrapper")) {
     new("BisectingKMeansModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LinearSVCWrapper")) {
+    new("LinearSVCModel", jobj = jobj)
   } else {
     stop("Unsupported model: ", jobj)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3973403d/R/pkg/inst/tests/testthat/test_mllib_classification.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_mllib_classification.R b/R/pkg/inst/tests/testthat/test_mllib_classification.R
index 5f84a62..620f528 100644
--- a/R/pkg/inst/tests/testthat/test_mllib_classification.R
+++ b/R/pkg/inst/tests/testthat/test_mllib_classification.R
@@ -27,6 +27,50 @@ absoluteSparkPath <- function(x) {
   file.path(sparkHome, x)
 }
 
+test_that("spark.svmLinear", {
+  df <- suppressWarnings(createDataFrame(iris))
+  training <- df[df$Species %in% c("versicolor", "virginica"), ]
+  model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10)
+  summary <- summary(model)
+
+  # test summary coefficients return matrix type
+  expect_true(class(summary$coefficients) == "matrix")
+  expect_true(class(summary$coefficients[, 1]) == "numeric")
+
+  coefs <- summary$coefficients[, "Estimate"]
+  expected_coefs <- c(-0.1563083, -0.460648, 0.2276626, 1.055085)
+  expect_true(all(abs(coefs - expected_coefs) < 0.1))
+  expect_equal(summary$intercept, -0.06004978, tolerance = 1e-2)
+
+  # Test prediction with string label
+  prediction <- predict(model, training)
+  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
+  expected <- c("versicolor", "versicolor", "versicolor", "virginica",  "virginica",
+                "virginica",  "virginica",  "virginica",  "virginica",  "virginica")
+  expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected)
+
+  # Test model save and load
+  modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp")
+  write.ml(model, modelPath)
+  expect_error(write.ml(model, modelPath))
+  write.ml(model, modelPath, overwrite = TRUE)
+  model2 <- read.ml(modelPath)
+  coefs <- summary(model)$coefficients
+  coefs2 <- summary(model2)$coefficients
+  expect_equal(coefs, coefs2)
+  unlink(modelPath)
+
+  # Test prediction with numeric label
+  label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
+  feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
+  data <- as.data.frame(cbind(label, feature))
+  df <- createDataFrame(data)
+  model <- spark.svmLinear(df, label ~ feature, regParam = 0.1)
+  prediction <- collect(select(predict(model, df), "prediction"))
+  expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0"))
+
+})
+
 test_that("spark.logit", {
   # R code to reproduce the result.
   # nolint start

http://git-wip-us.apache.org/repos/asf/spark/blob/3973403d/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
new file mode 100644
index 0000000..cfd043b
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.r
+
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.ml.{Pipeline, PipelineModel}
+import org.apache.spark.ml.classification.{LinearSVC, LinearSVCModel}
+import org.apache.spark.ml.feature.{IndexToString, RFormula}
+import org.apache.spark.ml.r.RWrapperUtils._
+import org.apache.spark.ml.util._
+import org.apache.spark.sql.{DataFrame, Dataset}
+
+private[r] class LinearSVCWrapper private (
+    val pipeline: PipelineModel,
+    val features: Array[String],
+    val labels: Array[String]) extends MLWritable {
+  import LinearSVCWrapper._
+
+  private val svcModel: LinearSVCModel =
+    pipeline.stages(1).asInstanceOf[LinearSVCModel]
+
+  lazy val coefficients: Array[Double] = svcModel.coefficients.toArray
+
+  lazy val intercept: Double = svcModel.intercept
+
+  lazy val numClasses: Int = svcModel.numClasses
+
+  lazy val numFeatures: Int = svcModel.numFeatures
+
+  def transform(dataset: Dataset[_]): DataFrame = {
+    pipeline.transform(dataset)
+      .drop(PREDICTED_LABEL_INDEX_COL)
+      .drop(svcModel.getFeaturesCol)
+      .drop(svcModel.getLabelCol)
+  }
+
+  override def write: MLWriter = new LinearSVCWrapper.LinearSVCWrapperWriter(this)
+}
+
+private[r] object LinearSVCWrapper
+  extends MLReadable[LinearSVCWrapper] {
+
+  val PREDICTED_LABEL_INDEX_COL = "pred_label_idx"
+  val PREDICTED_LABEL_COL = "prediction"
+
+  def fit(
+      data: DataFrame,
+      formula: String,
+      regParam: Double,
+      maxIter: Int,
+      tol: Double,
+      standardization: Boolean,
+      threshold: Double,
+      weightCol: String,
+      aggregationDepth: Int
+      ): LinearSVCWrapper = {
+
+    val rFormula = new RFormula()
+      .setFormula(formula)
+      .setForceIndexLabel(true)
+    checkDataColumns(rFormula, data)
+    val rFormulaModel = rFormula.fit(data)
+
+    val fitIntercept = rFormula.hasIntercept
+
+    // get labels and feature names from output schema
+    val (features, labels) = getFeaturesAndLabels(rFormulaModel, data)
+
+    // assemble and fit the pipeline
+    val svc = new LinearSVC()
+      .setRegParam(regParam)
+      .setMaxIter(maxIter)
+      .setTol(tol)
+      .setFitIntercept(fitIntercept)
+      .setStandardization(standardization)
+      .setFeaturesCol(rFormula.getFeaturesCol)
+      .setLabelCol(rFormula.getLabelCol)
+      .setPredictionCol(PREDICTED_LABEL_INDEX_COL)
+      .setThreshold(threshold)
+      .setAggregationDepth(aggregationDepth)
+
+    if (weightCol != null) svc.setWeightCol(weightCol)
+
+    val idxToStr = new IndexToString()
+      .setInputCol(PREDICTED_LABEL_INDEX_COL)
+      .setOutputCol(PREDICTED_LABEL_COL)
+      .setLabels(labels)
+
+    val pipeline = new Pipeline()
+      .setStages(Array(rFormulaModel, svc, idxToStr))
+      .fit(data)
+
+    new LinearSVCWrapper(pipeline, features, labels)
+  }
+
+  override def read: MLReader[LinearSVCWrapper] = new LinearSVCWrapperReader
+
+  override def load(path: String): LinearSVCWrapper = super.load(path)
+
+  class LinearSVCWrapperWriter(instance: LinearSVCWrapper) extends MLWriter {
+
+    override protected def saveImpl(path: String): Unit = {
+      val rMetadataPath = new Path(path, "rMetadata").toString
+      val pipelinePath = new Path(path, "pipeline").toString
+
+      val rMetadata = ("class" -> instance.getClass.getName) ~
+        ("features" -> instance.features.toSeq) ~
+        ("labels" -> instance.labels.toSeq)
+      val rMetadataJson: String = compact(render(rMetadata))
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+
+      instance.pipeline.save(pipelinePath)
+    }
+  }
+
+  class LinearSVCWrapperReader extends MLReader[LinearSVCWrapper] {
+
+    override def load(path: String): LinearSVCWrapper = {
+      implicit val format = DefaultFormats
+      val rMetadataPath = new Path(path, "rMetadata").toString
+      val pipelinePath = new Path(path, "pipeline").toString
+
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadata = parse(rMetadataStr)
+      val features = (rMetadata \ "features").extract[Array[String]]
+      val labels = (rMetadata \ "labels").extract[Array[String]]
+
+      val pipeline = PipelineModel.load(pipelinePath)
+      new LinearSVCWrapper(pipeline, features, labels)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/3973403d/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index c441792..358e522 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -66,6 +66,8 @@ private[r] object RWrappers extends MLReader[Object] {
         GBTClassifierWrapper.load(path)
       case "org.apache.spark.ml.r.BisectingKMeansWrapper" =>
         BisectingKMeansWrapper.load(path)
+      case "org.apache.spark.ml.r.LinearSVCWrapper" =>
+        LinearSVCWrapper.load(path)
       case _ =>
         throw new SparkException(s"SparkR read.ml does not support load $className")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org