You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2014/06/13 02:45:28 UTC
[6/6] git commit: MAHOUT-1464 Cooccurrence Analysis on Spark (pat)
closes apache/mahout#12
MAHOUT-1464 Cooccurrence Analysis on Spark (pat) closes apache/mahout#12
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/c1ca3087
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/c1ca3087
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/c1ca3087
Branch: refs/heads/master
Commit: c1ca30872c622e513e49fc1bb111bc4b8a527d3b
Parents: b77caec
Author: pferrel <pa...@occamsmachete.com>
Authored: Thu Jun 12 17:45:09 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Thu Jun 12 17:45:09 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../mahout/math/drm/CheckpointedOps.scala | 3 +
.../mahout/math/drm/DistributedEngine.scala | 3 +
.../mahout/math/scalabindings/MatrixOps.scala | 6 +-
.../math/scalabindings/MatrixOpsSuite.scala | 3 +-
.../java/org/apache/mahout/math/MurmurHash.java | 13 +-
.../apache/mahout/cf/CooccurrenceAnalysis.scala | 213 +++++++++++++++++++
.../mahout/sparkbindings/SparkEngine.scala | 35 ++-
.../apache/mahout/sparkbindings/package.scala | 4 +-
.../mahout/cf/CooccurrenceAnalysisSuite.scala | 195 +++++++++++++++++
.../sparkbindings/drm/RLikeDrmOpsSuite.scala | 11 +
11 files changed, 477 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7111122..1d5bd3d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
Release 1.0 - unreleased
+ MAHOUT-1464: Cooccurrence Analysis on Spark (pat)
+
MAHOUT-1578: Optimizations in matrix serialization (ssc)
MAHOUT-1572: blockify() to detect (naively) the data sparsity in the loaded data (dlyubimov)
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
index edd0cfc..8c3911f 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -32,6 +32,9 @@ class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
/** Column sums. At this point this runs on checkpoint and collects in-core vector. */
def colSums(): Vector = drm.context.colSums(drm)
+ /** Column counts. Counts the non-zero values. At this point this runs on checkpoint and collects in-core vector. */
+ def numNonZeroElementsPerColumn(): Vector = drm.context.numNonZeroElementsPerColumn(drm)
+
/** Column Means */
def colMeans(): Vector = drm.context.colMeans(drm)
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 5ffee9d..f136981 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -46,6 +46,9 @@ trait DistributedEngine {
/** Engine-specific colSums implementation based on a checkpoint. */
def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+ /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
+ def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector
+
/** Engine-specific colMeans implementation based on a checkpoint. */
def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index 48c2048..149feca 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -176,7 +176,7 @@ class MatrixOps(val m: Matrix) {
def rowMeans() = if (m.ncol == 0) rowSums() else rowSums() /= m.ncol
-
+ def numNonZeroElementsPerColumn() = m.aggregateColumns(vectorCountFunc)
}
object MatrixOps {
@@ -188,4 +188,8 @@ object MatrixOps {
def apply(f: Vector): Double = f.sum
}
+  /**
+   * Vector function returning the number of non-zero elements of a vector.
+   *
+   * Negative values count as well: the function backs numNonZeroElementsPerColumn, which promises a count of
+   * non-zero entries rather than a count of positive ones.
+   */
+  private def vectorCountFunc = new VectorFunction {
+    def apply(f: Vector): Double = f.getNumNonZeroElements()
+  }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
index e57c75c..d59d3a5 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
@@ -109,7 +109,7 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
println(a.toString)
}
- test("colSums, rowSums, colMeans, rowMeans") {
+ test("colSums, rowSums, colMeans, rowMeans, numNonZeroElementsPerColumn") {
val a = dense(
(2, 3, 4),
(3, 4, 5)
@@ -119,6 +119,7 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
a.rowSums() should equal(dvec(9, 12))
a.colMeans() should equal(dvec(2.5, 3.5, 4.5))
a.rowMeans() should equal(dvec(3, 4))
+ a.numNonZeroElementsPerColumn() should equal(dvec(2,2,2))
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/math/src/main/java/org/apache/mahout/math/MurmurHash.java
----------------------------------------------------------------------
diff --git a/math/src/main/java/org/apache/mahout/math/MurmurHash.java b/math/src/main/java/org/apache/mahout/math/MurmurHash.java
index 0b3fab0..32dfdd6 100644
--- a/math/src/main/java/org/apache/mahout/math/MurmurHash.java
+++ b/math/src/main/java/org/apache/mahout/math/MurmurHash.java
@@ -17,6 +17,8 @@
package org.apache.mahout.math;
+import com.google.common.primitives.Ints;
+
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -29,7 +31,16 @@ import java.nio.ByteOrder;
*/
public final class MurmurHash {
- private MurmurHash() {
+ private MurmurHash() {}
+
+ /**
+ * Hashes an int.
+ * @param data The int to hash.
+ * @param seed The seed for the hash.
+ * @return The 32 bit hash of the bytes in question.
+ */
+ public static int hash(int data, int seed) {
+ return hash(ByteBuffer.wrap(Ints.toByteArray(data)), seed);
}
/**
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
new file mode 100644
index 0000000..ee44f90
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+
+/**
+ * based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object CooccurrenceAnalysis extends Serializable {
+
+  /**
+   * Orders (index, score) pairs by descending score. A PriorityQueue built on this ordering therefore
+   * exposes the lowest-scored pair at its head, which is what the top-k selection below relies on.
+   */
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
+
+  /**
+   * Computes LLR-based indicator matrices for the primary interaction matrix A and, optionally, for further
+   * interaction matrices B that are cross-analyzed against A.
+   *
+   * @param drmARaw primary interaction matrix; its rows are treated as users and its columns as things
+   * @param randomSeed seed used when downsampling
+   * @param maxInterestingItemsPerThing maximum number of indicators kept per row of an indicator matrix
+   * @param maxNumInteractions cap used when downsampling the interactions per user and per thing
+   * @param drmBs further interaction matrices to cross-analyze against A
+   * @return the indicator matrix derived from A'A, followed by one matrix derived from B'A per entry of drmBs
+   */
+  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Apply selective downsampling, pin resulting matrix. The checkpoint is taken explicitly, exactly as for
+    // each B below, so that the instance unpinned at the end is the one that was pinned here.
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions).checkpoint()
+
+    // num users, which equals the maximum number of interactions per item
+    val numUsers = drmA.nrow.toInt
+
+    // Compute & broadcast the number of interactions per thing in A
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
+
+    // Compute co-occurrence matrix A'A
+    val drmAtA = drmA.t %*% drmA
+
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
+    val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
+      bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    var indicatorMatrices = List(drmIndicatorsAtA)
+
+    // Now look at cross-co-occurrences
+    for (drmBRaw <- drmBs) {
+      // Down-sample and pin other interaction matrix
+      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+      // Compute & broadcast the number of interactions per thing in B
+      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
+
+      // Compute cross-co-occurrence matrix B'A
+      val drmBtA = drmB.t %*% drmA
+
+      val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing,
+        bcastInteractionsPerThingB, bcastInteractionsPerItemA)
+
+      indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA
+
+      drmB.uncache()
+    }
+
+    // Unpin downsampled interaction matrix
+    drmA.uncache()
+
+    // Return list of indicator matrices
+    indicatorMatrices
+  }
+
+  /**
+   * Compute loglikelihood ratio
+   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+   *
+   * @param numInteractionsWithA number of interactions with A
+   * @param numInteractionsWithB number of interactions with B
+   * @param numInteractionsWithAandB number of interactions with both A and B
+   * @param numInteractions total number of interactions
+   * @return the loglikelihood ratio of the 2x2 contingency table built from the four counts
+   **/
+  def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+                         numInteractionsWithAandB: Long, numInteractions: Long): Double = {
+
+    // Cells of the contingency table: both, only A, only B, neither
+    val k11 = numInteractionsWithAandB
+    val k12 = numInteractionsWithA - numInteractionsWithAandB
+    val k21 = numInteractionsWithB - numInteractionsWithAandB
+    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+  }
+
+  /**
+   * Replaces every row of a (cross-)co-occurrence matrix by its highest-scoring loglikelihood ratios.
+   *
+   * @param drmBtA co-occurrence counts; row keys index things of B, columns index things of A
+   * @param numUsers total number of users, used as the total number of interactions in the LLR computation
+   * @param maxInterestingItemsPerThing maximum number of scores kept per row
+   * @param bcastNumInteractionsB broadcast number of interactions per thing of B
+   * @param bcastNumInteractionsA broadcast number of interactions per thing of A
+   * @param crossCooccurrence if false, entries pairing a thing with itself are excluded
+   * @return a matrix of the same geometry holding the retained LLR scores
+   */
+  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+                        crossCooccurrence: Boolean = true) = {
+    drmBtA.mapBlock() {
+      case (keys, block) =>
+
+        val llrBlock = block.like()
+        val numInteractionsB: Vector = bcastNumInteractionsB
+        val numInteractionsA: Vector = bcastNumInteractionsA
+
+        for (index <- 0 until keys.size) {
+
+          val thingB = keys(index)
+
+          // PriorityQueue to select the top-k items
+          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+          block(index, ::).nonZeroes().foreach { elem =>
+            val thingA = elem.index
+            val cooccurrences = elem.get
+
+            // exclude co-occurrences of the item with itself
+            if (crossCooccurrence || thingB != thingA) {
+              // Compute loglikelihood ratio
+              val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+                cooccurrences.toLong, numUsers)
+              val candidate = thingA -> llrRatio
+
+              // Enqueue item with score, if belonging to the top-k
+              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+                topItemsPerThing.enqueue(candidate)
+              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+                // The candidate beats the weakest retained score, so it takes that score's place
+                topItemsPerThing.dequeue()
+                topItemsPerThing.enqueue(candidate)
+              }
+            }
+          }
+
+          // Add top-k interesting items to the output matrix
+          topItemsPerThing.dequeueAll.foreach {
+            case (otherThing, llrScore) =>
+              llrBlock(index, otherThing) = llrScore
+          }
+        }
+
+        keys -> llrBlock
+    }
+  }
+
+  /**
+   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
+   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+   *
+   * additionally binarizes input matrix, as we're only interested in knowing whether interactions happened or not
+   *
+   * @param drmM raw interaction matrix; rows are users, columns are things
+   * @param seed seed mixed into the per-block random number generator
+   * @param maxNumInteractions number of interactions per user and per thing above which sampling kicks in
+   * @return a 0-1 matrix of the same geometry containing the retained interactions
+   */
+  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+    implicit val distributedContext = drmM.context
+
+    // Pin raw interaction matrix
+    val drmI = drmM.checkpoint()
+
+    // Broadcast vector containing the number of interactions with each thing
+    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
+
+    val downSampledDrmI = drmI.mapBlock() {
+      case (keys, block) =>
+        val numInteractions: Vector = bcastNumInteractions
+
+        val downsampledBlock = block.like()
+
+        // A block without rows has no first key to seed the RNG with and nothing to sample
+        if (keys.size > 0) {
+
+          // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
+          val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+          // Downsample the interaction vector of each user
+          for (userIndex <- 0 until keys.size) {
+
+            val interactionsOfUser = block(userIndex, ::)
+
+            val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
+
+            // Users without any interaction contribute nothing; skipping them also avoids a division by zero
+            if (numInteractionsOfUser > 0) {
+
+              // Both operands are integers, so the division has to be forced to floating point. Otherwise the
+              // rate truncates to 0 for every user above the cap and all of that user's interactions are lost.
+              val perUserSampleRate =
+                math.min(maxNumInteractions, numInteractionsOfUser).toDouble / numInteractionsOfUser
+
+              interactionsOfUser.nonZeroes().foreach { elem =>
+                val numInteractionsWithThing = numInteractions(elem.index)
+                val perThingSampleRate =
+                  math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+                if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
+                  // We ignore the original interaction value and create a binary 0-1 matrix
+                  // as we only consider whether interactions happened or did not happen
+                  downsampledBlock(userIndex, elem.index) = 1
+                }
+              }
+            }
+          }
+        }
+
+        keys -> downsampledBlock
+    }
+
+    // Unpin raw interaction matrix
+    drmI.uncache()
+
+    downSampledDrmI
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 3a03e58..7a1fb2d 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -40,16 +40,37 @@ object SparkEngine extends DistributedEngine {
val n = drm.ncol
drm.rdd
- // Throw away keys
- .map(_._2)
- // Fold() doesn't work with kryo still. So work around it.
- .mapPartitions(iter => {
+ // Throw away keys
+ .map(_._2)
+ // Fold() doesn't work with kryo still. So work around it.
+ .mapPartitions(iter => {
val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
Iterator(acc)
})
- // Since we preallocated new accumulator vector per partition, this must not cause any side
- // effects now.
- .reduce(_ += _)
+ // Since we preallocated new accumulator vector per partition, this must not cause any side
+ // effects now.
+ .reduce(_ += _)
+ }
+
+  /**
+   * Counts the non-zero elements of every column of a checkpointed matrix.
+   *
+   * @param drm checkpointed matrix to inspect
+   * @return in-core vector with one entry per column holding the number of non-zero values in that column
+   */
+  def numNonZeroElementsPerColumn[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    val n = drm.ncol
+
+    drm.rdd
+      // Throw away keys
+      .map(_._2)
+      // Fold() doesn't work with kryo still. So work around it.
+      .mapPartitions(iter => {
+        val acc = ((new DenseVector(n): Vector) /: iter){(acc, v) =>
+          v.nonZeroes().foreach { elem =>
+            // Negative values are non-zero as well and have to be counted; the explicit check only guards
+            // against zero values that a vector implementation might still hand out here.
+            if (elem.get() != 0) acc(elem.index) += 1
+          }
+          acc
+        }
+        Iterator(acc)
+      })
+      // Since we preallocated new accumulator vector per partition, this must not cause any side
+      // effects now.
+      .reduce(_ += _)
   }
/** Engine-specific colMeans implementation based on a checkpoint. */
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index c4ef0d3..e9fd7ac 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -95,7 +95,9 @@ package object sparkbindings {
do {
val cp = r.readLine()
if (cp == null)
- throw new IllegalArgumentException("Unable to read output from \"mahout classpath\"")
+ throw new IllegalArgumentException(
+ "Unable to read output from \"mahout -spark classpath\". Is SPARK_HOME defined?"
+ )
val j = cp.split(File.pathSeparatorChar)
if (j.size > 10) {
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
new file mode 100644
index 0000000..3c05a42
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.scalatest.FunSuite
+import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.scalabindings.MatrixOps
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math._
+import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+
+/* values
+A =
+1 1 0 0 0
+0 0 1 1 0
+0 0 0 0 1
+1 0 0 1 0
+
+B =
+1 1 1 1 0
+1 1 1 1 0
+0 0 1 0 1
+1 1 0 1 0
+ */
+
+class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLocalContext {
+
+  // All interaction matrices used below share the same non-zero pattern. Because the analysis binarizes its
+  // input, every variant has to produce the same indicator matrices.
+
+  // correct cooccurrence with LLR
+  private val matrixLLRCoocAtAControl = dense(
+    (0.0, 1.7260924347106847, 0, 0, 0),
+    (1.7260924347106847, 0, 0, 0, 0),
+    (0, 0, 0, 1.7260924347106847, 0),
+    (0, 0, 1.7260924347106847, 0, 0),
+    (0, 0, 0, 0, 0)
+  )
+
+  // correct cross-cooccurrence with LLR
+  private val matrixLLRCoocBtAControl = dense(
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
+    (0, 0, 0, 0, 4.498681156950466)
+  )
+
+  /** Runs the analysis on a and b and compares both resulting indicator matrices with the control matrices. */
+  private def assertIndicatorsMatchControl(a: Matrix, b: Matrix): Unit = {
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+
+    //self similarity
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    val selfNorm = (new MatrixOps(m = diffMatrix)).norm
+    selfNorm should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    val crossNorm = (new MatrixOps(m = diff2Matrix)).norm
+    crossNorm should be < 1E-10
+  }
+
+  test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
+    val a = dense((1, 1, 0, 0, 0), (0, 0, 1, 1, 0), (0, 0, 0, 0, 1), (1, 0, 0, 1, 0))
+    val b = dense((1, 1, 1, 1, 0), (1, 1, 1, 1, 0), (0, 0, 1, 0, 1), (1, 1, 0, 1, 0))
+
+    assertIndicatorsMatchControl(a, b)
+  }
+
+  test("cooccurrence [A'A], [B'A] double data using LLR") {
+    val a = dense((100000.0D, 1.0D, 0.0D, 0.0D, 0.0D), (0.0D, 0.0D, 10.0D, 1.0D, 0.0D), (0.0D, 0.0D, 0.0D, 0.0D, 1000.0D), (1.0D, 0.0D, 0.0D, 10.0D, 0.0D))
+    val b = dense((10000.0D, 100.0D, 1000.0D, 1.0D, 0.0D), (10.0D, 1.0D, 10000000.0D, 10.0D, 0.0D), (0.0D, 0.0D, 1000.0D, 0.0D, 100.0D), (100.0D, 1.0D, 0.0D, 100000.0D, 0.0D))
+
+    assertIndicatorsMatchControl(a, b)
+  }
+
+  test("cooccurrence [A'A], [B'A] integer data using LLR") {
+    val a = dense((1000, 10, 0, 0, 0), (0, 0, 10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0))
+    val b = dense((100, 1000, 10000, 10000, 0), (10000, 1000, 100, 10, 0), (0, 0, 10, 0, 100), (10, 100, 0, 1000, 0))
+
+    assertIndicatorsMatchControl(a, b)
+  }
+
+  test("LLR calc") {
+    val numInteractionsWithAandB = 10L
+    val numInteractionsWithA = 100L
+    val numInteractionsWithB = 200L
+    val numInteractions = 10000L
+
+    val llr = CooccurrenceAnalysis.loglikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
+
+    // Compare with a tolerance; exact equality on a computed double is brittle
+    assert(math.abs(llr - 17.19462327013025) < 1E-10)
+  }
+
+  test("downsampling by number per row") {
+    val a = dense((1, 1, 1, 1, 0),
+      (1, 1, 1, 1, 1),
+      (0, 0, 0, 0, 1),
+      (1, 1, 0, 1, 0)
+    )
+    val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
+
+    val downSampledDrm = CooccurrenceAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
+
+    // count the non-zero values that survived the downsampling
+    var numValues = 0
+    val m = downSampledDrm.collect
+    val it = m.iterator()
+    while (it.hasNext) {
+      val v = it.next().vector()
+      val nonZeroIt = v.nonZeroes().iterator()
+      while (nonZeroIt.hasNext) {
+        numValues += 1
+        nonZeroIt.next()
+      }
+    }
+
+    // Rows 0, 2 and 3 hold at most 4 interactions and no column holds more than 4, so their 8 interactions
+    // are never sampled away. Only row 1 exceeds the cap of 4; how many of its 5 interactions survive depends
+    // on the sampling, which bounds the total from above by 13.
+    assert(numValues >= 8 && numValues <= 13)
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/c1ca3087/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index 6152426..30a602b 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -463,4 +463,15 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
drmA.colMeans() should equal (inCoreA.colMeans())
}
+  // Checks that the distributed per-column count of non-zero elements agrees with the in-core one.
+  test("numNonZeroElementsPerColumn") {
+    // Column 0 contains two zeros while column 1 contains none, so the two counts differ
+    val inCoreA = dense(
+      (0, 2),
+      (3, 4),
+      (0, 30)
+    )
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    drmA.numNonZeroElementsPerColumn() should equal (inCoreA.numNonZeroElementsPerColumn())
+  }
+ }
+
}