Posted to commits@spark.apache.org by jk...@apache.org on 2016/01/19 19:16:00 UTC

spark git commit: [SPARK-11944][PYSPARK][MLLIB] python mllib.clustering.bisecting k means

Repository: spark
Updated Branches:
  refs/heads/master ebd9ce0f1 -> 0ddba6d88


[SPARK-11944][PYSPARK][MLLIB] python mllib.clustering.bisecting k means

From the coverage issues for 1.6: Add Python API for mllib.clustering.BisectingKMeans.

Author: Holden Karau <ho...@us.ibm.com>

Closes #10150 from holdenk/SPARK-11937-python-api-coverage-SPARK-11944-python-mllib.clustering.BisectingKMeans.
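
For context, a minimal sketch of how the new API is used once this patch is in place (it assumes a live SparkContext `sc`; the data mirrors the doctest added below):

    from numpy import array
    from pyspark.mllib.clustering import BisectingKMeans

    # Four 2-D points forming two well-separated pairs.
    data = sc.parallelize([array([0.0, 0.0]), array([1.0, 1.0]),
                           array([9.0, 8.0]), array([8.0, 9.0])], 2)

    # Ask for up to 4 leaf clusters; the actual number can be smaller
    # if no leaf cluster remains divisible.
    model = BisectingKMeans().train(data, k=4)

    model.k                           # number of leaf clusters found
    model.clusterCenters              # cluster centers as NumPy arrays
    model.predict(array([0.0, 0.0]))  # cluster index for a single point
    model.computeCost(data)           # sum of squared distances to centers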


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ddba6d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ddba6d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ddba6d8

Branch: refs/heads/master
Commit: 0ddba6d88ff093a96b4931f71bd0a599afbbca78
Parents: ebd9ce0
Author: Holden Karau <ho...@us.ibm.com>
Authored: Tue Jan 19 10:15:54 2016 -0800
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Tue Jan 19 10:15:54 2016 -0800

----------------------------------------------------------------------
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  17 +++
 python/pyspark/mllib/clustering.py              | 136 ++++++++++++++++++-
 python/pyspark/mllib/tests.py                   |  11 ++
 3 files changed, 159 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ddba6d8/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 061db56..05f9a76 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -120,6 +120,23 @@ private[python] class PythonMLLibAPI extends Serializable {
   }
 
   /**
+   * Java stub for Python mllib BisectingKMeans.run()
+   */
+  def trainBisectingKMeans(
+      data: JavaRDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      minDivisibleClusterSize: Double,
+      seed: Long): BisectingKMeansModel = {
+    new BisectingKMeans()
+      .setK(k)
+      .setMaxIterations(maxIterations)
+      .setMinDivisibleClusterSize(minDivisibleClusterSize)
+      .setSeed(seed)
+      .run(data)
+  }
+
+  /**
    * Java stub for Python mllib LinearRegressionWithSGD.train()
    */
   def trainLinearRegressionModelWithSGD(
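
The wrapper added below in clustering.py reaches this stub through the existing callMLlibFunc helper. A sketch of the call shape, with each argument annotated against the Scala signature above (values shown are the Python-side defaults; the real call lives in BisectingKMeans.train()):

    java_model = callMLlibFunc(
        "trainBisectingKMeans",       # name of the Scala stub above
        rdd.map(_convert_to_vector),  # arrives as JavaRDD[Vector] on the JVM
        4,                            # k
        20,                           # maxIterations
        1.0,                          # minDivisibleClusterSize
        -1888008604)                  # seed
    model = BisectingKMeansModel(java_model)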

http://git-wip-us.apache.org/repos/asf/spark/blob/0ddba6d8/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 580cb51..4e9eb96 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -38,13 +38,130 @@ from pyspark.mllib.stat.distribution import MultivariateGaussian
 from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
 from pyspark.streaming import DStream
 
-__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
-           'PowerIterationClusteringModel', 'PowerIterationClustering',
-           'StreamingKMeans', 'StreamingKMeansModel',
+__all__ = ['BisectingKMeansModel', 'BisectingKMeans', 'KMeansModel', 'KMeans',
+           'GaussianMixtureModel', 'GaussianMixture', 'PowerIterationClusteringModel',
+           'PowerIterationClustering', 'StreamingKMeans', 'StreamingKMeansModel',
            'LDA', 'LDAModel']
 
 
 @inherit_doc
+class BisectingKMeansModel(JavaModelWrapper):
+    """
+    .. note:: Experimental
+
+    A clustering model derived from the bisecting k-means method.
+
+    >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2)
+    >>> bskm = BisectingKMeans()
+    >>> model = bskm.train(sc.parallelize(data, 2), k=4)
+    >>> p = array([0.0, 0.0])
+    >>> model.predict(p)
+    0
+    >>> model.k
+    4
+    >>> model.computeCost(p)
+    0.0
+
+    .. versionadded:: 2.0.0
+    """
+
+    def __init__(self, java_model):
+        super(BisectingKMeansModel, self).__init__(java_model)
+        self.centers = [c.toArray() for c in self.call("clusterCenters")]
+
+    @property
+    @since('2.0.0')
+    def clusterCenters(self):
+        """Get the cluster centers, represented as a list of NumPy
+        arrays."""
+        return self.centers
+
+    @property
+    @since('2.0.0')
+    def k(self):
+        """Get the number of clusters"""
+        return self.call("k")
+
+    @since('2.0.0')
+    def predict(self, x):
+        """
+        Find the cluster that each of the points belongs to in this
+        model.
+
+        :param x: the point (or RDD of points) for which to
+          determine the cluster(s).
+        """
+        if isinstance(x, RDD):
+            vecs = x.map(_convert_to_vector)
+            return self.call("predict", vecs)
+
+        x = _convert_to_vector(x)
+        return self.call("predict", x)
+
+    @since('2.0.0')
+    def computeCost(self, x):
+        """
+        Return the Bisecting K-means cost (sum of squared distances of
+        points to their nearest center) for this model on the given
+        data. If provided with an RDD of points, returns the sum.
+
+        :param x: the point or RDD of points to compute the cost(s) for.
+        """
+        if isinstance(x, RDD):
+            vecs = x.map(_convert_to_vector)
+            return self.call("computeCost", vecs)
+
+        return self.call("computeCost", _convert_to_vector(x))
+
+
+class BisectingKMeans(object):
+    """
+    .. note:: Experimental
+
+    A bisecting k-means algorithm based on the paper "A comparison of
+    document clustering techniques" by Steinbach, Karypis, and Kumar,
+    with modification to fit Spark.
+    The algorithm starts from a single cluster that contains all points.
+    Iteratively it finds divisible clusters on the bottom level and
+    bisects each of them using k-means, until there are `k` leaf
+    clusters in total or no leaf clusters are divisible.
+    The bisecting steps of clusters on the same level are grouped
+    together to increase parallelism. If bisecting all divisible
+    clusters on the bottom level would result in more than `k` leaf
+    clusters, larger clusters get higher priority.
+
+    Based on
+    U{http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf}
+    Steinbach, Karypis, and Kumar, A comparison of document clustering
+    techniques, KDD Workshop on Text Mining, 2000.
+
+    .. versionadded:: 2.0.0
+    """
+
+    @since('2.0.0')
+    def train(self, rdd, k=4, maxIterations=20, minDivisibleClusterSize=1.0, seed=-1888008604):
+        """
+        Runs the bisecting k-means algorithm and returns the model.
+
+        :param rdd: input RDD to be trained on
+        :param k: The desired number of leaf clusters (default: 4).
+            The actual number could be smaller if there are no divisible
+            leaf clusters.
+        :param maxIterations: the max number of k-means iterations to
+            split clusters (default: 20)
+        :param minDivisibleClusterSize: the minimum number of points
+            (if >= 1.0) or the minimum proportion of points (if < 1.0)
+            of a divisible cluster (default: 1.0)
+        :param seed: a random seed (default: -1888008604 from
+            classOf[BisectingKMeans].getName.##)
+        """
+        java_model = callMLlibFunc(
+            "trainBisectingKMeans", rdd.map(_convert_to_vector),
+            k, maxIterations, minDivisibleClusterSize, seed)
+        return BisectingKMeansModel(java_model)
+
+
+@inherit_doc
 class KMeansModel(Saveable, Loader):
 
     """A clustering model derived from the k-means method.
@@ -118,7 +235,13 @@ class KMeansModel(Saveable, Loader):
 
     @since('0.9.0')
     def predict(self, x):
-        """Find the cluster to which x belongs in this model."""
+        """
+        Find the cluster that each of the points belongs to in this
+        model.
+
+        :param x: the point (or RDD of points) for which to
+            determine the cluster(s).
+        """
         best = 0
         best_distance = float("inf")
         if isinstance(x, RDD):
@@ -136,7 +259,10 @@ class KMeansModel(Saveable, Loader):
     def computeCost(self, rdd):
         """
         Return the K-means cost (sum of squared distances of points to
-        their nearest center) for this model on the given data.
+        their nearest center) for this model on the given
+        data.
+
+        :param rdd: the RDD of points to compute the cost on.
         """
         cost = callMLlibFunc("computeCostKmeansModel", rdd.map(_convert_to_vector),
                              [_convert_to_vector(c) for c in self.centers])
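
Note that predict() and computeCost() on the new BisectingKMeansModel accept either a single vector or an RDD, as the docstrings above describe (KMeansModel.computeCost, by contrast, still takes an RDD only). A short sketch, assuming a trained BisectingKMeansModel `model` and a SparkContext `sc`:

    p = array([0.0, 0.0])
    rdd = sc.parallelize([p])

    model.predict(p)            # int: the cluster index of one point
    model.predict(rdd).first()  # same index, via the RDD code path
    model.computeCost(p)        # squared distance from p to its nearest center
    model.computeCost(rdd)      # sum of squared distances over the RDD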

http://git-wip-us.apache.org/repos/asf/spark/blob/0ddba6d8/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 3436a28..32ed48e 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -419,6 +419,17 @@ class ListTests(MLlibTestCase):
     as NumPy arrays.
     """
 
+    def test_bisecting_kmeans(self):
+        from pyspark.mllib.clustering import BisectingKMeans
+        data = array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2)
+        bskm = BisectingKMeans()
+        model = bskm.train(self.sc.parallelize(data, 2), k=4)
+        p = array([0.0, 0.0])
+        rdd_p = self.sc.parallelize([p])
+        self.assertEqual(model.predict(p), model.predict(rdd_p).first())
+        self.assertEqual(model.computeCost(p), model.computeCost(rdd_p))
+        self.assertEqual(model.k, len(model.clusterCenters))
+
     def test_kmeans(self):
         from pyspark.mllib.clustering import KMeans
         data = [

