You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2018/02/13 17:51:23 UTC

spark git commit: [SPARK-23217][ML] Add cosine distance measure to ClusteringEvaluator

Repository: spark
Updated Branches:
  refs/heads/master 05d051293 -> 4e0fb010c


[SPARK-23217][ML] Add cosine distance measure to ClusteringEvaluator

## What changes were proposed in this pull request?

The PR provided an implementation of ClusteringEvaluator using the cosine distance measure.
This allows to evaluate clustering results created using the cosine distance, introduced in SPARK-22119.

In the corresponding JIRA, there is a design document for the algorithm implemented here.

## How was this patch tested?

Added UT which compares the result to the one provided by python sklearn.

Author: Marco Gaido <ma...@gmail.com>

Closes #20396 from mgaido91/SPARK-23217.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e0fb010
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e0fb010
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e0fb010

Branch: refs/heads/master
Commit: 4e0fb010ccdf13fe411f2a4796bbadc385b01520
Parents: 05d0512
Author: Marco Gaido <ma...@gmail.com>
Authored: Tue Feb 13 11:51:19 2018 -0600
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Feb 13 11:51:19 2018 -0600

----------------------------------------------------------------------
 .../ml/evaluation/ClusteringEvaluator.scala     | 334 +++++++++++++++----
 .../evaluation/ClusteringEvaluatorSuite.scala   |  32 +-
 2 files changed, 300 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4e0fb010/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
index d6ec522..8d4ae56 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
@@ -20,11 +20,12 @@ package org.apache.spark.ml.evaluation
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
-import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
-import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable,
+  SchemaUtils}
+import org.apache.spark.sql.{Column, DataFrame, Dataset}
 import org.apache.spark.sql.functions.{avg, col, udf}
 import org.apache.spark.sql.types.DoubleType
 
@@ -32,15 +33,11 @@ import org.apache.spark.sql.types.DoubleType
  * :: Experimental ::
  *
  * Evaluator for clustering results.
- * The metric computes the Silhouette measure
- * using the squared Euclidean distance.
- *
- * The Silhouette is a measure for the validation
- * of the consistency within clusters. It ranges
- * between 1 and -1, where a value close to 1
- * means that the points in a cluster are close
- * to the other points in the same cluster and
- * far from the points of the other clusters.
+ * The metric computes the Silhouette measure using the specified distance measure.
+ *
+ * The Silhouette is a measure for the validation of the consistency within clusters. It ranges
+ * between 1 and -1, where a value close to 1 means that the points in a cluster are close to the
+ * other points in the same cluster and far from the points of the other clusters.
  */
 @Experimental
 @Since("2.3.0")
@@ -84,18 +81,40 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str
   @Since("2.3.0")
   def setMetricName(value: String): this.type = set(metricName, value)
 
-  setDefault(metricName -> "silhouette")
+  /**
+   * param for distance measure to be used in evaluation
+   * (supports `"squaredEuclidean"` (default), `"cosine"`)
+   * @group param
+   */
+  @Since("2.4.0")
+  val distanceMeasure: Param[String] = {
+    val availableValues = Array("squaredEuclidean", "cosine")
+    val allowedParams = ParamValidators.inArray(availableValues)
+    new Param(this, "distanceMeasure", "distance measure in evaluation. Supported options: " +
+      availableValues.mkString("'", "', '", "'"), allowedParams)
+  }
+
+  /** @group getParam */
+  @Since("2.4.0")
+  def getDistanceMeasure: String = $(distanceMeasure)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setDistanceMeasure(value: String): this.type = set(distanceMeasure, value)
+
+  setDefault(metricName -> "silhouette", distanceMeasure -> "squaredEuclidean")
 
   @Since("2.3.0")
   override def evaluate(dataset: Dataset[_]): Double = {
     SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
     SchemaUtils.checkNumericType(dataset.schema, $(predictionCol))
 
-    $(metricName) match {
-      case "silhouette" =>
+    ($(metricName), $(distanceMeasure)) match {
+      case ("silhouette", "squaredEuclidean") =>
         SquaredEuclideanSilhouette.computeSilhouetteScore(
-          dataset, $(predictionCol), $(featuresCol)
-      )
+          dataset, $(predictionCol), $(featuresCol))
+      case ("silhouette", "cosine") =>
+        CosineSilhouette.computeSilhouetteScore(dataset, $(predictionCol), $(featuresCol))
     }
   }
 }
@@ -111,6 +130,48 @@ object ClusteringEvaluator
 }
 
 
+private[evaluation] abstract class Silhouette {
+
+  /**
+   * It computes the Silhouette coefficient for a point.
+   */
+  def pointSilhouetteCoefficient(
+      clusterIds: Set[Double],
+      pointClusterId: Double,
+      pointClusterNumOfPoints: Long,
+      averageDistanceToCluster: (Double) => Double): Double = {
+    // Here we compute the average dissimilarity of the current point to any cluster of which the
+    // point is not a member.
+    // The cluster with the lowest average dissimilarity - i.e. the nearest cluster to the current
+    // point - is said to be the "neighboring cluster".
+    val otherClusterIds = clusterIds.filter(_ != pointClusterId)
+    val neighboringClusterDissimilarity = otherClusterIds.map(averageDistanceToCluster).min
+
+    // adjustment for excluding the node itself from the computation of the average dissimilarity
+    val currentClusterDissimilarity = if (pointClusterNumOfPoints == 1) {
+      0.0
+    } else {
+      averageDistanceToCluster(pointClusterId) * pointClusterNumOfPoints /
+        (pointClusterNumOfPoints - 1)
+    }
+
+    if (currentClusterDissimilarity < neighboringClusterDissimilarity) {
+      1 - (currentClusterDissimilarity / neighboringClusterDissimilarity)
+    } else if (currentClusterDissimilarity > neighboringClusterDissimilarity) {
+      (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1
+    } else {
+      0.0
+    }
+  }
+
+  /**
+   * Compute the mean Silhouette values of all samples.
+   */
+  def overallScore(df: DataFrame, scoreColumn: Column): Double = {
+    df.select(avg(scoreColumn)).collect()(0).getDouble(0)
+  }
+}
+
 /**
  * SquaredEuclideanSilhouette computes the average of the
  * Silhouette over all the data of the dataset, which is
@@ -259,7 +320,7 @@ object ClusteringEvaluator
  * `N` is the number of points in the dataset and `W` is the number
  * of worker nodes.
  */
-private[evaluation] object SquaredEuclideanSilhouette {
+private[evaluation] object SquaredEuclideanSilhouette extends Silhouette {
 
   private[this] var kryoRegistrationPerformed: Boolean = false
 
@@ -336,18 +397,19 @@ private[evaluation] object SquaredEuclideanSilhouette {
    * It computes the Silhouette coefficient for a point.
    *
    * @param broadcastedClustersMap A map of the precomputed values for each cluster.
-   * @param features The [[org.apache.spark.ml.linalg.Vector]] representing the current point.
+   * @param point The [[org.apache.spark.ml.linalg.Vector]] representing the current point.
    * @param clusterId The id of the cluster the current point belongs to.
    * @param squaredNorm The `$\Xi_{X}$` (which is the squared norm) precomputed for the point.
    * @return The Silhouette for the point.
    */
   def computeSilhouetteCoefficient(
      broadcastedClustersMap: Broadcast[Map[Double, ClusterStats]],
-     features: Vector,
+     point: Vector,
      clusterId: Double,
      squaredNorm: Double): Double = {
 
-    def compute(squaredNorm: Double, point: Vector, clusterStats: ClusterStats): Double = {
+    def compute(targetClusterId: Double): Double = {
+      val clusterStats = broadcastedClustersMap.value(targetClusterId)
       val pointDotClusterFeaturesSum = BLAS.dot(point, clusterStats.featureSum)
 
       squaredNorm +
@@ -355,41 +417,14 @@ private[evaluation] object SquaredEuclideanSilhouette {
         2 * pointDotClusterFeaturesSum / clusterStats.numOfPoints
     }
 
-    // Here we compute the average dissimilarity of the
-    // current point to any cluster of which the point
-    // is not a member.
-    // The cluster with the lowest average dissimilarity
-    // - i.e. the nearest cluster to the current point -
-    // is said to be the "neighboring cluster".
-    var neighboringClusterDissimilarity = Double.MaxValue
-    broadcastedClustersMap.value.keySet.foreach {
-      c =>
-        if (c != clusterId) {
-          val dissimilarity = compute(squaredNorm, features, broadcastedClustersMap.value(c))
-          if(dissimilarity < neighboringClusterDissimilarity) {
-            neighboringClusterDissimilarity = dissimilarity
-          }
-        }
-    }
-    val currentCluster = broadcastedClustersMap.value(clusterId)
-    // adjustment for excluding the node itself from
-    // the computation of the average dissimilarity
-    val currentClusterDissimilarity = if (currentCluster.numOfPoints == 1) {
-      0
-    } else {
-      compute(squaredNorm, features, currentCluster) * currentCluster.numOfPoints /
-        (currentCluster.numOfPoints - 1)
-    }
-
-    (currentClusterDissimilarity compare neighboringClusterDissimilarity).signum match {
-      case -1 => 1 - (currentClusterDissimilarity / neighboringClusterDissimilarity)
-      case 1 => (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1
-      case 0 => 0.0
-    }
+    pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet,
+      clusterId,
+      broadcastedClustersMap.value(clusterId).numOfPoints,
+      compute)
   }
 
   /**
-   * Compute the mean Silhouette values of all samples.
+   * Compute the Silhouette score of the dataset using squared Euclidean distance measure.
    *
    * @param dataset The input dataset (previously clustered) on which compute the Silhouette.
    * @param predictionCol The name of the column which contains the predicted cluster id
@@ -412,7 +447,7 @@ private[evaluation] object SquaredEuclideanSilhouette {
     val clustersStatsMap = SquaredEuclideanSilhouette
       .computeClusterStats(dfWithSquaredNorm, predictionCol, featuresCol)
 
-    // Silhouette is reasonable only when the number of clusters is grater then 1
+    // Silhouette is reasonable only when the number of clusters is greater then 1
     assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.")
 
     val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap)
@@ -421,13 +456,190 @@ private[evaluation] object SquaredEuclideanSilhouette {
       computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double)
     }
 
-    val silhouetteScore = dfWithSquaredNorm
-      .select(avg(
-        computeSilhouetteCoefficientUDF(
-          col(featuresCol), col(predictionCol).cast(DoubleType), col("squaredNorm"))
-      ))
-      .collect()(0)
-      .getDouble(0)
+    val silhouetteScore = overallScore(dfWithSquaredNorm,
+      computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType),
+        col("squaredNorm")))
+
+    bClustersStatsMap.destroy()
+
+    silhouetteScore
+  }
+}
+
+
+/**
+ * The algorithm which is implemented in this object, instead, is an efficient and parallel
+ * implementation of the Silhouette using the cosine distance measure. The cosine distance
+ * measure is defined as `1 - s` where `s` is the cosine similarity between two points.
+ *
+ * The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster `$\Gamma$`
+ * is:
+ *
+ * <blockquote>
+ *   $$
+ *   \sum\limits_{i=1}^N d(X, C_{i} ) =
+ *   \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big)
+ *   = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|}
+ *   \frac{c_{ij}}{\|C_{i}\|}
+ *   = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N
+ *   \frac{c_{ij}}{\|C_{i}\|} \Big)
+ *   $$
+ * </blockquote>
+ *
+ * where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th dimension
+ * of the `i`-th point in cluster `$\Gamma$`.
+ *
+ * Then, we can define the vector:
+ *
+ * <blockquote>
+ *   $$
+ *   \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
+ *   $$
+ * </blockquote>
+ *
+ * which can be precomputed for each point and the vector
+ *
+ * <blockquote>
+ *   $$
+ *   \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
+ *   $$
+ * </blockquote>
+ *
+ * which can be precomputed too for each cluster `$\Gamma$` by its points `$C_{i}$`.
+ *
+ * With these definitions, the numerator becomes:
+ *
+ * <blockquote>
+ *   $$
+ *   N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
+ *   $$
+ * </blockquote>
+ *
+ * Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is:
+ *
+ * <blockquote>
+ *   $$
+ *   1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
+ *   $$
+ * </blockquote>
+ *
+ * In the implementation, the precomputed values for the clusters are distributed among the worker
+ * nodes via broadcasted variables, because we can assume that the clusters are limited in number.
+ *
+ * The main strengths of this algorithm are the low computational complexity and the intrinsic
+ * parallelism. The precomputed information for each point and for each cluster can be computed
+ * with a computational complexity which is `O(N/W)`, where `N` is the number of points in the
+ * dataset and `W` is the number of worker nodes. After that, every point can be analyzed
+ * independently from the others.
+ *
+ * For every point we need to compute the average distance to all the clusters. Since the formula
+ * above requires `O(D)` operations, this phase has a computational complexity which is
+ * `O(C*D*N/W)` where `C` is the number of clusters (which we assume quite low), `D` is the number
+ * of dimensions, `N` is the number of points in the dataset and `W` is the number of worker
+ * nodes.
+ */
+private[evaluation] object CosineSilhouette extends Silhouette {
+
+  private[this] val normalizedFeaturesColName = "normalizedFeatures"
+
+  /**
+   * The method takes the input dataset and computes the aggregated values
+   * about a cluster which are needed by the algorithm.
+   *
+   * @param df The DataFrame which contains the input data
+   * @param predictionCol The name of the column which contains the predicted cluster id
+   *                      for the point.
+   * @return A [[scala.collection.immutable.Map]] which associates each cluster id to a
+   *         its statistics (ie. the precomputed values `N` and `$\Omega_{\Gamma}$`).
+   */
+  def computeClusterStats(df: DataFrame, predictionCol: String): Map[Double, (Vector, Long)] = {
+    val numFeatures = df.select(col(normalizedFeaturesColName)).first().getAs[Vector](0).size
+    val clustersStatsRDD = df.select(
+      col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName))
+      .rdd
+      .map { row => (row.getDouble(0), row.getAs[Vector](1)) }
+      .aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))(
+      seqOp = {
+        case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) =>
+          BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum)
+          (normalizedFeaturesSum, numOfPoints + 1)
+      },
+      combOp = {
+        case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) =>
+          BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1)
+          (normalizedFeaturesSum1, numOfPoints1 + numOfPoints2)
+      }
+    )
+
+    clustersStatsRDD
+      .collectAsMap()
+      .toMap
+  }
+
+  /**
+   * It computes the Silhouette coefficient for a point.
+   *
+   * @param broadcastedClustersMap A map of the precomputed values for each cluster.
+   * @param normalizedFeatures The [[org.apache.spark.ml.linalg.Vector]] representing the
+   *                           normalized features of the current point.
+   * @param clusterId The id of the cluster the current point belongs to.
+   */
+  def computeSilhouetteCoefficient(
+      broadcastedClustersMap: Broadcast[Map[Double, (Vector, Long)]],
+      normalizedFeatures: Vector,
+      clusterId: Double): Double = {
+
+    def compute(targetClusterId: Double): Double = {
+      val (normalizedFeatureSum, numOfPoints) = broadcastedClustersMap.value(targetClusterId)
+      1 - BLAS.dot(normalizedFeatures, normalizedFeatureSum) / numOfPoints
+    }
+
+    pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet,
+      clusterId,
+      broadcastedClustersMap.value(clusterId)._2,
+      compute)
+  }
+
+  /**
+   * Compute the Silhouette score of the dataset using the cosine distance measure.
+   *
+   * @param dataset The input dataset (previously clustered) on which compute the Silhouette.
+   * @param predictionCol The name of the column which contains the predicted cluster id
+   *                      for the point.
+   * @param featuresCol The name of the column which contains the feature vector of the point.
+   * @return The average of the Silhouette values of the clustered data.
+   */
+  def computeSilhouetteScore(
+      dataset: Dataset[_],
+      predictionCol: String,
+      featuresCol: String): Double = {
+    val normalizeFeatureUDF = udf {
+      features: Vector => {
+        val norm = Vectors.norm(features, 2.0)
+        features match {
+          case d: DenseVector => Vectors.dense(d.values.map(_ / norm))
+          case s: SparseVector => Vectors.sparse(s.size, s.indices, s.values.map(_ / norm))
+        }
+      }
+    }
+    val dfWithNormalizedFeatures = dataset.withColumn(normalizedFeaturesColName,
+      normalizeFeatureUDF(col(featuresCol)))
+
+    // compute aggregate values for clusters needed by the algorithm
+    val clustersStatsMap = computeClusterStats(dfWithNormalizedFeatures, predictionCol)
+
+    // Silhouette is reasonable only when the number of clusters is greater then 1
+    assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.")
+
+    val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap)
+
+    val computeSilhouetteCoefficientUDF = udf {
+      computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double)
+    }
+
+    val silhouetteScore = overallScore(dfWithNormalizedFeatures,
+      computeSilhouetteCoefficientUDF(col(normalizedFeaturesColName),
+        col(predictionCol).cast(DoubleType)))
 
     bClustersStatsMap.destroy()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4e0fb010/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
index 677ce49..3bf3477 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
@@ -66,16 +66,38 @@ class ClusteringEvaluatorSuite
     assert(evaluator.evaluate(irisDataset) ~== 0.6564679231 relTol 1e-5)
   }
 
-  test("number of clusters must be greater than one") {
-    val singleClusterDataset = irisDataset.where($"label" === 0.0)
+  /*
+    Use the following python code to load the data and evaluate it using scikit-learn package.
+
+    from sklearn import datasets
+    from sklearn.metrics import silhouette_score
+    iris = datasets.load_iris()
+    round(silhouette_score(iris.data, iris.target, metric='cosine'), 10)
+
+    0.7222369298
+  */
+  test("cosine Silhouette") {
     val evaluator = new ClusteringEvaluator()
       .setFeaturesCol("features")
       .setPredictionCol("label")
+      .setDistanceMeasure("cosine")
+
+    assert(evaluator.evaluate(irisDataset) ~== 0.7222369298 relTol 1e-5)
+  }
+
+  test("number of clusters must be greater than one") {
+    val singleClusterDataset = irisDataset.where($"label" === 0.0)
+    Seq("squaredEuclidean", "cosine").foreach { distanceMeasure =>
+      val evaluator = new ClusteringEvaluator()
+        .setFeaturesCol("features")
+        .setPredictionCol("label")
+        .setDistanceMeasure(distanceMeasure)
 
-    val e = intercept[AssertionError]{
-      evaluator.evaluate(singleClusterDataset)
+      val e = intercept[AssertionError] {
+        evaluator.evaluate(singleClusterDataset)
+      }
+      assert(e.getMessage.contains("Number of clusters must be greater than one"))
     }
-    assert(e.getMessage.contains("Number of clusters must be greater than one"))
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org