You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2015/06/29 07:38:09 UTC

spark git commit: [SPARK-5962] [MLLIB] Python support for Power Iteration Clustering

Repository: spark
Updated Branches:
  refs/heads/master 25f574eb9 -> dfde31da5


[SPARK-5962] [MLLIB] Python support for Power Iteration Clustering

Python support for Power Iteration Clustering
https://issues.apache.org/jira/browse/SPARK-5962

Author: Yanbo Liang <yb...@gmail.com>

Closes #6992 from yanboliang/pyspark-pic and squashes the following commits:

6b03d82 [Yanbo Liang] address comments
4be4423 [Yanbo Liang] Python support for Power Iteration Clustering


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dfde31da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dfde31da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dfde31da

Branch: refs/heads/master
Commit: dfde31da5ce30e0d44cad4fb6618b44d5353d946
Parents: 25f574e
Author: Yanbo Liang <yb...@gmail.com>
Authored: Sun Jun 28 22:38:04 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Sun Jun 28 22:38:04 2015 -0700

----------------------------------------------------------------------
 .../PowerIterationClusteringModelWrapper.scala  | 32 +++++++
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 27 ++++++
 python/pyspark/mllib/clustering.py              | 98 +++++++++++++++++++-
 3 files changed, 154 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dfde31da/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala
new file mode 100644
index 0000000..bc6041b
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.clustering.PowerIterationClusteringModel
+
+/**
+ * Subclass of PowerIterationClusteringModel that adds a helper method for Python callers.
+ * It is constructed from an existing model and reuses that model's `k` and `assignments`.
+ */
+private[python] class PowerIterationClusteringModelWrapper(model: PowerIterationClusteringModel)
+  extends PowerIterationClusteringModel(model.k, model.assignments) {
+
+  /** Maps each assignment of the wrapped model to a two-element array holding (id, cluster). */
+  def getAssignments: RDD[Array[Any]] = {
+    model.assignments.map { assignment =>
+      Array[Any](assignment.id, assignment.cluster)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dfde31da/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index b16903a..a66a404 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -407,6 +407,33 @@ private[python] class PythonMLLibAPI extends Serializable {
   }
 
   /**
+   * Java stub for Python mllib PowerIterationClustering.run(). The caller receives a
+   * handle to the resulting Java object rather than the object's content, so the Python
+   * code has to make sure the handle gets freed on exit; see the Py4J documentation.
+   * @param data an RDD of (i, j, s,,ij,,) tuples representing the affinity matrix, with
+   *             each tuple encoded as a three-element vector.
+   * @param k number of clusters.
+   * @param maxIterations maximum number of iterations of the power iteration loop.
+   * @param initMode the initialization mode. This can be either "random" to use
+   *                 a random vector as vertex properties, or "degree" to use
+   *                 normalized sum similarities.
+   * @return the trained model, wrapped in a PowerIterationClusteringModelWrapper.
+   */
+  def trainPowerIterationClusteringModel(
+      data: JavaRDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      initMode: String): PowerIterationClusteringModel = {
+
+    val clustering = new PowerIterationClustering()
+      .setK(k)
+      .setMaxIterations(maxIterations)
+      .setInitializationMode(initMode)
+
+    // Unpack each vector into (source id, destination id, similarity); ids are cut to Long.
+    val similarities = data.rdd.map { entry =>
+      (entry(0).toLong, entry(1).toLong, entry(2))
+    }
+
+    new PowerIterationClusteringModelWrapper(clustering.run(similarities))
+  }
+
+  /**
    * Java stub for Python mllib ALS.train().  This stub returns a handle
    * to the Java object instead of the content of the Java object.  Extra care
    * needs to be taken in the Python code to ensure it gets freed on exit; see

http://git-wip-us.apache.org/repos/asf/spark/blob/dfde31da/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 8bc0654..e3c8a24 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -25,15 +25,18 @@ from math import exp, log
 
 from numpy import array, random, tile
 
+from collections import namedtuple
+
 from pyspark import SparkContext
 from pyspark.rdd import RDD, ignore_unicode_prefix
-from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
+from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
 from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
 from pyspark.mllib.stat.distribution import MultivariateGaussian
-from pyspark.mllib.util import Saveable, Loader, inherit_doc
+from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
 from pyspark.streaming import DStream
 
 __all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
+           'PowerIterationClusteringModel', 'PowerIterationClustering',
            'StreamingKMeans', 'StreamingKMeansModel']
 
 
@@ -272,6 +275,94 @@ class GaussianMixture(object):
         return GaussianMixtureModel(weight, mvg_obj)
 
 
+class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
+
+    """
+    .. note:: Experimental
+
+    Model produced by [[PowerIterationClustering]].
+
+    >>> data = [(0, 1, 1.0), (0, 2, 1.0), (1, 3, 1.0), (2, 3, 1.0),
+    ...     (0, 3, 1.0), (1, 2, 1.0), (0, 4, 0.1)]
+    >>> rdd = sc.parallelize(data, 2)
+    >>> model = PowerIterationClustering.train(rdd, 2, 100)
+    >>> model.k
+    2
+    >>> sorted(model.assignments().collect())
+    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
+    >>> import os, tempfile
+    >>> path = tempfile.mkdtemp()
+    >>> model.save(sc, path)
+    >>> sameModel = PowerIterationClusteringModel.load(sc, path)
+    >>> sameModel.k
+    2
+    >>> sorted(sameModel.assignments().collect())
+    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
+    >>> from shutil import rmtree
+    >>> try:
+    ...     rmtree(path)
+    ... except OSError:
+    ...     pass
+    """
+
+    @property
+    def k(self):
+        """
+        Returns the number of clusters.
+        """
+        return self.call("k")
+
+    def assignments(self):
+        """
+        Returns the cluster assignments of this model.
+        """
+        return self.call("getAssignments").map(
+            lambda x: (PowerIterationClustering.Assignment(*x)))
+
+    @classmethod
+    def load(cls, sc, path):
+        model = cls._load_java(sc, path)
+        wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model)
+        return PowerIterationClusteringModel(wrapper)
+
+
+class PowerIterationClustering(object):
+    """
+    .. note:: Experimental
+
+    Power Iteration Clustering (PIC), a scalable graph clustering algorithm
+    developed by `Lin and Cohen <http://www.icml2010.org/papers/387.pdf>`_.
+    From the abstract: PIC finds a very low-dimensional embedding of a
+    dataset using truncated power iteration on a normalized pair-wise
+    similarity matrix of the data.
+    """
+
+    @classmethod
+    def train(cls, rdd, k, maxIterations=100, initMode="random"):
+        """
+        Train a PIC model on the given affinity matrix.
+
+        :param rdd: an RDD of (i, j, s_ij) tuples representing the
+            affinity matrix, which is the matrix A in the PIC paper.
+            The similarity s_ij must be nonnegative.
+            This is a symmetric matrix and hence s_ij = s_ji.
+            For any (i, j) with nonzero similarity, there should be
+            either (i, j, s_ij) or (j, i, s_ji) in the input.
+            Tuples with i = j are ignored, because we assume
+            s_ij = 0.0.
+        :param k: Number of clusters.
+        :param maxIterations: Maximum number of iterations of the
+            PIC algorithm.
+        :param initMode: Initialization mode. This can be either
+            "random" to use a random vector as vertex properties, or
+            "degree" to use normalized sum similarities.
+        :return: A :class:`PowerIterationClusteringModel` wrapping the
+            model returned by the JVM.
+        """
+        model = callMLlibFunc("trainPowerIterationClusteringModel",
+                              rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
+        return PowerIterationClusteringModel(model)
+
+    class Assignment(namedtuple("Assignment", ["id", "cluster"])):
+        """
+        Represents an (id, cluster) tuple.
+        """
+
+
 class StreamingKMeansModel(KMeansModel):
     """
     .. note:: Experimental
@@ -466,7 +557,8 @@ class StreamingKMeans(object):
 
 def _test():
     import doctest
-    globs = globals().copy()
+    import pyspark.mllib.clustering
+    globs = pyspark.mllib.clustering.__dict__.copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org