You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/12/02 04:40:56 UTC

spark git commit: [SPARK-4611][MLlib] Implement the efficient vector norm

Repository: spark
Updated Branches:
  refs/heads/master b0a46d899 -> 64f3175bf


[SPARK-4611][MLlib] Implement the efficient vector norm

The vector norm in Breeze is implemented with `activeIterator`, which is known to be very slow.
In this PR, an efficient vector norm is implemented; with this API, `Normalizer` and
`k-means` get a big performance improvement.

Here are the benchmarks on the mnist8m dataset.

a) `Normalizer`
Before
DenseVector: 68.25secs
SparseVector: 17.01secs

With this PR
DenseVector: 12.71secs
SparseVector: 2.73secs

b) `k-means`
Before
DenseVector: 83.46secs
SparseVector: 61.60secs

With this PR
DenseVector: 70.04secs
SparseVector: 59.05secs

Author: DB Tsai <db...@alpinenow.com>

Closes #3462 from dbtsai/norm and squashes the following commits:

63c7165 [DB Tsai] typo
0c3637f [DB Tsai] add import org.apache.spark.SparkContext._ back
6fa616c [DB Tsai] address feedback
9b7cb56 [DB Tsai] move norm to static method
0b632e6 [DB Tsai] kmeans
dbed124 [DB Tsai] style
c1a877c [DB Tsai] first commit


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64f3175b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64f3175b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64f3175b

Branch: refs/heads/master
Commit: 64f3175bf976f5a28e691cedc7a4b333709e0c58
Parents: b0a46d8
Author: DB Tsai <db...@alpinenow.com>
Authored: Tue Dec 2 11:40:43 2014 +0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Tue Dec 2 11:40:43 2014 +0800

----------------------------------------------------------------------
 .../apache/spark/mllib/clustering/KMeans.scala  |  6 +--
 .../apache/spark/mllib/feature/Normalizer.scala |  4 +-
 .../org/apache/spark/mllib/linalg/Vectors.scala | 51 ++++++++++++++++++++
 .../spark/mllib/linalg/VectorsSuite.scala       | 24 +++++++++
 4 files changed, 79 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64f3175b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 34ea0de..0f8dee5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.clustering
 
 import scala.collection.mutable.ArrayBuffer
 
-import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}
+import breeze.linalg.{DenseVector => BDV, Vector => BV}
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.Logging
@@ -125,7 +125,7 @@ class KMeans private (
     }
 
     // Compute squared norms and cache them.
-    val norms = data.map(v => breezeNorm(v.toBreeze, 2.0))
+    val norms = data.map(Vectors.norm(_, 2.0))
     norms.persist()
     val breezeData = data.map(_.toBreeze).zip(norms).map { case (v, norm) =>
       new BreezeVectorWithNorm(v, norm)
@@ -425,7 +425,7 @@ object KMeans {
 private[clustering]
 class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable {
 
-  def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0))
+  def this(vector: BV[Double]) = this(vector, Vectors.norm(Vectors.fromBreeze(vector), 2.0))
 
   def this(array: Array[Double]) = this(new BDV[Double](array))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64f3175b/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala
index a9c2e23..1ced26a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.mllib.feature
 
-import breeze.linalg.{norm => brzNorm}
-
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
 
@@ -47,7 +45,7 @@ class Normalizer(p: Double) extends VectorTransformer {
    * @return normalized vector. If the norm of the input is zero, it will return the input vector.
    */
   override def transform(vector: Vector): Vector = {
-    val norm = brzNorm(vector.toBreeze, p)
+    val norm = Vectors.norm(vector, p)
 
     if (norm != 0.0) {
       // For dense vector, we've to allocate new memory for new output vector.

http://git-wip-us.apache.org/repos/asf/spark/blob/64f3175b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index c6d5fe5..47d1a76 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -261,6 +261,57 @@ object Vectors {
         sys.error("Unsupported Breeze vector type: " + v.getClass.getName)
     }
   }
+
+  /**
+   * Returns the p-norm of the given vector.
+   * @param vector input vector (only DenseVector and SparseVector are supported).
+   * @param p order of the norm; must be >= 1.0 (Double.PositiveInfinity gives the max norm).
+   * @return norm in L^p^ space.
+   */
+  private[spark] def norm(vector: Vector, p: Double): Double = {
+    require(p >= 1.0, s"The p-norm is only defined for p >= 1.0, but got p = $p.")
+    val values = vector match {
+      case dv: DenseVector => dv.values
+      case sv: SparseVector => sv.values // implicit zeros contribute nothing for p >= 1
+      case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+    }
+    val size = values.size
+
+    if (p == 1) {
+      var sum = 0.0
+      var i = 0
+      while (i < size) {
+        sum += math.abs(values(i))
+        i += 1
+      }
+      sum
+    } else if (p == 2) {
+      var sum = 0.0
+      var i = 0
+      while (i < size) {
+        sum += values(i) * values(i)
+        i += 1
+      }
+      math.sqrt(sum)
+    } else if (p == Double.PositiveInfinity) {
+      var max = 0.0 // lower bound; also covers sparse implicit zeros and empty input
+      var i = 0
+      while (i < size) {
+        val value = math.abs(values(i))
+        if (value > max) max = value
+        i += 1
+      }
+      max
+    } else {
+      var sum = 0.0
+      var i = 0
+      while (i < size) {
+        sum += math.pow(math.abs(values(i)), p)
+        i += 1
+      }
+      math.pow(sum, 1.0 / p)
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/64f3175b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 9492f60..f99f014 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -21,6 +21,7 @@ import breeze.linalg.{DenseMatrix => BDM}
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkException
+import org.apache.spark.mllib.util.TestingUtils._
 
 class VectorsSuite extends FunSuite {
 
@@ -197,4 +198,27 @@ class VectorsSuite extends FunSuite {
     assert(svMap.get(2) === Some(3.1))
     assert(svMap.get(3) === Some(0.0))
   }
+
+  test("vector p-norm") { // checks Vectors.norm against fold-based reference values
+    val dv = Vectors.dense(0.0, -1.2, 3.1, 0.0, -4.5, 1.9) // mixed signs and zeros
+    val sv = Vectors.sparse(6, Seq((1, -1.2), (2, 3.1), (3, 0.0), (4, -4.5), (5, 1.9)))
+    // sv is dv in sparse form (index 0 implicit, index 3 an explicit 0.0). L1: sum of |v|.
+    assert(Vectors.norm(dv, 1.0) ~== dv.toArray.foldLeft(0.0)((a, v) =>
+      a + math.abs(v)) relTol 1E-8)
+    assert(Vectors.norm(sv, 1.0) ~== sv.toArray.foldLeft(0.0)((a, v) =>
+      a + math.abs(v)) relTol 1E-8)
+    // L2: square root of the sum of squares.
+    assert(Vectors.norm(dv, 2.0) ~== math.sqrt(dv.toArray.foldLeft(0.0)((a, v) =>
+      a + v * v)) relTol 1E-8)
+    assert(Vectors.norm(sv, 2.0) ~== math.sqrt(sv.toArray.foldLeft(0.0)((a, v) =>
+      a + v * v)) relTol 1E-8)
+    // L-infinity: largest absolute value.
+    assert(Vectors.norm(dv, Double.PositiveInfinity) ~== dv.toArray.map(math.abs).max relTol 1E-8)
+    assert(Vectors.norm(sv, Double.PositiveInfinity) ~== sv.toArray.map(math.abs).max relTol 1E-8)
+    // General p (here 3.7): (sum of |v|^p)^(1/p).
+    assert(Vectors.norm(dv, 3.7) ~== math.pow(dv.toArray.foldLeft(0.0)((a, v) =>
+      a + math.pow(math.abs(v), 3.7)), 1.0 / 3.7) relTol 1E-8)
+    assert(Vectors.norm(sv, 3.7) ~== math.pow(sv.toArray.foldLeft(0.0)((a, v) =>
+      a + math.pow(math.abs(v), 3.7)), 1.0 / 3.7) relTol 1E-8)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org