Posted to commits@mahout.apache.org by sm...@apache.org on 2016/04/11 10:09:54 UTC
[27/50] [abbrv] mahout git commit: MAHOUT-1824: Optimize FlinkOpAtA to use upper triangular matrices. closes apache/mahout#211
MAHOUT-1824: Optimize FlinkOpAtA to use upper triangular matrices. closes apache/mahout#211
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/4fc65d4e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/4fc65d4e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/4fc65d4e
Branch: refs/heads/master
Commit: 4fc65d4e26957cfef68eb30e0bf712758e21a5a1
Parents: 2861732
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Apr 8 04:03:10 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Apr 8 04:03:10 2016 -0400
----------------------------------------------------------------------
.../mahout/flinkbindings/blas/FlinkOpAtA.scala | 74 +++++++--
.../flinkbindings/FailingTestsSuite.scala | 164 +++++++++----------
2 files changed, 146 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
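For readers skimming the diff: the optimization exploits the symmetry of t(A) %*% A by accumulating each row's outer product into a small upper-triangular accumulator, shipping only its packed backing vector through the reduce, and expanding it to a full symmetric matrix at the very end. A minimal standalone sketch of the idea in plain Scala (illustrative only; the object name and packed-index helper below are not Mahout's API):

object AtAUpperTriangularSketch {

  // Compute t(A) %*% A for a dense m x n matrix (rows of A as arrays),
  // updating only the upper triangle and mirroring it at the end.
  def ata(a: Array[Array[Double]], n: Int): Array[Array[Double]] = {

    // Packed row-major upper triangle: entry (r, c) with r <= c lives at
    // r * n - r * (r - 1) / 2 + (c - r); only n * (n + 1) / 2 doubles total.
    val ut = new Array[Double](n * (n + 1) / 2)
    def idx(r: Int, c: Int) = r * n - r * (r - 1) / 2 + (c - r)

    // Each row contributes its outer product; by symmetry only the
    // upper-triangular entries need updating.
    for (row <- a; r <- 0 until n; c <- r until n)
      ut(idx(r, c)) += row(r) * row(c)

    // Expand the packed triangle into a full symmetric matrix.
    Array.tabulate(n, n)((r, c) => if (r <= c) ut(idx(r, c)) else ut(idx(c, r)))
  }
}

Summing such packed arrays element-wise across partitions (the role reduce(_ + _) plays over Mahout vectors in the diff) keeps the shuffled payload at n * (n + 1) / 2 doubles per partition instead of a full n x n block.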
http://git-wip-us.apache.org/repos/asf/mahout/blob/4fc65d4e/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
index bdb0e5e..b9fbc63 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
@@ -3,19 +3,27 @@ package org.apache.mahout.flinkbindings.blas
import java.lang.Iterable
import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.shaded.com.google.common.collect.Lists
import org.apache.flink.util.Collector
+
+import org.apache.mahout.math.{Matrix, UpperTriangular}
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _}
+
+import org.apache.mahout.math._
import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.drm._
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _}
-import org.apache.mahout.math.drm.logical.OpAtA
-import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import collection._
+import JavaConversions._
+import org.apache.mahout.math.drm.logical.OpAtA
+
import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
/**
* Inspired by Spark's implementation from
@@ -45,14 +53,60 @@ object FlinkOpAtA {
}
def slim[K](op: OpAtA[K], A: FlinkDrm[K]): Matrix = {
- val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[K], Matrix)]]
+ val ds = A.asRowWise.ds
+ val ncol = op.ncol
+
+ // Compute the backing vector of a tiny upper-triangular accumulator across all the data.
+ val res = ds.mapPartition(pIter => {
+
+ val ut = new UpperTriangular(ncol)
+
+ // Strategy is to add the outer product of each row to the upper-triangular accumulator.
+ pIter.foreach({ case (k, v) =>
+
+ // Use slightly different traversal strategies for dense vs. sparse sources.
+ if (v.isDense) {
+
+ // Update upper-triangular pattern only (due to symmetry).
+ // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is
+ // such a spectacular case they were designed for. Yes, I do observe some 20% difference
+ // compared to while loops with no other payload, but the other payload is usually much
+ // heavier than this overhead, so I am keeping this as is for the time being.
+
+ for (row <- 0 until v.length; col <- row until v.length)
+ ut(row, col) = ut(row, col) + v(row) * v(col)
+
+ } else {
+
+ // Sparse source.
+ v.nonZeroes().view
+
+ // The outer loop iterates over the rows of the outer product.
+ .foreach(elrow => {
+
+ // The inner loop covers the columns of the outer product.
+ v.nonZeroes().view
+
+ // Filter out non-upper nonzero elements from the double loop.
+ .filter(_.index >= elrow.index)
+
+ // Incrementally update the outer product value in the upper-triangular accumulator.
+ .foreach(elcol => {
+
+ val row = elrow.index
+ val col = elcol.index
+ ut(row, col) = ut(row, col) + elrow.get() * elcol.get()
+
+ })
+ })
+
+ }
+ })
- val res = ds.map {
- // TODO: optimize it: use upper-triangle matrices like in Spark
- block => block._2.t %*% block._2
- }.reduce(_ + _).collect()
+ Iterator(dvec(ddata = ut.getData).asInstanceOf[Vector]: Vector)
+ }).reduce(_ + _).collect()
- res.head
+ new DenseSymmetricMatrix(res.head)
}
def fat[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = {
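The sparse branch in the hunk above runs the same double loop over non-zero entries only, filtering out the lower-triangular index pairs. A minimal sketch of that filtering pattern over hypothetical (index, value) pairs (again illustrative, not Mahout's Vector/nonZeroes() API):

// Update a packed upper triangle (as in the sketch earlier) from one sparse
// row given as (index, value) pairs; mirrors the nonZeroes()/filter pattern.
def updateFromSparseRow(ut: Array[Double], n: Int,
                        nonZeroes: Seq[(Int, Double)]): Unit = {
  def idx(r: Int, c: Int) = r * n - r * (r - 1) / 2 + (c - r)
  for {
    (r, vr) <- nonZeroes
    (c, vc) <- nonZeroes
    if c >= r // keep only the upper half; symmetry covers the rest
  } ut(idx(r, c)) += vr * vc
}

For a row with k non-zeroes this touches roughly k * (k + 1) / 2 cells rather than n * (n + 1) / 2, which is the point of the separate sparse path.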
http://git-wip-us.apache.org/repos/asf/mahout/blob/4fc65d4e/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
index b834912..8186e2d 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
@@ -78,41 +78,41 @@ class FailingTestsSuite extends FunSuite with DistributedFlinkSuite with Matcher
// }
- test("C = A + B, identically partitioned") {
-
- val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
-
- val A = drmParallelize(inCoreA, numPartitions = 2)
-
- // printf("A.nrow=%d.\n", A.rdd.count())
-
- // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
- val B = A.mapBlock() {
- case (keys, block) =>
- val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
- keys -> bBlock
- }
- // Prevent non-determinism due to repeated computation
- // the Flink problem is here... checkpoint is not doing what it should,
- // i.e. create a physical plan w/o side effects
- .checkpoint()
-
- val inCoreB = B.collect
-
- printf("A=\n%s\n", inCoreA)
- printf("B=\n%s\n", inCoreB)
-
- val C = A + B
-
- val inCoreC = C.collect
-
- printf("C=\n%s\n", inCoreC)
-
- // Actual
- val inCoreCControl = inCoreA + inCoreB
-
- (inCoreC - inCoreCControl).norm should be < 1E-10
- }
+// test("C = A + B, identically partitioned") {
+//
+// val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
+//
+// val A = drmParallelize(inCoreA, numPartitions = 2)
+//
+// // printf("A.nrow=%d.\n", A.rdd.count())
+//
+// // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
+// val B = A.mapBlock() {
+// case (keys, block) =>
+// val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
+// keys -> bBlock
+// }
+// // Prevent non-determinism due to repeated computation
+// // the Flink problem is here... checkpoint is not doing what it should,
+// // i.e. create a physical plan w/o side effects
+// .checkpoint()
+//
+// val inCoreB = B.collect
+//
+// printf("A=\n%s\n", inCoreA)
+// printf("B=\n%s\n", inCoreB)
+//
+// val C = A + B
+//
+// val inCoreC = C.collect
+//
+// printf("C=\n%s\n", inCoreC)
+//
+// // Actual
+// val inCoreCControl = inCoreA + inCoreB
+//
+// (inCoreC - inCoreCControl).norm should be < 1E-10
+// }
//// Passing now.
// test("C = inCoreA %*%: B") {
//
@@ -183,53 +183,53 @@ class FailingTestsSuite extends FunSuite with DistributedFlinkSuite with Matcher
- test("dspca") {
-
- val rnd = RandomUtils.getRandom
-
- // Number of points
- val m = 500
- // Length of actual spectrum
- val spectrumLen = 40
-
- val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
- printf("spectrum:%s\n", spectrum)
-
- val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
- ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
-
- // PCA Rotation matrix -- should also be orthonormal.
- val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
-
- val input = (u %*%: diagv(spectrum)) %*% tr.t
- val drmInput = drmParallelize(m = input, numPartitions = 2)
-
- // Calculate just first 10 principal factors and reduce dimensionality.
- // Since we assert just the validity of the s-pca, not the stochastic error, we bump the p
- // parameter to drive the stochastic error to zero and assert only the functional correctness
- // of the method's pca-specific additions.
- val k = 10
-
- // Calculate just first 10 principal factors and reduce dimensionality.
- var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1)
- // Un-normalized pca data:
- drmPCA = drmPCA %*% diagv(s)
-
- val pca = drmPCA.checkpoint(CacheHint.NONE).collect
-
- // Of course, once we have calculated the pca, the spectrum is going to be different since our
- // originally generated input was not centered. So here we just brute-solve the pca to verify.
- val xi = input.colMeans()
- for (r <- 0 until input.nrow) input(r, ::) -= xi
- var (pcaControl, _, sControl) = svd(m = input)
- pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
-
- printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
- printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
-
- (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
-
- }
+// test("dspca") {
+//
+// val rnd = RandomUtils.getRandom
+//
+// // Number of points
+// val m = 500
+// // Length of actual spectrum
+// val spectrumLen = 40
+//
+// val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+// printf("spectrum:%s\n", spectrum)
+//
+// val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+// ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+//
+// // PCA Rotation matrix -- should also be orthonormal.
+// val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+//
+// val input = (u %*%: diagv(spectrum)) %*% tr.t
+// val drmInput = drmParallelize(m = input, numPartitions = 2)
+//
+// // Calculate just first 10 principal factors and reduce dimensionality.
+// // Since we assert just the validity of the s-pca, not the stochastic error, we bump the p
+// // parameter to drive the stochastic error to zero and assert only the functional correctness
+// // of the method's pca-specific additions.
+// val k = 10
+//
+// // Calculate just first 10 principal factors and reduce dimensionality.
+// var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1)
+// // Un-normalized pca data:
+// drmPCA = drmPCA %*% diagv(s)
+//
+// val pca = drmPCA.checkpoint(CacheHint.NONE).collect
+//
+// // Of course, once we have calculated the pca, the spectrum is going to be different since our
+// // originally generated input was not centered. So here we just brute-solve the pca to verify.
+// val xi = input.colMeans()
+// for (r <- 0 until input.nrow) input(r, ::) -= xi
+// var (pcaControl, _, sControl) = svd(m = input)
+// pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
+//
+// printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+// printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+//
+// (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+//
+// }
test("dals") {