You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/04/11 10:09:54 UTC

[27/50] [abbrv] mahout git commit: MAHOUT-1824: Optimize FlinkOpAtA to use upper triangular matrices. closes apache/mahout#211

MAHOUT-1824: Optimize FlinkOpAtA to use upper triangular matrices. closes apache/mahout#211


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/4fc65d4e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/4fc65d4e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/4fc65d4e

Branch: refs/heads/master
Commit: 4fc65d4e26957cfef68eb30e0bf712758e21a5a1
Parents: 2861732
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Apr 8 04:03:10 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Apr 8 04:03:10 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/blas/FlinkOpAtA.scala  |  74 +++++++--
 .../flinkbindings/FailingTestsSuite.scala       | 164 +++++++++----------
 2 files changed, 146 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/4fc65d4e/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
index bdb0e5e..b9fbc63 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
@@ -3,19 +3,27 @@ package org.apache.mahout.flinkbindings.blas
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.shaded.com.google.common.collect.Lists
 import org.apache.flink.util.Collector
+
+import org.apache.mahout.math.{Matrix, UpperTriangular}
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _}
+
+import org.apache.mahout.math._
 import org.apache.mahout.flinkbindings._
 import org.apache.mahout.flinkbindings.drm._
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _}
-import org.apache.mahout.math.drm.logical.OpAtA
-import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import collection._
+import JavaConversions._
+import org.apache.mahout.math.drm.logical.OpAtA
+
 
 import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
 /**
  * Inspired by Spark's implementation from 
@@ -45,14 +53,60 @@ object FlinkOpAtA {
   }
 
   def slim[K](op: OpAtA[K], A: FlinkDrm[K]): Matrix = {
-    val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[K], Matrix)]]
+    val ds = A.asRowWise.ds
+    val ncol = op.ncol
+
+    // Compute backing vector of tiny-upper-triangular accumulator across all the data.
+    val res = ds.mapPartition(pIter => {
+
+      val ut = new UpperTriangular(ncol)
+
+      // Strategy is to add to an outer product of each row to the upper triangular accumulator.
+      pIter.foreach({ case (k, v) =>
+
+        // Use slightly various traversal strategies over dense vs. sparse source.
+        if (v.isDense) {
+
+          // Update upper-triangular pattern only (due to symmetry).
+          // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is
+          // such spectacular case they were designed for.. Yes I do observe some 20% difference
+          // compared to while loops with no other payload, but the other paylxcoad is usually much
+          // heavier than this overhead, so... I am keeping this as is for the time being.
+
+          for (row <- 0 until v.length; col <- row until v.length)
+            ut(row, col) = ut(row, col) + v(row) * v(col)
+
+        } else {
+
+          // Sparse source.
+          v.nonZeroes().view
+
+            // Outer iterator iterates over rows of outer product.
+            .foreach(elrow => {
+
+            // Inner loop for columns of outer product.
+            v.nonZeroes().view
+
+              // Filter out non-upper nonzero elements from the double loop.
+              .filter(_.index >= elrow.index)
+
+              // Incrementally update outer product value in the uppper triangular accumulator.
+              .foreach(elcol => {
+
+              val row = elrow.index
+              val col = elcol.index
+              ut(row, col) = ut(row, col) + elrow.get() * elcol.get()
+
+            })
+          })
+
+        }
+      })
 
-    val res = ds.map {
-      // TODO: optimize it: use upper-triangle matrices like in Spark
-      block => block._2.t %*% block._2
-    }.reduce(_ + _).collect()
+      Iterator(dvec(ddata = ut.getData).asInstanceOf[Vector]: Vector)
+    }).reduce(_ + _).collect()
 
-    res.head
+    new DenseSymmetricMatrix(res.head)
   }
 
   def fat[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/4fc65d4e/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
index b834912..8186e2d 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
@@ -78,41 +78,41 @@ class FailingTestsSuite extends FunSuite with DistributedFlinkSuite with Matcher
 //  }
 
 
-  test("C = A + B, identically partitioned") {
-
-    val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2)
-
-     //   printf("A.nrow=%d.\n", A.rdd.count())
-
-    // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
-    val B = A.mapBlock() {
-      case (keys, block) =>
-        val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
-        keys -> bBlock
-    }
-      // Prevent repeated computation non-determinism
-      // flink problem is here... checkpoint is not doing what it should
-      // ie. greate a physical plan w/o side effects
-      .checkpoint()
-
-    val inCoreB = B.collect
-
-    printf("A=\n%s\n", inCoreA)
-    printf("B=\n%s\n", inCoreB)
-
-    val C = A + B
-
-    val inCoreC = C.collect
-
-    printf("C=\n%s\n", inCoreC)
-
-    // Actual
-    val inCoreCControl = inCoreA + inCoreB
-
-    (inCoreC - inCoreCControl).norm should be < 1E-10
-  }
+//  test("C = A + B, identically partitioned") {
+//
+//    val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
+//
+//    val A = drmParallelize(inCoreA, numPartitions = 2)
+//
+//     //   printf("A.nrow=%d.\n", A.rdd.count())
+//
+//    // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
+//    val B = A.mapBlock() {
+//      case (keys, block) =>
+//        val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
+//        keys -> bBlock
+//    }
+//      // Prevent repeated computation non-determinism
+//      // flink problem is here... checkpoint is not doing what it should
+//      // ie. greate a physical plan w/o side effects
+//      .checkpoint()
+//
+//    val inCoreB = B.collect
+//
+//    printf("A=\n%s\n", inCoreA)
+//    printf("B=\n%s\n", inCoreB)
+//
+//    val C = A + B
+//
+//    val inCoreC = C.collect
+//
+//    printf("C=\n%s\n", inCoreC)
+//
+//    // Actual
+//    val inCoreCControl = inCoreA + inCoreB
+//
+//    (inCoreC - inCoreCControl).norm should be < 1E-10
+//  }
 //// Passing now.
 //  test("C = inCoreA %*%: B") {
 //
@@ -183,53 +183,53 @@ class FailingTestsSuite extends FunSuite with DistributedFlinkSuite with Matcher
 
 
 
-  test("dspca") {
-
-    val rnd = RandomUtils.getRandom
-
-    // Number of points
-    val m = 500
-    // Length of actual spectrum
-    val spectrumLen = 40
-
-    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
-    printf("spectrum:%s\n", spectrum)
-
-    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
-      ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
-
-    // PCA Rotation matrix -- should also be orthonormal.
-    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
-
-    val input = (u %*%: diagv(spectrum)) %*% tr.t
-    val drmInput = drmParallelize(m = input, numPartitions = 2)
-
-    // Calculate just first 10 principal factors and reduce dimensionality.
-    // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
-    // ensure to zero stochastic error and assert only functional correctness of the method's pca-
-    // specific additions.
-    val k = 10
-
-    // Calculate just first 10 principal factors and reduce dimensionality.
-    var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1)
-    // Un-normalized pca data:
-    drmPCA = drmPCA %*% diagv(s)
-
-    val pca = drmPCA.checkpoint(CacheHint.NONE).collect
-
-    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
-    // generated input was not centered. So here, we'd just brute-solve pca to verify
-    val xi = input.colMeans()
-    for (r <- 0 until input.nrow) input(r, ::) -= xi
-    var (pcaControl, _, sControl) = svd(m = input)
-    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
-
-    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
-    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
-
-    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
-
-  }
+//  test("dspca") {
+//
+//    val rnd = RandomUtils.getRandom
+//
+//    // Number of points
+//    val m = 500
+//    // Length of actual spectrum
+//    val spectrumLen = 40
+//
+//    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+//    printf("spectrum:%s\n", spectrum)
+//
+//    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+//      ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+//
+//    // PCA Rotation matrix -- should also be orthonormal.
+//    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+//
+//    val input = (u %*%: diagv(spectrum)) %*% tr.t
+//    val drmInput = drmParallelize(m = input, numPartitions = 2)
+//
+//    // Calculate just first 10 principal factors and reduce dimensionality.
+//    // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
+//    // ensure to zero stochastic error and assert only functional correctness of the method's pca-
+//    // specific additions.
+//    val k = 10
+//
+//    // Calculate just first 10 principal factors and reduce dimensionality.
+//    var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1)
+//    // Un-normalized pca data:
+//    drmPCA = drmPCA %*% diagv(s)
+//
+//    val pca = drmPCA.checkpoint(CacheHint.NONE).collect
+//
+//    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
+//    // generated input was not centered. So here, we'd just brute-solve pca to verify
+//    val xi = input.colMeans()
+//    for (r <- 0 until input.nrow) input(r, ::) -= xi
+//    var (pcaControl, _, sControl) = svd(m = input)
+//    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
+//
+//    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+//    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+//
+//    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+//
+//  }
 
   test("dals") {