Posted to commits@spark.apache.org by sr...@apache.org on 2018/12/11 00:28:31 UTC

[spark] branch master updated: [SPARK-19827][R] spark.ml R API for PIC

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 05cf81e  [SPARK-19827][R] spark.ml R API for PIC
05cf81e is described below

commit 05cf81e6de3d61ddb0af81cd179665693f23351f
Author: Huaxin Gao <hu...@us.ibm.com>
AuthorDate: Mon Dec 10 18:28:13 2018 -0600

    [SPARK-19827][R] spark.ml R API for PIC
    
    ## What changes were proposed in this pull request?
    
    Add PowerIterationClustering (PIC) in R.

    ## How was this patch tested?

    Add test case.
    
    Closes #23072 from huaxingao/spark-19827.
    
    Authored-by: Huaxin Gao <hu...@us.ibm.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 R/pkg/NAMESPACE                                    |  3 +-
 R/pkg/R/generics.R                                 |  4 ++
 R/pkg/R/mllib_clustering.R                         | 62 ++++++++++++++++++++++
 R/pkg/tests/fulltests/test_mllib_clustering.R      | 13 +++++
 R/pkg/vignettes/sparkr-vignettes.Rmd               | 14 +++++
 docs/ml-clustering.md                              | 41 ++++++++++++++
 docs/sparkr.md                                     |  1 +
 examples/src/main/r/ml/powerIterationClustering.R  | 38 +++++++++++++
 .../ml/clustering/PowerIterationClustering.scala   |  4 +-
 .../ml/r/PowerIterationClusteringWrapper.scala     | 39 ++++++++++++++
 python/pyspark/ml/clustering.py                    |  4 +-
 11 files changed, 218 insertions(+), 5 deletions(-)
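
For reference, the new R API can be exercised end to end as follows. This is
a minimal sketch assembled from the examples in the diff below; it assumes a
running SparkR session and is not itself part of the commit:

    library(SparkR)
    sparkR.session()

    # Edges of a small graph as (src, dst, weight) rows
    df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                               list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                               list(4L, 0L, 0.1)),
                          schema = c("src", "dst", "weight"))

    # Run PIC; returns a SparkDataFrame of (id, cluster) assignments
    clusters <- spark.assignClusters(df, k = 2L, initMode = "degree",
                                     weightCol = "weight")
    showDF(clusters)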

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 1f8ba0b..cfad20d 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -67,7 +67,8 @@ exportMethods("glm",
               "spark.fpGrowth",
               "spark.freqItemsets",
               "spark.associationRules",
-              "spark.findFrequentSequentialPatterns")
+              "spark.findFrequentSequentialPatterns",
+              "spark.assignClusters")
 
 # Job group lifecycle management methods
 export("setJobGroup",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index eed7646..09d8171 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1479,6 +1479,10 @@ setGeneric("spark.associationRules", function(object) { standardGeneric("spark.a
 setGeneric("spark.findFrequentSequentialPatterns",
             function(data, ...) { standardGeneric("spark.findFrequentSequentialPatterns") })
 
+#' @rdname spark.powerIterationClustering
+setGeneric("spark.assignClusters",
+            function(data, ...) { standardGeneric("spark.assignClusters") })
+
 #' @param object a fitted ML model object.
 #' @param path the directory where the model is saved.
 #' @param ... additional argument(s) passed to the method.
diff --git a/R/pkg/R/mllib_clustering.R b/R/pkg/R/mllib_clustering.R
index 900be68..7d9dceb 100644
--- a/R/pkg/R/mllib_clustering.R
+++ b/R/pkg/R/mllib_clustering.R
@@ -41,6 +41,12 @@ setClass("KMeansModel", representation(jobj = "jobj"))
 #' @note LDAModel since 2.1.0
 setClass("LDAModel", representation(jobj = "jobj"))
 
+#' S4 class that represents a PowerIterationClustering
+#'
+#' @param jobj a Java object reference to the backing Scala PowerIterationClustering
+#' @note PowerIterationClustering since 3.0.0
+setClass("PowerIterationClustering", slots = list(jobj = "jobj"))
+
 #' Bisecting K-Means Clustering Model
 #'
 #' Fits a bisecting k-means clustering model against a SparkDataFrame.
@@ -610,3 +616,59 @@ setMethod("write.ml", signature(object = "LDAModel", path = "character"),
           function(object, path, overwrite = FALSE) {
             write_internal(object, path, overwrite)
           })
+
+#' PowerIterationClustering
+#'
+#' A scalable graph clustering algorithm. Users can call \code{spark.assignClusters} to
+#' return a cluster assignment for each input vertex.
+#'
+# Run the PIC algorithm and return a cluster assignment for each input vertex.
+#' @param data a SparkDataFrame.
+#' @param k the number of clusters to create.
+#' @param initMode the initialization algorithm.
+#' @param maxIter the maximum number of iterations.
+#' @param sourceCol the name of the input column for source vertex IDs.
+#' @param destinationCol the name of the input column for destination vertex IDs.
+#' @param weightCol weight column name. If this is not set or \code{NULL},
+#'                  we treat all instance weights as 1.0.
+#' @param ... additional argument(s) passed to the method.
+#' @return A SparkDataFrame that contains a column of vertex IDs and the corresponding cluster
+#'         assignment for each ID. Its schema is:
+#'         \code{id: Long}
+#'         \code{cluster: Int}
+#' @rdname spark.powerIterationClustering
+#' @aliases spark.assignClusters,SparkDataFrame-method
+#' @examples
+#' \dontrun{
+#' df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
+#'                            list(1L, 2L, 1.0), list(3L, 4L, 1.0),
+#'                            list(4L, 0L, 0.1)),
+#'                       schema = c("src", "dst", "weight"))
+#' clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")
+#' showDF(clusters)
+#' }
+#' @note spark.assignClusters(SparkDataFrame) since 3.0.0
+setMethod("spark.assignClusters",
+          signature(data = "SparkDataFrame"),
+          function(data, k = 2L, initMode = c("random", "degree"), maxIter = 20L,
+                   sourceCol = "src", destinationCol = "dst", weightCol = NULL) {
+            if (!is.numeric(k) || k < 1) {
+              stop("k should be a number with value >= 1.")
+            }
+            if (!is.integer(maxIter) || maxIter <= 0) {
+              stop("maxIter should be a number with value > 0.")
+            }
+            initMode <- match.arg(initMode)
+            if (!is.null(weightCol) && weightCol == "") {
+              weightCol <- NULL
+            } else if (!is.null(weightCol)) {
+              weightCol <- as.character(weightCol)
+            }
+            jobj <- callJStatic("org.apache.spark.ml.r.PowerIterationClusteringWrapper",
+                                "getPowerIterationClustering",
+                                as.integer(k), initMode,
+                                as.integer(maxIter), as.character(sourceCol),
+                                as.character(destinationCol), weightCol)
+            object <- new("PowerIterationClustering", jobj = jobj)
+            dataFrame(callJMethod(object@jobj, "assignClusters", data@sdf))
+          })
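
The validation logic above runs in R before any JVM call is made. A short
sketch of the resulting behavior, assuming the `df` from the examples in
this file's documentation:

    # All arguments defaulted: k = 2L, initMode = "random", maxIter = 20L,
    # sourceCol = "src", destinationCol = "dst", weightCol = NULL
    clusters <- spark.assignClusters(df)

    # Invalid arguments are rejected on the R side:
    # spark.assignClusters(df, k = 0)             # "k should be a number with value >= 1."
    # spark.assignClusters(df, maxIter = 0L)      # "maxIter should be a number with value > 0."
    # spark.assignClusters(df, initMode = "foo")  # match.arg() error: unmatched initMode
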
diff --git a/R/pkg/tests/fulltests/test_mllib_clustering.R b/R/pkg/tests/fulltests/test_mllib_clustering.R
index 4110e13..b78a476 100644
--- a/R/pkg/tests/fulltests/test_mllib_clustering.R
+++ b/R/pkg/tests/fulltests/test_mllib_clustering.R
@@ -319,4 +319,17 @@ test_that("spark.posterior and spark.perplexity", {
   expect_equal(length(local.posterior), sum(unlist(local.posterior)))
 })
 
+test_that("spark.assignClusters", {
+  df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
+                             list(1L, 2L, 1.0), list(3L, 4L, 1.0),
+                             list(4L, 0L, 0.1)),
+                        schema = c("src", "dst", "weight"))
+  clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")
+  expected_result <- createDataFrame(list(list(4L, 1L), list(0L, 0L),
+                                          list(1L, 0L), list(3L, 1L),
+                                          list(2L, 0L)),
+                                     schema = c("id", "cluster"))
+  expect_equivalent(expected_result, clusters)
+})
+
 sparkR.session.stop()
diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd
index 1c6a03c..cbe8c617 100644
--- a/R/pkg/vignettes/sparkr-vignettes.Rmd
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -549,6 +549,8 @@ SparkR supports the following machine learning models and algorithms.
 
 * Latent Dirichlet Allocation (LDA)
 
+* Power Iteration Clustering (PIC)
+
 #### Collaborative Filtering
 
 * Alternating Least Squares (ALS)
@@ -982,6 +984,18 @@ predicted <- predict(model, df)
 head(predicted)
 ```
 
+#### Power Iteration Clustering
+
+Power Iteration Clustering (PIC) is a scalable graph clustering algorithm. The `spark.assignClusters` method runs the PIC algorithm and returns a cluster assignment for each input vertex.
+
+```{r}
+df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
+                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
+                           list(4L, 0L, 0.1)),
+                      schema = c("src", "dst", "weight"))
+head(spark.assignClusters(df, initMode = "degree", weightCol = "weight"))
+```
+
 #### FP-growth
 
 `spark.fpGrowth` executes FP-growth algorithm to mine frequent itemsets on a `SparkDataFrame`. `itemsCol` should be an array of values.
diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md
index 1186fb7..65f2652 100644
--- a/docs/ml-clustering.md
+++ b/docs/ml-clustering.md
@@ -265,3 +265,44 @@ Refer to the [R API docs](api/R/spark.gaussianMixture.html) for more details.
 </div>
 
 </div>
+
+## Power Iteration Clustering (PIC)
+
+Power Iteration Clustering (PIC) is a scalable graph clustering algorithm
+developed by [Lin and Cohen](http://www.cs.cmu.edu/~frank/papers/icml2010-pic-final.pdf).
+From the abstract: PIC finds a very low-dimensional embedding of a dataset
+using truncated power iteration on a normalized pair-wise similarity matrix of the data.
+
+`spark.ml`'s PowerIterationClustering implementation takes the following parameters:
+
+* `k`: the number of clusters to create
+* `initMode`: the initialization algorithm
+* `maxIter`: the maximum number of iterations
+* `srcCol`: the name of the input column for source vertex IDs
+* `dstCol`: the name of the input column for destination vertex IDs
+* `weightCol`: the name of the weight column; if not set, all edge weights are treated as 1.0
+
+**Examples**
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+Refer to the [Scala API docs](api/scala/index.html#org.apache.spark.ml.clustering.PowerIterationClustering) for more details.
+
+{% include_example scala/org/apache/spark/examples/ml/PowerIterationClusteringExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/PowerIterationClustering.html) for more details.
+
+{% include_example java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java %}
+</div>
+
+<div data-lang="r" markdown="1">
+
+Refer to the [R API docs](api/R/spark.powerIterationClustering.html) for more details.
+
+{% include_example r/ml/powerIterationClustering.R %}
+</div>
+
+</div>
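
To make the parameter list above concrete, here is a sketch that sets every
listed parameter through the R API added in this commit. Note that `srcCol`
and `dstCol` surface in R as `sourceCol` and `destinationCol`; `df` is the
one built in the earlier examples:

    clusters <- spark.assignClusters(df,
                                     k = 2L,
                                     initMode = "degree",
                                     maxIter = 20L,
                                     sourceCol = "src",
                                     destinationCol = "dst",
                                     weightCol = "weight")
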
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 0057f05..dbb6124 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -544,6 +544,7 @@ SparkR supports the following machine learning algorithms currently:
 * [`spark.gaussianMixture`](api/R/spark.gaussianMixture.html): [`Gaussian Mixture Model (GMM)`](ml-clustering.html#gaussian-mixture-model-gmm)
 * [`spark.kmeans`](api/R/spark.kmeans.html): [`K-Means`](ml-clustering.html#k-means)
 * [`spark.lda`](api/R/spark.lda.html): [`Latent Dirichlet Allocation (LDA)`](ml-clustering.html#latent-dirichlet-allocation-lda)
+* [`spark.powerIterationClustering`](api/R/spark.powerIterationClustering.html): [`Power Iteration Clustering (PIC)`](ml-clustering.html#power-iteration-clustering-pic)
 
 #### Collaborative Filtering
 
diff --git a/examples/src/main/r/ml/powerIterationClustering.R b/examples/src/main/r/ml/powerIterationClustering.R
new file mode 100644
index 0000000..ba43037
--- /dev/null
+++ b/examples/src/main/r/ml/powerIterationClustering.R
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# To run this example use
+# ./bin/spark-submit examples/src/main/r/ml/powerIterationClustering.R
+
+# Load SparkR library into your R session
+library(SparkR)
+
+# Initialize SparkSession
+sparkR.session(appName = "SparkR-ML-powerIterationClustering-example")
+
+# $example on$
+df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
+                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
+                           list(4L, 0L, 0.1)),
+                      schema = c("src", "dst", "weight"))
+# assign clusters
+clusters <- spark.assignClusters(df, k=2L, maxIter=20L, initMode="degree", weightCol="weight")
+
+showDF(arrange(clusters, clusters$id))
+# $example off$
+
+sparkR.session.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
index 1b9a349..d9a330f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
@@ -97,8 +97,8 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
 /**
  * :: Experimental ::
  * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
- * <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
- * PIC finds a very low-dimensional embedding of a dataset using truncated power
+ * <a href=http://www.cs.cmu.edu/~frank/papers/icml2010-pic-final.pdf>Lin and Cohen</a>. From
+ * the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power
  * iteration on a normalized pair-wise similarity matrix of the data.
  *
  * This class is not yet an Estimator/Transformer, use `assignClusters` method to run the
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/PowerIterationClusteringWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/PowerIterationClusteringWrapper.scala
new file mode 100644
index 0000000..b5dfad0
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/PowerIterationClusteringWrapper.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.r
+
+import org.apache.spark.ml.clustering.PowerIterationClustering
+
+private[r] object PowerIterationClusteringWrapper {
+  def getPowerIterationClustering(
+      k: Int,
+      initMode: String,
+      maxIter: Int,
+      srcCol: String,
+      dstCol: String,
+      weightCol: String): PowerIterationClustering = {
+    val pic = new PowerIterationClustering()
+      .setK(k)
+      .setInitMode(initMode)
+      .setMaxIter(maxIter)
+      .setSrcCol(srcCol)
+      .setDstCol(dstCol)
+    if (weightCol != null) pic.setWeightCol(weightCol)
+    pic
+  }
+}
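
On the R side, the null check above means `weightCol` can simply be omitted.
A small sketch of the two call patterns, assuming the same `df` as in the
earlier examples:

    # weightCol omitted (NULL): setWeightCol is skipped and every
    # edge weight is treated as 1.0
    unweighted <- spark.assignClusters(df, initMode = "degree")

    # weightCol given: the named column supplies per-edge weights
    weighted <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")
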
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index d0b507e..d8a6dfb 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -1193,8 +1193,8 @@ class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReada
     .. note:: Experimental
 
     Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
-    `Lin and Cohen <http://www.icml2010.org/papers/387.pdf>`_. From the abstract:
-    PIC finds a very low-dimensional embedding of a dataset using truncated power
+    `Lin and Cohen <http://www.cs.cmu.edu/~frank/papers/icml2010-pic-final.pdf>`_. From the
+    abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power
     iteration on a normalized pair-wise similarity matrix of the data.
 
     This class is not yet an Estimator/Transformer, use :py:func:`assignClusters` method

