Posted to commits@spark.apache.org by me...@apache.org on 2014/09/29 20:15:16 UTC

git commit: [MLlib] [SPARK-2885] DIMSUM: All-pairs similarity

Repository: spark
Updated Branches:
  refs/heads/master aedd251c5 -> 587a0cd7e


[MLlib] [SPARK-2885] DIMSUM: All-pairs similarity

# All-pairs similarity via DIMSUM
Compute all pairs of similar columns using both a brute-force approach and the DIMSUM sampling approach.

Laying down some notation: we are looking for all pairs of similar columns in an m x n RowMatrix whose entries are denoted a_ij, with the i’th row denoted r_i and the j’th column denoted c_j. There is an oversampling parameter labeled ɣ that should be set to 4 log(n)/s to get provably correct results (with high probability), where s is the similarity threshold.
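
For a rough sense of scale (our numbers, using the natural logarithm as the committed code does): with n = 10^6 columns and a similarity threshold of s = 0.5, the oversampling parameter would be ɣ = 4 log(10^6)/0.5 ≈ 4 · 13.8 / 0.5 ≈ 110.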

The algorithm is stated as a Map and a Reduce, with proofs of correctness and efficiency in the published papers [1] [2]. The reducer is simply the summation reducer. The mapper is more interesting, and is also the heart of the scheme. As an exercise, try to see why, in expectation, the map-reduce below outputs cosine similarities; a minimal single-machine sketch follows the figure.

![dimsumv2](https://cloud.githubusercontent.com/assets/3220351/3807272/d1d9514e-1c62-11e4-9f12-3cfdb1d78b3a.png)
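
To make the expectation argument concrete, here is a minimal single-machine sketch of the sampling scheme (our illustration only; the committed distributed implementation is in the RowMatrix diff below). Each entry of column c_j is scaled by min(sqrt(ɣ), ||c_j||) and emitted with probability min(1, sqrt(ɣ)/||c_j||), so the summation reducer recovers cosine similarities in expectation:

```scala
import scala.util.Random

// Toy single-machine DIMSUM sketch (our illustration, not the committed code).
// Assumes no all-zero columns; the committed code guards that case separately.
def dimsumSketch(rows: Array[Array[Double]], gamma: Double,
    rand: Random): Map[(Int, Int), Double] = {
  val n = rows.head.length
  val sg = math.sqrt(gamma)
  // Column magnitudes ||c_j||.
  val mags = Array.tabulate(n)(j => math.sqrt(rows.map(r => r(j) * r(j)).sum))
  val p = mags.map(m => sg / m)          // per-column emission probability
  val q = mags.map(m => math.min(sg, m)) // per-column scaling denominator
  val acc = scala.collection.mutable.Map.empty[(Int, Int), Double].withDefaultValue(0.0)
  for (row <- rows) {
    val scaled = Array.tabulate(n)(j => row(j) / q(j))
    for (i <- 0 until n; j <- i + 1 until n) {
      // Each side is kept with probability min(1, sqrt(gamma) / ||c||); the
      // scaling by q makes the summed products unbiased estimates of cosine.
      if (scaled(i) != 0 && rand.nextDouble() < p(i) &&
          scaled(j) != 0 && rand.nextDouble() < p(j)) {
        acc((i, j)) += scaled(i) * scaled(j)
      }
    }
  }
  acc.toMap
}
```

With ɣ large enough that sqrt(ɣ) exceeds every column magnitude, every emission probability saturates at 1 and the output is exactly the upper-triangular cosine similarity matrix, which is the regime the brute-force columnSimilarities() call corresponds to.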

[1] Bosagh-Zadeh, Reza and Carlsson, Gunnar (2013), Dimension Independent Matrix Square using MapReduce, arXiv:1304.1467 http://arxiv.org/abs/1304.1467

[2] Bosagh-Zadeh, Reza and Goel, Ashish (2012), Dimension Independent Similarity Computation, arXiv:1206.2082 http://arxiv.org/abs/1206.2082

# Testing

Tests for all invocations are included.

Added L1 and L2 norm computation to MultivariateStatisticalSummary, since the DIMSUM implementation needs the column magnitudes. Added tests for both.
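
For reference, the new summary fields can be read like this (a usage sketch with our own sample data, assuming an active SparkContext sc):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Usage sketch: the data is ours; normL1/normL2 are the fields added here.
val mat = new RowMatrix(sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(3.0, 4.0))))
val summary = mat.computeColumnSummaryStatistics()
println(summary.normL1)  // column-wise L1 norms: [4.0, 6.0]
println(summary.normL2)  // column-wise L2 norms: [sqrt(10.0), sqrt(20.0)]
```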

Author: Reza Zadeh <ri...@gmail.com>
Author: Xiangrui Meng <me...@databricks.com>

Closes #1778 from rezazadeh/dimsumv2 and squashes the following commits:

404c64c [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2
4eb71c6 [Reza Zadeh] Add excludes for normL1 and normL2
ee8bd65 [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2
976ddd4 [Reza Zadeh] Broadcast colMags. Avoid div by zero.
3467cff [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2
aea0247 [Reza Zadeh] Allow large thresholds to promote sparsity
9fe17c0 [Xiangrui Meng] organize imports
2196ba5 [Xiangrui Meng] Merge branch 'rezazadeh-dimsumv2' into dimsumv2
254ca08 [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2
f2947e4 [Xiangrui Meng] some optimization
3c4cf41 [Xiangrui Meng] Merge branch 'master' into rezazadeh-dimsumv2
0e4eda4 [Reza Zadeh] Use partition index for RNG
251bb9c [Reza Zadeh] Documentation
25e9d0d [Reza Zadeh] Line length for style
fb296f6 [Reza Zadeh] renamed to normL1 and normL2
3764983 [Reza Zadeh] Documentation
e9c6791 [Reza Zadeh] New interface and documentation
613f261 [Reza Zadeh] Column magnitude summary
75a0b51 [Reza Zadeh] Use Ints instead of Longs in the shuffle
0f12ade [Reza Zadeh] Style changes
eb1dc20 [Reza Zadeh] Use Double.PositiveInfinity instead of Double.Max
f56a882 [Reza Zadeh] Remove changes to MultivariateOnlineSummarizer
dbc55ba [Reza Zadeh] Make colMagnitudes a method in RowMatrix
41e8ece [Reza Zadeh] style changes
139c8e1 [Reza Zadeh] Syntax changes
029aa9c [Reza Zadeh] javadoc and new test
75edb25 [Reza Zadeh] All tests passing!
05e59b8 [Reza Zadeh] Add test
502ce52 [Reza Zadeh] new interface
654c4fb [Reza Zadeh] default methods
3726ca9 [Reza Zadeh] Remove MatrixAlgebra
6bebabb [Reza Zadeh] remove changes to MatrixSuite
5b8cd7d [Reza Zadeh] Initial files


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/587a0cd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/587a0cd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/587a0cd7

Branch: refs/heads/master
Commit: 587a0cd7ed964ebfca2c97924c4f1e363f1fd3cb
Parents: aedd251
Author: Reza Zadeh <ri...@gmail.com>
Authored: Mon Sep 29 11:15:09 2014 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Mon Sep 29 11:15:09 2014 -0700

----------------------------------------------------------------------
 .../mllib/linalg/distributed/RowMatrix.scala    | 171 ++++++++++++++++++-
 .../stat/MultivariateOnlineSummarizer.scala     |  38 ++++-
 .../stat/MultivariateStatisticalSummary.scala   |  10 ++
 .../linalg/distributed/RowMatrixSuite.scala     |  37 ++++
 project/MimaExcludes.scala                      |   9 +-
 5 files changed, 259 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/587a0cd7/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 4174f45..8380058 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -19,17 +19,21 @@ package org.apache.spark.mllib.linalg.distributed
 
 import java.util.Arrays
 
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
-import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
+import scala.collection.mutable.ListBuffer
+
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV, axpy => brzAxpy,
+  svd => brzSvd}
 import breeze.numerics.{sqrt => brzSqrt}
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 
+import org.apache.spark.Logging
+import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
 import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.random.XORShiftRandom
 import org.apache.spark.storage.StorageLevel
 
 /**
@@ -411,6 +415,165 @@ class RowMatrix(
     new RowMatrix(AB, nRows, B.numCols)
   }
 
+  /**
+   * Compute all cosine similarities between columns of this matrix using the brute-force
+   * approach of computing normalized dot products.
+   *
+   * @return An n x n sparse upper-triangular matrix of cosine similarities between
+   *         columns of this matrix.
+   */
+  def columnSimilarities(): CoordinateMatrix = {
+    columnSimilarities(0.0)
+  }
+
+  /**
+   * Compute similarities between columns of this matrix using a sampling approach.
+   *
+   * The threshold parameter is a trade-off knob between estimate quality and computational cost.
+   *
+   * Setting a threshold of 0 guarantees deterministic correct results, but comes at exactly
+   * the same cost as the brute-force approach. Setting the threshold to positive values
+   * incurs strictly less computational cost than the brute-force approach; however, the
+   * similarities computed will be estimates.
+   *
+   * The sampling guarantees relative-error correctness for those pairs of columns that have
+   * similarity greater than the given similarity threshold.
+   *
+   * To describe the guarantee, we set some notation:
+   * Let A be the smallest in magnitude non-zero element of this matrix.
+   * Let B be the largest in magnitude non-zero element of this matrix.
+   * Let L be the maximum number of non-zeros per row.
+   *
+   * For example, for {0,1} matrices: A=B=1.
+   * Another example: for the Netflix matrix, A=1, B=5.
+   *
+   * For those column pairs that are above the threshold,
+   * the computed similarity is correct to within 20% relative error with probability
+   * at least 1 - (0.981)^(10/B)^.
+   *
+   * The shuffle size is bounded by the *smaller* of the following two expressions:
+   *
+   * O(n log(n) L / (threshold * A))
+   * O(m L^2^)
+   *
+   * The latter is the cost of the brute-force approach, so for non-zero thresholds,
+   * the cost is always cheaper than that of the brute-force approach.
+   *
+   * @param threshold Set to 0 for deterministic guaranteed correctness.
+   *                  Similarities above this threshold are estimated
+   *                  with the cost vs estimate quality trade-off described above.
+   * @return An n x n sparse upper-triangular matrix of cosine similarities
+   *         between columns of this matrix.
+   */
+  def columnSimilarities(threshold: Double): CoordinateMatrix = {
+    require(threshold >= 0, s"Threshold cannot be negative: $threshold")
+
+    if (threshold > 1) {
+      logWarning(s"Threshold is greater than 1: $threshold " +
+      "Computation will be more efficient with promoted sparsity, " +
+      " however there is no correctness guarantee.")
+    }
+
+    val gamma = if (threshold < 1e-6) {
+      Double.PositiveInfinity
+    } else {
+      10 * math.log(numCols()) / threshold
+    }
+
+    columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
+  }
+
+  /**
+   * Find all similar columns using the DIMSUM sampling algorithm, described in two papers
+   *
+   * http://arxiv.org/abs/1206.2082
+   * http://arxiv.org/abs/1304.1467
+   *
+   * @param colMags A vector of column magnitudes
+   * @param gamma The oversampling parameter. For provable results, set to 10 * log(n) / s,
+   *              where s is the smallest similarity score to be estimated,
+   *              and n is the number of columns
+   * @return An n x n sparse upper-triangular matrix of cosine similarities
+   *         between columns of this matrix.
+   */
+  private[mllib] def columnSimilaritiesDIMSUM(
+      colMags: Array[Double],
+      gamma: Double): CoordinateMatrix = {
+    require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma")
+    require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension")
+    val sg = math.sqrt(gamma) // sqrt(gamma) used many times
+
+    // Don't divide by zero for those columns with zero magnitude
+    val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x)
+
+    val sc = rows.context
+    val pBV = sc.broadcast(colMagsCorrected.map(c => sg / c))
+    val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c)))
+
+    val sims = rows.mapPartitionsWithIndex { (indx, iter) =>
+      val p = pBV.value
+      val q = qBV.value
+
+      val rand = new XORShiftRandom(indx)
+      val scaled = new Array[Double](p.size)
+      iter.flatMap { row =>
+        val buf = new ListBuffer[((Int, Int), Double)]()
+        row match {
+          case sv: SparseVector =>
+            val nnz = sv.indices.size
+            var k = 0
+            while (k < nnz) {
+              scaled(k) = sv.values(k) / q(sv.indices(k))
+              k += 1
+            }
+            k = 0
+            while (k < nnz) {
+              val i = sv.indices(k)
+              val iVal = scaled(k)
+              if (iVal != 0 && rand.nextDouble() < p(i)) {
+                var l = k + 1
+                while (l < nnz) {
+                  val j = sv.indices(l)
+                  val jVal = scaled(l)
+                  if (jVal != 0 && rand.nextDouble() < p(j)) {
+                    buf += (((i, j), iVal * jVal))
+                  }
+                  l += 1
+                }
+              }
+              k += 1
+            }
+          case dv: DenseVector =>
+            val n = dv.values.size
+            var i = 0
+            while (i < n) {
+              scaled(i) = dv.values(i) / q(i)
+              i += 1
+            }
+            i = 0
+            while (i < n) {
+              val iVal = scaled(i)
+              if (iVal != 0 && rand.nextDouble() < p(i)) {
+                var j = i + 1
+                while (j < n) {
+                  val jVal = scaled(j)
+                  if (jVal != 0 && rand.nextDouble() < p(j)) {
+                    buf += (((i, j), iVal * jVal))
+                  }
+                  j += 1
+                }
+              }
+              i += 1
+            }
+        }
+        buf
+      }
+    }.reduceByKey(_ + _).map { case ((i, j), sim) =>
+      MatrixEntry(i.toLong, j.toLong, sim)
+    }
+    new CoordinateMatrix(sims, numCols(), numCols())
+  }
+
   private[mllib] override def toBreeze(): BDM[Double] = {
     val m = numRows().toInt
     val n = numCols().toInt

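To make the two new RowMatrix entry points above concrete, a small usage sketch (our example data; assumes an active SparkContext sc):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Our example data; assumes an active SparkContext `sc`.
val mat = new RowMatrix(sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 2.0),
  Vectors.dense(0.0, 3.0, 4.0),
  Vectors.dense(5.0, 6.0, 0.0))))

// Brute force: exact cosine similarities, returned as a sparse
// upper-triangular CoordinateMatrix.
val exact = mat.columnSimilarities()

// DIMSUM sampling: estimates for pairs with similarity above the threshold.
val approx = mat.columnSimilarities(threshold = 0.1)

approx.entries.collect().foreach { e =>
  println(s"cos(c_${e.i}, c_${e.j}) ~= ${e.value}")
}
```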
http://git-wip-us.apache.org/repos/asf/spark/blob/587a0cd7/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
index 7d845c4..3025d48 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
@@ -42,6 +42,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
   private var n = 0
   private var currMean: BDV[Double] = _
   private var currM2n: BDV[Double] = _
+  private var currM2: BDV[Double] = _
+  private var currL1: BDV[Double] = _
   private var totalCnt: Long = 0
   private var nnz: BDV[Double] = _
   private var currMax: BDV[Double] = _
@@ -60,6 +62,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
 
       currMean = BDV.zeros[Double](n)
       currM2n = BDV.zeros[Double](n)
+      currM2 = BDV.zeros[Double](n)
+      currL1 = BDV.zeros[Double](n)
       nnz = BDV.zeros[Double](n)
       currMax = BDV.fill(n)(Double.MinValue)
       currMin = BDV.fill(n)(Double.MaxValue)
@@ -81,6 +85,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
         val tmpPrevMean = currMean(i)
         currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0)
         currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean)
+        currM2(i) += value * value
+        currL1(i) += math.abs(value)
 
         nnz(i) += 1.0
     }
@@ -97,7 +103,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
    * @return This MultivariateOnlineSummarizer object.
    */
   def merge(other: MultivariateOnlineSummarizer): this.type = {
-   if (this.totalCnt != 0 && other.totalCnt != 0) {
+    if (this.totalCnt != 0 && other.totalCnt != 0) {
       require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " +
         s"Expecting $n but got ${other.n}.")
       totalCnt += other.totalCnt
@@ -114,6 +120,15 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
           currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) /
             (nnz(i) + other.nnz(i))
         }
+        // merge m2 together
+        if (nnz(i) + other.nnz(i) != 0.0) {
+          currM2(i) += other.currM2(i)
+        }
+        // merge l1 together
+        if (nnz(i) + other.nnz(i) != 0.0) {
+          currL1(i) += other.currL1(i)
+        }
+
         if (currMax(i) < other.currMax(i)) {
           currMax(i) = other.currMax(i)
         }
@@ -127,6 +142,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
       this.n = other.n
       this.currMean = other.currMean.copy
       this.currM2n = other.currM2n.copy
+      this.currM2 = other.currM2.copy
+      this.currL1 = other.currL1.copy
       this.totalCnt = other.totalCnt
       this.nnz = other.nnz.copy
       this.currMax = other.currMax.copy
@@ -198,4 +215,23 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
     }
     Vectors.fromBreeze(currMin)
   }
+
+  override def normL2: Vector = {
+    require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+
+    val realMagnitude = BDV.zeros[Double](n)
+
+    var i = 0
+    while (i < currM2.size) {
+      realMagnitude(i) = math.sqrt(currM2(i))
+      i += 1
+    }
+
+    Vectors.fromBreeze(realMagnitude)
+  }
+
+  override def normL1: Vector = {
+    require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+    Vectors.fromBreeze(currL1)
+  }
 }

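A note on why the merge logic for the new fields is so simple (our gloss on the hunk above): both norms are plain sums over rows, normL2(j) = sqrt(sum_i a_ij^2) accumulated as currM2(j) = sum_i a_ij^2, and normL1(j) = sum_i |a_ij| accumulated directly in currL1, so merging two partial summarizers is element-wise addition. Contrast currM2n, the centered second moment, which needs the deltaMean correction term visible earlier in the same hunk.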
http://git-wip-us.apache.org/repos/asf/spark/blob/587a0cd7/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala
index f9eb343..6a364c9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala
@@ -53,4 +53,14 @@ trait MultivariateStatisticalSummary {
    * Minimum value of each column.
    */
   def min: Vector
+
+  /**
+   * Euclidean magnitude of each column
+   */
+  def normL2: Vector
+
+  /**
+   * L1 norm of each column
+   */
+  def normL1: Vector
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/587a0cd7/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index 1d3a322..63f3ed5 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -95,6 +95,40 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("similar columns") {
+    val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94))
+    val expected = BDM(
+      (0.0, 54.0, 72.0),
+      (0.0, 0.0, 78.0),
+      (0.0, 0.0, 0.0))
+
+    for (i <- 0 until n; j <- 0 until n) {
+      expected(i, j) /= (colMags(i) * colMags(j))
+    }
+
+    for (mat <- Seq(denseMat, sparseMat)) {
+      val G = mat.columnSimilarities(0.11).toBreeze()
+      for (i <- 0 until n; j <- 0 until n) {
+        if (expected(i, j) > 0) {
+          val actual = expected(i, j)
+          val estimate = G(i, j)
+          assert(math.abs(actual - estimate) / actual < 0.2,
+            s"Similarities not close enough: $actual vs $estimate")
+        }
+      }
+    }
+
+    for (mat <- Seq(denseMat, sparseMat)) {
+      val G = mat.columnSimilarities()
+      assert(closeToZero(G.toBreeze() - expected))
+    }
+
+    for (mat <- Seq(denseMat, sparseMat)) {
+      val G = mat.columnSimilaritiesDIMSUM(colMags.toArray, 150.0)
+      assert(closeToZero(G.toBreeze() - expected))
+    }
+  }
+
   test("svd of a full-rank matrix") {
     for (mat <- Seq(denseMat, sparseMat)) {
       for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
@@ -190,6 +224,9 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
         assert(summary.numNonzeros === Vectors.dense(3.0, 3.0, 4.0), "nnz mismatch")
         assert(summary.max === Vectors.dense(9.0, 7.0, 8.0), "max mismatch")
         assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "column mismatch.")
+        assert(summary.normL2 === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)),
+          "magnitude mismatch.")
+        assert(summary.normL1 === Vectors.dense(18.0, 12.0, 16.0), "L1 norm mismatch")
       }
     }
   }

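A quick check of the expected values in the test above (our arithmetic): the Gram entry for the first two columns is 54, and their magnitudes are sqrt(126) ≈ 11.225 and sqrt(66) ≈ 8.124, so the expected cosine similarity is 54 / (11.225 * 8.124) ≈ 0.592. The sampled estimate at threshold 0.11 must land within 20% relative error of this, while the brute-force call and the gamma = 150 DIMSUM call must reproduce it exactly, since at gamma = 150 every emission probability saturates at 1.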
http://git-wip-us.apache.org/repos/asf/spark/blob/587a0cd7/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3280e66..1adfaa1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -39,7 +39,14 @@ object MimaExcludes {
             MimaBuild.excludeSparkPackage("graphx")
           ) ++
           MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++
-          MimaBuild.excludeSparkClass("mllib.linalg.Vector")
+          MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++
+          Seq(
+            // Added normL1 and normL2 to trait MultivariateStatisticalSummary
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2")
+          )
 
         case v if v.startsWith("1.1") =>
           Seq(

