Posted to commits@mahout.apache.org by pa...@apache.org on 2014/08/28 20:00:15 UTC
[2/2] git commit: MAHOUT-1604 add a CLI and associated code for spark-rowsimilarity, also cleans up some things in MAHOUT-1568 and MAHOUT-1569, closes apache/mahout#47
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/149c9859
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/149c9859
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/149c9859
Branch: refs/heads/master
Commit: 149c98592fe447c98dfb5afc67b5809725cc3056
Parents: 91f15ec
Author: pferrel <pa...@occamsmachete.com>
Authored: Thu Aug 28 10:45:13 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Thu Aug 28 10:45:13 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
bin/mahout | 10 +-
.../mahout/math/cf/CooccurrenceAnalysis.scala | 220 ------
.../mahout/math/cf/SimilarityAnalysis.scala | 261 +++++++
.../apache/mahout/math/drm/RLikeDrmOps.scala | 11 +-
.../mahout/math/scalabindings/MatrixOps.scala | 3 +
.../math/scalabindings/MatrixOpsSuite.scala | 4 +-
spark/pom.xml | 21 +
.../apache/mahout/drivers/FileSysUtils.scala | 55 +-
.../apache/mahout/drivers/IndexedDataset.scala | 11 +-
.../mahout/drivers/ItemSimilarityDriver.scala | 117 ++-
.../apache/mahout/drivers/MahoutDriver.scala | 62 +-
.../mahout/drivers/MahoutOptionParser.scala | 42 +-
.../apache/mahout/drivers/ReaderWriter.scala | 21 +-
.../mahout/drivers/RowSimilarityDriver.scala | 159 +++++
.../org/apache/mahout/drivers/Schema.scala | 36 +-
.../drivers/TextDelimitedReaderWriter.scala | 134 +++-
.../mahout/sparkbindings/SparkEngine.scala | 2 +-
.../mahout/cf/CooccurrenceAnalysisSuite.scala | 29 +-
.../drivers/ItemSimilarityDriverSuite.scala | 713 ++++++++++---------
.../drivers/RowSimilarityDriverSuite.scala | 138 ++++
21 files changed, 1275 insertions(+), 776 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 47518b4..dfccd95 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
Release 1.0 - unreleased
+ MAHOUT-1604: Spark version of rowsimilarity driver and associated additions to SimilarityAnalysis.scala (pferrel)
+
MAHOUT-1500: H2O Integration (Anand Avati via apalumbo)
MAHOUT-1606 - Add rowSums, rowMeans and diagonal extraction operations to distributed matrices (dlyubimov)
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/bin/mahout
----------------------------------------------------------------------
diff --git a/bin/mahout b/bin/mahout
index 27acd9f..c22118b 100755
--- a/bin/mahout
+++ b/bin/mahout
@@ -88,6 +88,10 @@ if [ "$1" == "spark-itemsimilarity" ]; then
SPARK=1
fi
+if [ "$1" == "spark-rowsimilarity" ]; then
+ SPARK=1
+fi
+
if [ "$MAHOUT_CORE" != "" ]; then
IS_CORE=1
fi
@@ -179,7 +183,7 @@ then
done
fi
- # add spark-shell -- if we requested shell or other spark CLI driver
+ # add jars for running from the command line if we requested the shell or a Spark CLI driver
if [ "$SPARK" == "1" ]; then
for f in $MAHOUT_HOME/mrlegacy/target/mahout-mrlegacy-*.jar ; do
@@ -254,6 +258,10 @@ case "$1" in
shift
"$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.drivers.ItemSimilarityDriver" "$@"
;;
+ (spark-rowsimilarity)
+ shift
+ "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.drivers.RowSimilarityDriver" "$@"
+ ;;
(h2o-node)
shift
"$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "water.H2O" -md5skip "$@" -name mah2out
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
deleted file mode 100644
index 181b729..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.cf
-
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import drm._
-import RLikeDrmOps._
-import scala.collection.JavaConversions._
-import org.apache.mahout.math.stats.LogLikelihood
-import collection._
-import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.math.function.{VectorFunction, Functions}
-
-
-/**
- * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
- * available at http://www.mapr.com/practical-machine-learning
- *
- * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
- * Scalable Similarity-Based Neighborhood Methods with MapReduce
- * ACM Conference on Recommender Systems 2012"
- */
-object CooccurrenceAnalysis extends Serializable {
-
- /** Compares (Int,Double) pairs by the second value */
- private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
-
- def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
- maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
-
- implicit val distributedContext = drmARaw.context
-
- // Apply selective downsampling, pin resulting matrix
- val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
-
- // num users, which equals the maximum number of interactions per item
- val numUsers = drmA.nrow.toInt
-
- // Compute & broadcast the number of interactions per thing in A
- val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
-
- // Compute co-occurrence matrix A'A
- val drmAtA = drmA.t %*% drmA
-
- // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
- val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
- bcastInteractionsPerItemA, crossCooccurrence = false)
-
- var indicatorMatrices = List(drmIndicatorsAtA)
-
- // Now look at cross-co-occurrences
- for (drmBRaw <- drmBs) {
- // Down-sample and pin other interaction matrix
- val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
-
- // Compute & broadcast the number of interactions per thing in B
- val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
-
- // Compute cross-co-occurrence matrix B'A
- // pferrel: yikes, this is the wrong order, a big change! so you know who to blame
- // used to be val drmBtA = drmB.t %*% drmA, which is the wrong order
- val drmAtB = drmA.t %*% drmB
-
- val drmIndicatorsAtB = computeIndicators(drmAtB, numUsers, maxInterestingItemsPerThing,
- bcastInteractionsPerItemA, bcastInteractionsPerThingB)
-
- indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
-
- drmB.uncache()
- }
-
- // Unpin downsampled interaction matrix
- drmA.uncache()
-
- // Return list of indicator matrices
- indicatorMatrices
- }
-
- /**
- * Compute loglikelihood ratio
- * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
- **/
- def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
- numInteractionsWithAandB: Long, numInteractions: Long) = {
-
- val k11 = numInteractionsWithAandB
- val k12 = numInteractionsWithA - numInteractionsWithAandB
- val k21 = numInteractionsWithB - numInteractionsWithAandB
- val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
-
- LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
-
- }
-
- def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
- bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
- crossCooccurrence: Boolean = true) = {
- drmBtA.mapBlock() {
- case (keys, block) =>
-
- val llrBlock = block.like()
- val numInteractionsB: Vector = bcastNumInteractionsB
- val numInteractionsA: Vector = bcastNumInteractionsA
-
- for (index <- 0 until keys.size) {
-
- val thingB = keys(index)
-
- // PriorityQueue to select the top-k items
- val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
-
- block(index, ::).nonZeroes().foreach { elem =>
- val thingA = elem.index
- val cooccurrences = elem.get
-
- // exclude co-occurrences of the item with itself
- if (crossCooccurrence || thingB != thingA) {
- // Compute loglikelihood ratio
- val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
- cooccurrences.toLong, numUsers)
-
- val candidate = thingA -> llr
-
- // matches legacy hadoop code and maps values to range (0..1)
- // val tLLR = 1.0 - (1.0 / (1.0 + llr))
- //val candidate = thingA -> tLLR
-
- // Enqueue item with score, if belonging to the top-k
- if (topItemsPerThing.size < maxInterestingItemsPerThing) {
- topItemsPerThing.enqueue(candidate)
- } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
- topItemsPerThing.dequeue()
- topItemsPerThing.enqueue(candidate)
- }
- }
- }
-
- // Add top-k interesting items to the output matrix
- topItemsPerThing.dequeueAll.foreach {
- case (otherThing, llrScore) =>
- llrBlock(index, otherThing) = llrScore
- }
- }
-
- keys -> llrBlock
- }
- }
-
- /**
- * Selectively downsample users and things with an anomalous amount of interactions, inspired by
- * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
- *
- * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not
- */
- def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
-
- implicit val distributedContext = drmM.context
-
- // Pin raw interaction matrix
- val drmI = drmM.checkpoint()
-
- // Broadcast vector containing the number of interactions with each thing
- val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
-
- val downSampledDrmI = drmI.mapBlock() {
- case (keys, block) =>
- val numInteractions: Vector = bcastNumInteractions
-
- // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
- val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
-
- val downsampledBlock = block.like()
-
- // Downsample the interaction vector of each user
- for (userIndex <- 0 until keys.size) {
-
- val interactionsOfUser = block(userIndex, ::)
-
- val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
-
- val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
-
- interactionsOfUser.nonZeroes().foreach { elem =>
- val numInteractionsWithThing = numInteractions(elem.index)
- val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
-
- if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
- // We ignore the original interaction value and create a binary 0-1 matrix
- // as we only consider whether interactions happened or did not happen
- downsampledBlock(userIndex, elem.index) = 1
- }
- }
- }
-
- keys -> downsampledBlock
- }
-
- // Unpin raw interaction matrix
- drmI.uncache()
-
- downSampledDrmI
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
new file mode 100644
index 0000000..90d7559
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.cf
+
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+
+/**
+ * based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object SimilarityAnalysis extends Serializable {
+
+ /** Compares (Int,Double) pairs by the second value */
+ private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
+
+ /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+ * and returns a list of indicator and cross-indicator matrices
+ * @param drmARaw Primary interaction matrix
+ * @param randomSeed when kept constant makes downsampling repeatable
+ * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
+ * @param maxNumInteractions max number of interactions after downsampling, default: 500
+ * @param drmBs optional array of secondary interaction matrices for cross-cooccurrence indicators
+ * @return a list containing the indicator matrix for A'A followed by one cross-indicator matrix per drmB
+ */
+ def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+ maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+ implicit val distributedContext = drmARaw.context
+
+ // Apply selective downsampling, pin resulting matrix
+ val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+ // num users, which equals the maximum number of interactions per item
+ val numUsers = drmA.nrow.toInt
+
+ // Compute & broadcast the number of interactions per thing in A
+ val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
+
+ // Compute co-occurrence matrix A'A
+ val drmAtA = drmA.t %*% drmA
+
+ // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
+ val drmIndicatorsAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
+ bcastInteractionsPerItemA, crossCooccurrence = false)
+
+ var indicatorMatrices = List(drmIndicatorsAtA)
+
+ // Now look at cross-co-occurrences
+ for (drmBRaw <- drmBs) {
+ // Down-sample and pin other interaction matrix
+ val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+ // Compute & broadcast the number of interactions per thing in B
+ val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
+
+ // Compute cross-co-occurrence matrix A'B
+ val drmAtB = drmA.t %*% drmB
+
+ val drmIndicatorsAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
+ bcastInteractionsPerItemA, bcastInteractionsPerThingB)
+
+ indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
+
+ drmB.uncache()
+ }
+
+ // Unpin downsampled interaction matrix
+ drmA.uncache()
+
+ // Return list of indicator matrices
+ indicatorMatrices
+ }
+
+ /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows
+ * @param drmARaw Primary interaction matrix
+ * @param randomSeed when kept constant makes downsampling repeatable
+ * @param maxInterestingSimilaritiesPerRow number of similar rows to return per row, default: 50
+ * @param maxNumInteractions max number of interactions after downsampling, default: 500
+ * @return a DRM containing, for each row, the LLR scores of its most similar rows
+ */
+ def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
+ maxNumInteractions: Int = 500): DrmLike[Int] = {
+
+ implicit val distributedContext = drmARaw.context
+
+ // Apply selective downsampling, pin resulting matrix
+ val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+ // num columns, which equals the maximum number of interactions per row
+ val numCols = drmA.ncol
+
+ // Compute & broadcast the number of interactions per row in A
+ val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerRow)
+
+ // Compute row similarity cooccurrence matrix AA'
+ val drmAAt = drmA %*% drmA.t
+
+ // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
+ val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow, bcastInteractionsPerItemA,
+ bcastInteractionsPerItemA, crossCooccurrence = false)
+
+ drmSimilaritiesAAt
+ }
+
+ /**
+ * Compute loglikelihood ratio
+ * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+ **/
+ def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+ numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+ val k11 = numInteractionsWithAandB
+ val k12 = numInteractionsWithA - numInteractionsWithAandB
+ val k21 = numInteractionsWithB - numInteractionsWithAandB
+ val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+ LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+
+ }
+
+ def computeSimilarities(drm: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+ bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+ crossCooccurrence: Boolean = true) = {
+ drm.mapBlock() {
+ case (keys, block) =>
+
+ val llrBlock = block.like()
+ val numInteractionsB: Vector = bcastNumInteractionsB
+ val numInteractionsA: Vector = bcastNumInteractionsA
+
+ for (index <- 0 until keys.size) {
+
+ val thingB = keys(index)
+
+ // PriorityQueue to select the top-k items
+ val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+ block(index, ::).nonZeroes().foreach { elem =>
+ val thingA = elem.index
+ val cooccurrences = elem.get
+
+ // exclude co-occurrences of the item with itself
+ if (crossCooccurrence || thingB != thingA) {
+ // Compute loglikelihood ratio
+ val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+ cooccurrences.toLong, numUsers)
+
+ val candidate = thingA -> llr
+
+ // matches legacy hadoop code and maps values to range (0..1)
+ // val normalizedLLR = 1.0 - (1.0 / (1.0 + llr))
+ // val candidate = thingA -> normalizedLLR
+
+ // Enqueue item with score, if belonging to the top-k
+ if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+ topItemsPerThing.enqueue(candidate)
+ } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+ topItemsPerThing.dequeue()
+ topItemsPerThing.enqueue(candidate)
+ }
+ }
+ }
+
+ // Add top-k interesting items to the output matrix
+ topItemsPerThing.dequeueAll.foreach {
+ case (otherThing, llrScore) =>
+ llrBlock(index, otherThing) = llrScore
+ }
+ }
+
+ keys -> llrBlock
+ }
+ }
+
+ /**
+ * Selectively downsample rows and items with an anomalous amount of interactions, inspired by
+ * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+ *
+ * additionally binarizes the input matrix, as we're only interested in knowing whether interactions happened or not
+ * @param drmM matrix to downsample
+ * @param seed random number generator seed, keep constant if repeatability is necessary
+ * @param maxNumInteractions max number of interactions per row after downsampling
+ * @return the downsampled and binarized matrix
+ */
+ def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+ implicit val distributedContext = drmM.context
+
+ // Pin raw interaction matrix
+ val drmI = drmM.checkpoint()
+
+ // Broadcast vector containing the number of interactions with each thing
+ val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
+
+ val downSampledDrmI = drmI.mapBlock() {
+ case (keys, block) =>
+ val numInteractions: Vector = bcastNumInteractions
+
+ // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
+ val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+ val downsampledBlock = block.like()
+
+ // Downsample the interaction vector of each row
+ for (rowIndex <- 0 until keys.size) {
+
+ val interactionsInRow = block(rowIndex, ::)
+
+ val numInteractionsPerRow = interactionsInRow.getNumNonZeroElements()
+
+ val perRowSampleRate = math.min(maxNumInteractions, numInteractionsPerRow) / numInteractionsPerRow
+
+ interactionsInRow.nonZeroes().foreach { elem =>
+ val numInteractionsWithThing = numInteractions(elem.index)
+ val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+ if (random.nextDouble() <= math.min(perRowSampleRate, perThingSampleRate)) {
+ // We ignore the original interaction value and create a binary 0-1 matrix
+ // as we only consider whether interactions happened or did not happen
+ downsampledBlock(rowIndex, elem.index) = 1
+ }
+ }
+ }
+
+ keys -> downsampledBlock
+ }
+
+ // Unpin raw interaction matrix
+ drmI.uncache()
+
+ downSampledDrmI
+ }
+}
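
For orientation, here is a hedged sketch of driving the renamed object from Scala; drmA and drmB are hypothetical interaction DRMs assumed to be loaded elsewhere, and the explicit arguments simply restate the documented defaults:

    import org.apache.mahout.math.cf.SimilarityAnalysis
    import org.apache.mahout.math.drm.DrmLike

    def indicators(drmA: DrmLike[Int], drmB: DrmLike[Int]): Unit = {
      // Item (column-wise) similarity on A'A, plus the cross-indicator A'B;
      // the indicator matrix for A'A comes first, then one matrix per drmB
      val indicatorMatrices = SimilarityAnalysis.cooccurrences(drmA,
        randomSeed = 0xdeadbeef, maxInterestingItemsPerThing = 50,
        maxNumInteractions = 500, drmBs = Array(drmB))
      val drmIndicatorsAtA = indicatorMatrices.head
      val drmIndicatorsAtB = indicatorMatrices(1)

      // Row-wise similarity on AA': per row, the LLR scores of its most similar rows
      val drmRowSimilarities = SimilarityAnalysis.rowSimilarity(drmA,
        maxInterestingSimilaritiesPerRow = 50)
    }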
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index 026ab75..d8d04e2 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -90,7 +90,16 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
// Collect block-wise rowsums and output them as one-column matrix.
keys -> dense(block.rowSums).t
}
- .collect(::, 0)
+ .collect(::, 0)
+ }
+
+ /** Counts the non-zero elements in each row, returning a vector of the counts */
+ def numNonZeroElementsPerRow(): Vector = {
+ drm.mapBlock(ncol = 1) { case (keys, block) =>
+ // Collect block-wise row non-zero counts and output them as a one-column matrix.
+ keys -> dense(block.numNonZeroElementsPerRow).t
+ }
+ .collect(::, 0)
}
/** Row means */
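
The new DRM-level count mirrors the in-core MatrixOps addition below. A small sketch, assuming a hypothetical DrmLike[Int] named drmA and the imports this module already uses:

    import org.apache.mahout.math.Vector
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    // e.g. interactions per row (user) and per column (item) of an interaction matrix
    def interactionCounts(drmA: DrmLike[Int]): (Vector, Vector) =
      (drmA.numNonZeroElementsPerRow(), drmA.numNonZeroElementsPerColumn())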
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index d5ac026..910035f 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -191,7 +191,10 @@ class MatrixOps(val m: Matrix) {
/* Diagonal assignment */
def diagv_=(that: Double) = diagv := that
+ /* Row and Column non-zero element counts */
def numNonZeroElementsPerColumn() = m.aggregateColumns(vectorCountNonZeroElementsFunc)
+
+ def numNonZeroElementsPerRow() = m.aggregateRows(vectorCountNonZeroElementsFunc)
}
object MatrixOps {
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
index 5be6ca8..d7b22d9 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
@@ -120,10 +120,11 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
a.colMeans() should equal(dvec(2.5, 3.5, 4.5))
a.rowMeans() should equal(dvec(3, 4))
a.numNonZeroElementsPerColumn() should equal(dvec(2,2,2))
+ a.numNonZeroElementsPerRow() should equal(dvec(3,3))
}
- test("numNonZeroElementsPerColumn") {
+ test("numNonZeroElementsPerColumn and Row") {
val a = dense(
(2, 3, 4),
(3, 4, 5),
@@ -132,6 +133,7 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
)
a.numNonZeroElementsPerColumn() should equal(dvec(3,2,4))
+ a.numNonZeroElementsPerRow() should equal(dvec(3,3,2,1))
}
test("Vector Assignment performance") {
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 71d3944..2f79377 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -157,6 +157,27 @@
</executions>
</plugin>
+ <!-- create job jar to include CLI driver deps-->
+ <!-- leave this in even though there are no hadoop mapreduce jobs in this module -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>job</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/job.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
index 654f116..f48e9ed 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
@@ -21,25 +21,24 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
/**
- * Returns a [[java.lang.String]]comma delimited list of URIs discovered based on parameters in the constructor.
- * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
- *
- * @param pathURI Where to start looking for inFiles, only HDFS is currently
- * supported. The pathURI may be a list of comma delimited URIs like those supported
- * by Spark
- * @param filePattern regex that must match the entire filename to have the file returned
- * @param recursive true traverses the filesystem recursively
- */
+ * Returns a [[java.lang.String]] comma-delimited list of URIs discovered based on parameters in the constructor.
+ * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
+ *
+ * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
+ * @param filePattern regex that must match the entire filename to have the file returned
+ * @param recursive true traverses the filesystem recursively, default = false
+ */
-case class FileSysUtils(pathURI: String, filePattern: String = ".*", recursive: Boolean = false) {
+case class FileSysUtils(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
val conf = new Configuration()
val fs = FileSystem.get(conf)
- /** returns a string of comma delimited URIs matching the filePattern */
+/** Returns a string of comma delimited URIs matching the filePattern.
+ * When pattern matching, dirs are never returned, only traversed. */
def uris :String = {
- if(recursive){
- val pathURIs = pathURI.split(",")
+ if (!filePattern.isEmpty){ // have a file pattern, so find matching files
+ val pathURIs = pathURI.split(",")
var files = ""
for ( uri <- pathURIs ){
files = findFiles(uri, filePattern, files)
@@ -51,21 +50,27 @@ case class FileSysUtils(pathURI: String, filePattern: String = ".*", recursive:
}
}
- /** find matching files in the dir, recursively call self when another directory is found */
- def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = {
- val fileStatuses: Array[FileStatus] = fs.listStatus (new Path(dir))
+/** Find matching files in the dir, recursively calling self when another directory is found.
+ * Only files are matched; directories are traversed but never returned as a match */
+ private def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = {
+ val seed = fs.getFileStatus(new Path(dir))
var f :String = files
- for (fileStatus <- fileStatuses ){
- if (fileStatus.getPath().getName().matches(filePattern)
- && !fileStatus.isDir){// found a file
- if (fileStatus.getLen() != 0) {
- // file is not empty
- f = f + fileStatus.getPath.toUri.toString + ","
+
+ if (seed.isDir) {
+ val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
+ for (fileStatus <- fileStatuses) {
+ if (fileStatus.getPath().getName().matches(filePattern)
+ && !fileStatus.isDir) {
+ // found a file
+ if (fileStatus.getLen() != 0) {
+ // file is not empty
+ f = f + fileStatus.getPath.toUri.toString + ","
+ }
+ } else if (fileStatus.isDir && recursive) {
+ f = findFiles(fileStatus.getPath.toString, filePattern, f)
}
- }else if (fileStatus.isDir){
- f = findFiles(fileStatus.getPath.toString, filePattern, f)
}
- }
+ } else { f = dir } // was a filename, not a dir
f
}
}
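
A hedged usage sketch of the revised discovery semantics (paths hypothetical): an empty filePattern passes pathURI through untouched, a non-empty pattern triggers matching, and recursive = true also descends into subdirectories:

    // No pattern: the comma delimited pathURI is returned as-is
    val passThrough = FileSysUtils("hdfs://nn:8020/in1,hdfs://nn:8020/in2").uris

    // Pattern + recursion: match part files anywhere under the tree
    val partFiles = FileSysUtils("hdfs://nn:8020/data", "^part-.*", recursive = true).uris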
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
index 41622a8..99f98f5 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
@@ -18,7 +18,7 @@
package org.apache.mahout.drivers
import com.google.common.collect.BiMap
-import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.math.drm.{DrmLike, CheckpointedDrm}
import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
import org.apache.mahout.sparkbindings._
@@ -61,13 +61,16 @@ case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String
val newMatrix = drmWrap[Int](drmRdd, n, ncol)
new IndexedDataset(newMatrix, rowIDs, columnIDs)
}
+
}
/**
- * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary constructor for
- * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like [[org.apache.mahout.drivers.IndexedDatasetStore]]
+ * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary
+ * constructor for
+ * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like
+ * [[org.apache.mahout.drivers.Reader]]
* {{{
- * val indexedDataset = IndexedDataset(indexedDatasetReader.readTuplesFrom(source))
+ * val indexedDataset = IndexedDataset(indexedDatasetReader.readElementsFrom(source))
* }}}
*/
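
A hedged sketch of the row-cardinality fix-up this class supports, as used by the drivers below (reader and source are assumed to exist, and the cardinality value is illustrative):

    val datasetA = IndexedDataset(reader.readElementsFrom(source))
    // widen A's row space so it matches the authoritative row ID index size
    val widenedA = datasetA.newRowCardinality(100000)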
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 460106f..b05b55d 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -17,7 +17,7 @@
package org.apache.mahout.drivers
-import org.apache.mahout.math.cf.CooccurrenceAnalysis
+import org.apache.mahout.math.cf.SimilarityAnalysis
import scala.collection.immutable.HashMap
/**
@@ -25,7 +25,7 @@ import scala.collection.immutable.HashMap
* Reads text lines
* that contain (row id, column id, ...). The IDs are user specified strings which will be
* preserved in the
- * output. The individual tuples will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]
+ * output. The individual elements will be accumulated into a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis.cooccurrences( )]]
* will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
* matrices and calculate both the self similarity of the primary matrix and the row-wise
* similarity of the primary
@@ -34,7 +34,7 @@ import scala.collection.immutable.HashMap
* The options allow flexible control of the input schema, file discovery, output schema, and control of
* algorithm parameters.
* To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple
- * tuples of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space,
+ * elements of text delimited values (userID,itemID) with or without a strength value and with a separator of tab, comma, or space,
* you can specify only the input and output file and directory--all else will default to the correct values.
* Each output line will contain the Item ID and similar items sorted by LLR strength descending.
* @note To use with a Spark cluster see the --master option, if you run out of heap space check
@@ -47,10 +47,6 @@ object ItemSimilarityDriver extends MahoutDriver {
"maxSimilaritiesPerItem" -> 100,
"appName" -> "ItemSimilarityDriver")
- // build options from some stardard CLI param groups
- // Note: always put the driver specific options at the last so the can override and previous options!
- private var options: Map[String, Any] = null
-
private var reader1: TextDelimitedIndexedDatasetReader = _
private var reader2: TextDelimitedIndexedDatasetReader = _
private var writer: TextDelimitedIndexedDatasetWriter = _
@@ -60,17 +56,15 @@ object ItemSimilarityDriver extends MahoutDriver {
* @param args Command line args, if empty a help message is printed.
*/
override def main(args: Array[String]): Unit = {
- options = MahoutOptionParser.GenericOptions ++ MahoutOptionParser.SparkOptions ++
- MahoutOptionParser.FileIOOptions ++ MahoutOptionParser.TextDelimitedTuplesOptions ++
- MahoutOptionParser.TextDelimitedDRMOptions ++ ItemSimilarityOptions
- val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
- head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
+ parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
+ head("spark-itemsimilarity", "Mahout 1.0")
//Input output options, non-driver specific
- parseIOOptions
+ parseIOOptions(numInputs = 2)
//Algorithm control options--driver specific
+ opts = opts ++ ItemSimilarityOptions
note("\nAlgorithm control options:")
opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) =>
options + ("maxPrefs" -> x)
@@ -79,14 +73,11 @@ object ItemSimilarityDriver extends MahoutDriver {
if (x > 0) success else failure("Option --maxPrefs must be > 0")
}
-/** not implemented in CooccurrenceAnalysis.cooccurrence
- opt[Int]("minPrefs") abbr ("mp") action { (x, options) =>
- options.put("minPrefs", x)
- options
- } text ("Ignore users with less preferences than this (optional). Default: 1") validate { x =>
- if (x > 0) success else failure("Option --minPrefs must be > 0")
- }
-*/
+ /** not implemented in SimilarityAnalysis.cooccurrence:
+ * threshold and minPrefs
+ * todo: replacing the threshold with some % of the best values and/or a
+ * confidence measure expressed in standard deviations would be nice.
+ */
opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
options + ("maxSimilaritiesPerItem" -> x)
@@ -99,7 +90,7 @@ object ItemSimilarityDriver extends MahoutDriver {
note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
//Input text format
- parseInputSchemaOptions
+ parseElementInputSchemaOptions
//How to search for input
parseFileDiscoveryOptions
@@ -116,14 +107,14 @@ object ItemSimilarityDriver extends MahoutDriver {
help("help") abbr ("h") text ("prints this usage text\n")
}
- parser.parse(args, options) map { opts =>
- options = opts
+ parser.parse(args, parser.opts) map { opts =>
+ parser.opts = opts
process
}
}
- override def start(masterUrl: String = options("master").asInstanceOf[String],
- appName: String = options("appName").asInstanceOf[String]):
+ override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
+ appName: String = parser.opts("appName").asInstanceOf[String]):
Unit = {
// todo: the HashBiMap used in the TextDelimited Reader is hard coded into
@@ -131,31 +122,31 @@ object ItemSimilarityDriver extends MahoutDriver {
// will be only specific to this job.
sparkConf.set("spark.kryo.referenceTracking", "false")
.set("spark.kryoserializer.buffer.mb", "200")
- .set("spark.executor.memory", options("sparkExecutorMem").asInstanceOf[String])
+ .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
super.start(masterUrl, appName)
- val readSchema1 = new Schema("delim" -> options("inDelim").asInstanceOf[String],
- "filter" -> options("filter1").asInstanceOf[String],
- "rowIDPosition" -> options("rowIDPosition").asInstanceOf[Int],
- "columnIDPosition" -> options("itemIDPosition").asInstanceOf[Int],
- "filterPosition" -> options("filterPosition").asInstanceOf[Int])
+ val readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
+ "filter" -> parser.opts("filter1").asInstanceOf[String],
+ "rowIDPosition" -> parser.opts("rowIDPosition").asInstanceOf[Int],
+ "columnIDPosition" -> parser.opts("itemIDPosition").asInstanceOf[Int],
+ "filterPosition" -> parser.opts("filterPosition").asInstanceOf[Int])
reader1 = new TextDelimitedIndexedDatasetReader(readSchema1)
- if ((options("filterPosition").asInstanceOf[Int] != -1 && options("filter2").asInstanceOf[String] != null)
- || (options("input2").asInstanceOf[String] != null && !options("input2").asInstanceOf[String].isEmpty )){
+ if ((parser.opts("filterPosition").asInstanceOf[Int] != -1 && parser.opts("filter2").asInstanceOf[String] != null)
+ || (parser.opts("input2").asInstanceOf[String] != null && !parser.opts("input2").asInstanceOf[String].isEmpty )){
// only need to change the filter used compared to readSchema1
- val readSchema2 = new Schema(readSchema1) += ("filter" -> options("filter2").asInstanceOf[String])
+ val readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String])
reader2 = new TextDelimitedIndexedDatasetReader(readSchema2)
}
writeSchema = new Schema(
- "rowKeyDelim" -> options("rowKeyDelim").asInstanceOf[String],
- "columnIdStrengthDelim" -> options("columnIdStrengthDelim").asInstanceOf[String],
- "omitScore" -> options("omitStrength").asInstanceOf[Boolean],
- "tupleDelim" -> options("tupleDelim").asInstanceOf[String])
+ "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String],
+ "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
+ "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
+ "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
@@ -163,19 +154,19 @@ object ItemSimilarityDriver extends MahoutDriver {
private def readIndexedDatasets: Array[IndexedDataset] = {
- val inFiles = FileSysUtils(options("input").asInstanceOf[String], options("filenamePattern").asInstanceOf[String],
- options("recursive").asInstanceOf[Boolean]).uris
- val inFiles2 = if (options("input2") == null || options("input2").asInstanceOf[String].isEmpty) ""
- else FileSysUtils(options("input2").asInstanceOf[String], options("filenamePattern").asInstanceOf[String],
- options("recursive").asInstanceOf[Boolean]).uris
+ val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+ parser.opts("recursive").asInstanceOf[Boolean]).uris
+ val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) ""
+ else FileSysUtils(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+ parser.opts("recursive").asInstanceOf[Boolean]).uris
if (inFiles.isEmpty) {
Array()
} else {
- val datasetA = IndexedDataset(reader1.readTuplesFrom(inFiles))
- if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetA,
- options("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
+ val datasetA = IndexedDataset(reader1.readElementsFrom(inFiles))
+ if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetA,
+ parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
// The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B
// Here we assume there is one row ID space for all interactions. To do this we calculate the
@@ -185,15 +176,15 @@ object ItemSimilarityDriver extends MahoutDriver {
// be supported (and are at least on Spark) or the row cardinality fix will not work.
val datasetB = if (!inFiles2.isEmpty) {
// get cross-cooccurrence interactions from separate files
- val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
+ val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
datasetB
- } else if (options("filterPosition").asInstanceOf[Int] != -1
- && options("filter2").asInstanceOf[String] != null) {
+ } else if (parser.opts("filterPosition").asInstanceOf[Int] != -1
+ && parser.opts("filter2").asInstanceOf[String] != null) {
// get cross-cooccurrences interactions by using two filters on a single set of files
- val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles, existingRowIDs = datasetA.rowIDs))
+ val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles, existingRowIDs = datasetA.rowIDs))
datasetB
@@ -201,18 +192,18 @@ object ItemSimilarityDriver extends MahoutDriver {
null.asInstanceOf[IndexedDataset]
}
if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc
- // true row cardinality is the size of the row id index, which was calculated from all rows of A and B
- val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality
+ // true row cardinality is the size of the row id index, which was calculated from all rows of A and B
+ val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality
// todo: how expensive is nrow? We could make assumptions about .rowIds that don't rely on
// its calculation
val returnedA = if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality)
- else datasetA // this guarantees matching cardinality
+ else datasetA // this guarantees matching cardinality
val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality)
- else datasetB // this guarantees matching cardinality
+ else datasetB // this guarantees matching cardinality
- if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetB, options("output") + "../input-datasets/secondary-interactions")
+ if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetB, parser.opts("output") + "../input-datasets/secondary-interactions")
Array(returnedA, returnedB)
} else Array(datasetA)
@@ -227,25 +218,25 @@ object ItemSimilarityDriver extends MahoutDriver {
// todo: allow more than one cross-similarity matrix?
val indicatorMatrices = {
if (indexedDatasets.length > 1) {
- CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int],
- options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int],
- Array(indexedDatasets(1).matrix))
+ SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
+ parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int],
+ Array(indexedDatasets(1).matrix))
} else {
- CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int],
- options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int])
+ SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
+ parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
}
}
// an alternative is to create a version of IndexedDataset that knows how to write itself
val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
indexedDatasets(0).columnIDs, writeSchema)
- selfIndicatorDataset.writeTo(options("output").asInstanceOf[String] + "indicator-matrix")
+ selfIndicatorDataset.writeTo(dest = parser.opts("output").asInstanceOf[String] + "indicator-matrix")
// todo: would be nice to support more than one cross-similarity indicator
if (indexedDatasets.length > 1) {
val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
- writer.writeDRMTo(crossIndicatorDataset, options("output").asInstanceOf[String] + "cross-indicator-matrix")
+ writer.writeTo(crossIndicatorDataset, parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix")
}
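
For completeness, a hedged invocation of the updated driver with a secondary input for cross-similarity (paths hypothetical):

    mahout spark-itemsimilarity --input /data/purchases --input2 /data/views --output /data/indicators/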
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index e92ed37..6ea7c8b 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -26,43 +26,55 @@ import scala.collection.immutable
/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
* Also define a Map of options for the command line parser. The following template may help:
* {{{
- * object SomeDriver extends MahoutDriver {
- * // build options from some stardard CLI param groups
- * // Note: always put the driver specific options at the last so the can override and previous options!
- * private var options = GenericOptions ++ SparkOptions ++ FileIOOptions ++ TextDelimitedTuplesOptions ++
- * TextDelimitedDRMOptions ++ ItemSimilarityOptions
+ * object SomeDriver extends MahoutDriver {
*
- * override def main(args: Array[String]): Unit = {
- * val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
- * head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
+ * // define only the options specific to this driver, inherit the generic ones
+ * private final val SomeOptions = HashMap[String, Any](
+ * "maxThings" -> 500,
+ * "minThings" -> 100,
+ * "appName" -> "SomeDriver")
*
- * //Several standard option groups are usually non-driver specific so use the MahoutOptionParser methods
- * parseGenericOptions
- * ...
- * }
- * parser.parse(args, options) map { opts =>
- * options = opts
- * process
- * }
- * }
+ * override def main(args: Array[String]): Unit = {
+ *
+ *
+ * val parser = new MahoutOptionParser(programName = "shortname") {
+ * head("somedriver", "Mahout 1.0-SNAPSHOT")
*
- * override def process: Unit = {
- * start()
- * //don't just stand there do something
- * stop
+ * // Input output options, non-driver specific
+ * parseIOOptions
+ *
+ * // Algorithm specific options
+ * // Add in the new options
+ * opts = opts ++ SomeOptions
+ * note("\nAlgorithm control options:")
+ * opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
+ * options + ("maxThings" -> x) ...
+ * }
+ * parser.parse(args, parser.opts) map { opts =>
+ * parser.opts = opts
+ * process
* }
* }
+ *
+ * override def process: Unit = {
+ * start()
+ * // do the work here
+ * stop
+ * }
+ *
* }}}
*/
abstract class MahoutDriver {
- implicit var mc: DistributedContext = _
- implicit var sparkConf = new SparkConf()
- var _useExistingContext: Boolean = false
+ implicit protected var mc: DistributedContext = _
+ implicit protected var sparkConf = new SparkConf()
+ protected var parser: MahoutOptionParser = _
+
+ var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
/** Creates a Spark context to run the job inside.
- * Creates a Spark context to run the job inside. Override to set the SparkConf values specific to the job,
+ * Override to set the SparkConf values specific to the job,
* these must be set before the context is created.
* @param masterUrl Spark master URL
* @param appName Name to display in Spark UI
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 3aada78..6908bd2 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -32,13 +32,15 @@ object MahoutOptionParser {
"appName" -> "Generic Spark App, Change this.")
final val FileIOOptions = immutable.HashMap[String, Any](
- "recursive" -> false,
"input" -> null.asInstanceOf[String],
"input2" -> null.asInstanceOf[String],
- "output" -> null.asInstanceOf[String],
+ "output" -> null.asInstanceOf[String])
+
+ final val FileDiscoveryOptions = immutable.HashMap[String, Any](
+ "recursive" -> false,
"filenamePattern" -> "^part-.*")
- final val TextDelimitedTuplesOptions = immutable.HashMap[String, Any](
+ final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
"rowIDPosition" -> 0,
"itemIDPosition" -> 1,
"filterPosition" -> -1,
@@ -49,7 +51,7 @@ object MahoutOptionParser {
final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
- "tupleDelim" -> " ",
+ "elementDelim" -> " ",
"omitStrength" -> false)
}
/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
@@ -57,17 +59,25 @@ object MahoutOptionParser {
* @param programName Name displayed in help message, the name by which the driver is invoked.
* */
class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
+
+ // build options from some standard CLI param groups
+ // Note: always put the driver-specific options last so they can override any previous options!
+ var opts = Map.empty[String, Any]
+
override def showUsageOnError = true
- def parseIOOptions = {
+ def parseIOOptions(numInputs: Int = 1) = {
+ opts = opts ++ MahoutOptionParser.FileIOOptions
note("Input, output options")
opt[String]('i', "input") required() action { (x, options) =>
options + ("input" -> x)
} text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
- opt[String]("input2") abbr ("i2") action { (x, options) =>
- options + ("input2" -> x)
- } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+ if (numInputs == 2) {
+ opt[String]("input2") abbr ("i2") action { (x, options) =>
+ options + ("input2" -> x)
+ } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+ }
opt[String]('o', "output") required() action { (x, options) =>
// todo: check to see if HDFS allows MS-Windows backslashes locally?
@@ -81,6 +91,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
}
def parseSparkOptions = {
+ opts = opts ++ MahoutOptionParser.SparkOptions
note("\nSpark config options:")
opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
@@ -94,7 +105,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
}
def parseGenericOptions = {
- note("\nGeneral config options:")
+ opts = opts ++ MahoutOptionParser.GenericOptions
opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
options + ("randomSeed" -> x)
} validate { x =>
@@ -107,9 +118,10 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
}//Hidden option, though a user might want this.
}
- def parseInputSchemaOptions{
- //Input text file schema--not driver specific but input data specific, tuples input,
+ def parseElementInputSchemaOptions{
+ //Input text file schema--not driver specific but input data specific, elements input,
// not drms
+ opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
note("\nInput text file schema options:")
opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
options + ("inDelim" -> x)
@@ -162,6 +174,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
def parseFileDiscoveryOptions = {
//File finding strategy--not driver specific
+ opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
note("\nFile discovery options:")
opt[Unit]('r', "recursive") action { (_, options) =>
options + ("recursive" -> true)
@@ -174,6 +187,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
}
def parseDrmFormatOptions = {
+ opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
note("\nOutput text file schema options:")
opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
options + ("rowKeyDelim" -> x)
@@ -183,9 +197,9 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
options + ("columnIdStrengthDelim" -> x)
} text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
- opt[String]("tupleDelim") abbr ("td") action { (x, options) =>
- options + ("tupleDelim" -> x)
- } text ("Separates vector tuple values in the values list (optional). Default: \" \"")
+ opt[String]("elementDelim") abbr ("td") action { (x, options) =>
+ options + ("elementDelim" -> x)
+ } text ("Separates vector element values in the values list (optional). Default: \" \"")
opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
options + ("omitStrength" -> true)
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
index e2bb49c..6351e45 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
@@ -20,7 +20,7 @@ package org.apache.mahout.drivers
import com.google.common.collect.{HashBiMap, BiMap}
import org.apache.mahout.math.drm.DistributedContext
-/** Reader trait is abstract in the sense that the tupleReader function must be defined by an extending trait, which also defines the type to be read.
+/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait, which also defines the type to be read.
* @tparam T type of object read.
*/
trait Reader[T]{
@@ -28,16 +28,27 @@ trait Reader[T]{
val mc: DistributedContext
val readSchema: Schema
- protected def tupleReader(
+ protected def elementReader(
mc: DistributedContext,
readSchema: Schema,
source: String,
existingRowIDs: BiMap[String, Int]): T
- def readTuplesFrom(
+ protected def drmReader(
+ mc: DistributedContext,
+ readSchema: Schema,
+ source: String,
+ existingRowIDs: BiMap[String, Int]): T
+
+ def readElementsFrom(
+ source: String,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+ elementReader(mc, readSchema, source, existingRowIDs)
+
+ def readDRMFrom(
source: String,
existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
- tupleReader(mc, readSchema, source, existingRowIDs)
+ drmReader(mc, readSchema, source, existingRowIDs)
}
/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
@@ -51,5 +62,5 @@ trait Writer[T]{
protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
- def writeDRMTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
+ def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
}
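
To make the Reader contract above concrete: an extending trait only has to supply the two
protected readers, and the public readElementsFrom and readDRMFrom entry points then work
unchanged. The trait below is invented purely for illustration (the real implementation is
TDIndexedDatasetReader in TextDelimitedReaderWriter.scala):

    import com.google.common.collect.BiMap
    import org.apache.mahout.math.drm.DistributedContext

    // Hypothetical reader that pretends "reading" means counting the comma delimited URIs.
    trait UriCountReader extends Reader[Long] {
      protected def elementReader(mc: DistributedContext, readSchema: Schema,
          source: String, existingRowIDs: BiMap[String, Int]): Long =
        source.split(",").length.toLong

      protected def drmReader(mc: DistributedContext, readSchema: Schema,
          source: String, existingRowIDs: BiMap[String, Int]): Long =
        source.split(",").length.toLong
    }
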
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
new file mode 100644
index 0000000..920c32b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -0,0 +1,159 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.cf.SimilarityAnalysis
+import scala.collection.immutable.HashMap
+
+/**
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis.rowSimilarity( )]].
+ * Reads a text delimited file containing a Mahout DRM of the form
+ * (row id, column id: strength, ...). The IDs are user-specified strings that will be
+ * preserved in the output. The rows define a matrix and
+ * [[org.apache.mahout.math.cf.SimilarityAnalysis.rowSimilarity( )]] will be used to calculate
+ * row-wise similarity using log-likelihood.
+ * The options allow control of the input schema, file discovery, output schema, and
+ * algorithm parameters.
+ * To get help run {{{mahout spark-rowsimilarity}}} for a full explanation of options. The default
+ * values for formatting will read (rowID<tab>columnID1:strength1<space>columnID2:strength2....)
+ * and write (rowID<tab>rowID1:strength1<space>rowID2:strength2....)
+ * Each output line will contain a row ID and similar rows sorted by LLR strength descending.
+ * @note To use with a Spark cluster see the --master option; if you run out of heap space check
+ *       the --sparkExecutorMemory option.
+ */
+object RowSimilarityDriver extends MahoutDriver {
+ // define only the options specific to RowSimilarity
+ private final val RowSimilarityOptions = HashMap[String, Any](
+ "maxObservations" -> 500,
+ "maxSimilaritiesPerRow" -> 100,
+ "appName" -> "RowSimilarityDriver")
+
+ private var readerWriter: TextDelimitedIndexedDatasetReaderWriter = _
+ private var readWriteSchema: Schema = _
+
+ /**
+ * @param args Command line args, if empty a help message is printed.
+ */
+ override def main(args: Array[String]): Unit = {
+
+ parser = new MahoutOptionParser(programName = "spark-rowsimilarity") {
+ head("spark-rowsimilarity", "Mahout 1.0")
+
+ //Input output options, non-driver specific
+ parseIOOptions()
+
+ //Algorithm control options--driver specific
+ opts = opts ++ RowSimilarityOptions
+
+ note("\nAlgorithm control options:")
+ opt[Int]("maxObservations") abbr ("mo") action { (x, options) =>
+ options + ("maxObservations" -> x)
+ } text ("Max number of observations to consider per row (optional). Default: " +
+ RowSimilarityOptions("maxObservations")) validate { x =>
+ if (x > 0) success else failure("Option --maxObservations must be > 0")
+ }
+
+ opt[Int]('m', "maxSimilaritiesPerRow") action { (x, options) =>
+ options + ("maxSimilaritiesPerRow" -> x)
+    } text ("Limit the number of similarities per row to this number (optional). Default: " +
+ RowSimilarityOptions("maxSimilaritiesPerRow")) validate { x =>
+ if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0")
+ }
+
+ /** --threshold not implemented in SimilarityAnalysis.rowSimilarity
+ * todo: replacing the threshold with some % of the best values and/or a
+ * confidence measure expressed in standard deviations would be nice.
+ */
+
+ //Driver notes--driver specific
+ note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
+
+ //Drm output schema--not driver specific, drm specific
+ parseDrmFormatOptions
+
+ //How to search for input
+ parseFileDiscoveryOptions
+
+ //Spark config options--not driver specific
+ parseSparkOptions
+
+      //Jar inclusion; this option can be set when executing the driver from compiled code, not when run from the CLI
+ parseGenericOptions
+
+ help("help") abbr ("h") text ("prints this usage text\n")
+
+ }
+ parser.parse(args, parser.opts) map { opts =>
+ parser.opts = opts
+ process
+ }
+ }
+
+ override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
+ appName: String = parser.opts("appName").asInstanceOf[String]):
+ Unit = {
+
+    // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
+    // MahoutKryoRegistrator; it should be added to the register list here so it
+    // will be specific to this job only.
+ sparkConf.set("spark.kryo.referenceTracking", "false")
+ .set("spark.kryoserializer.buffer.mb", "200")
+ .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+
+ super.start(masterUrl, appName)
+
+ readWriteSchema = new Schema(
+ "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String],
+ "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
+ "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
+ "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
+
+ readerWriter = new TextDelimitedIndexedDatasetReaderWriter(readWriteSchema, readWriteSchema)
+
+ }
+
+ private def readIndexedDataset: IndexedDataset = {
+
+ val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+ parser.opts("recursive").asInstanceOf[Boolean]).uris
+
+ if (inFiles.isEmpty) {
+ null.asInstanceOf[IndexedDataset]
+ } else {
+
+ val datasetA = IndexedDataset(readerWriter.readDRMFrom(inFiles))
+ datasetA
+ }
+ }
+
+ override def process: Unit = {
+ start()
+
+ val indexedDataset = readIndexedDataset
+
+ val rowSimilarityDrm = SimilarityAnalysis.rowSimilarity(indexedDataset.matrix, parser.opts("randomSeed").asInstanceOf[Int],
+ parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int], parser.opts("maxObservations").asInstanceOf[Int])
+
+ val rowSimilarityDataset = new IndexedDatasetTextDelimitedWriteable(rowSimilarityDrm,
+ indexedDataset.rowIDs, indexedDataset.rowIDs, readWriteSchema)
+ rowSimilarityDataset.writeTo(dest = parser.opts("output").asInstanceOf[String])
+
+ stop
+ }
+
+}
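
As a usage sketch with the default schemas (all paths below are illustrative):

    mahout spark-rowsimilarity --input /path/to/drm-text --output /tmp/row-sims --master local[4]

reads lines of the form (rowID<tab>columnID1:strength1<space>columnID2:strength2...) and writes
lines of the form (rowID<tab>similarRowID1:llr1<space>similarRowID2:llr2...), one line per input
row, with similar rows sorted by descending LLR strength.
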
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
index edff92d..42b2658 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
@@ -41,26 +41,26 @@ class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
// These can be used to keep the text in and out fairly standard to Mahout, where an application specific
// format is not required.
-/** Simple default Schema for typical text delimited tuple file input
- * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID
+/** Simple default Schema for typical text delimited element file input
+ * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
* <comma, tab, or space>here may be other ignored text...)
*/
-class DefaultTupleReadSchema extends Schema(
- "delim" -> "[,\t ]", //comma, tab or space
- "filter" -> "",
- "rowIDPosition" -> 0,
- "columnIDPosition" -> 1,
- "filterPosition" -> -1)
+class DefaultElementReadSchema extends Schema(
+ "delim" -> "[,\t ]", //comma, tab or space
+ "filter" -> "",
+ "rowIDPosition" -> 0,
+ "columnIDPosition" -> 1,
+ "filterPosition" -> -1)
/** Default Schema for text delimited drm file output
* This tells the writer to write a DRM of the default form:
* (rowID<tab>columnID1:score1<space>columnID2:score2...)
*/
class DefaultDRMWriteSchema extends Schema(
- "rowKeyDelim" -> "\t",
- "columnIdStrengthDelim" -> ":",
- "tupleDelim" -> " ",
- "omitScore" -> false)
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ",
+ "omitScore" -> false)
/** Default Schema for typical text delimited drm file input
* This tells the reader to input text lines of the form:
@@ -69,9 +69,9 @@ class DefaultDRMWriteSchema extends Schema(
class DefaultDRMReadSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
- "tupleDelim" -> " ")
+ "elementDelim" -> " ")
-/** Default Schema for reading a text delimited drm file where the score of any tuple is ignored,
+/** Default Schema for reading a text delimited drm file where the score of any element is ignored,
* all non-zeros are replaced with 1.
* This tells the reader to input DRM lines of the form
* (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
@@ -82,17 +82,17 @@ class DefaultDRMReadSchema extends Schema(
class DRMReadBooleanSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
- "tupleDelim" -> " ",
+ "elementDelim" -> " ",
"omitScore" -> true)
-/** Default Schema for typical text delimited drm file write where the score of a tuple is omitted.
- * The presence of a tuple means the score = 1, the absence means a score of 0.
+/** Default Schema for typical text delimited drm file write where the score of an element is omitted.
+ * The presence of an element means a score of 1; the absence means a score of 0.
* This tells the writer to output DRM lines of the form
* (rowID<tab>columnID1<space>columnID2...)
*/
class DRMWriteBooleanSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
- "tupleDelim" -> " ",
+ "elementDelim" -> " ",
"omitScore" -> true)
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index 11d647b..53a36a5 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -27,14 +27,16 @@ import scala.collection.JavaConversions._
/** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
*/
trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
- /** Read in text delimited tuples from all URIs in this comma delimited source String.
+  /** Read in text delimited elements from all URIs in the comma delimited source String and return
+   * the DRM of all elements, updating the row and column ID dictionaries. If an element has
+   * no strength value, its presence is taken to mean a strength of 1.
*
* @param mc context for the Spark job
* @param readSchema describes the delimiters and positions of values in the text delimited file.
* @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
* @return
*/
- protected def tupleReader(
+ protected def elementReader(
mc: DistributedContext,
readSchema: Schema,
source: String,
@@ -53,7 +55,6 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
})
var columns = mc.textFile(source).map { line => line.split(delimiter) }
- //val m = columns.collect
// -1 means no filter in the input text, take them all
if(filterPosition != -1) {
@@ -113,9 +114,100 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
}
}
- // this creates a BiMap from an ID collection. The ID points to an ordinal int
+  /** Read in text delimited rows from all URIs in this comma delimited source String and return
+   * the DRM of all rows, updating the row and column ID dictionaries. If an element has
+   * no strength value, its presence is taken to mean a strength of 1.
+ *
+ * @param mc context for the Spark job
+ * @param readSchema describes the delimiters and positions of values in the text delimited file.
+ * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
+ * @return
+ */
+ protected def drmReader(
+ mc: DistributedContext,
+ readSchema: Schema,
+ source: String,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = {
+ try {
+ val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String]
+ val columnIdStrengthDelim = readSchema("columnIdStrengthDelim").asInstanceOf[String]
+ val elementDelim = readSchema("elementDelim").asInstanceOf[String]
+ // no need for omitScore since we can tell if there is a score and assume it is 1.0d if not specified
+ //val omitScore = readSchema("omitScore").asInstanceOf[Boolean]
+
+ assert(!source.isEmpty, {
+ println(this.getClass.toString + ": has no files to read")
+ throw new IllegalArgumentException
+ })
+
+ var rows = mc.textFile(source).map { line => line.split(rowKeyDelim) }
+
+ // get row and column IDs
+ val interactions = rows.map { row =>
+        row(0) -> row(1) // rowID token -> string of column IDs+strengths
+ }
+
+      interactions.cache() // cached because interactions is traversed more than once below
+
+ // create separate collections of rowID and columnID tokens
+ val rowIDs = interactions.map { case (rowID, _) => rowID }.distinct().collect()
+
+ // the columns are in a TD string so separate them and get unique ones
+      val columnIDs = interactions.flatMap { case (_, columns) =>
+        val elements = columns.split(elementDelim)
+        elements.map(elem => elem.split(columnIdStrengthDelim)(0))
+      }.distinct().collect()
+
+ val numRows = rowIDs.size
+ val numColumns = columnIDs.size
+
+ // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
+ // broadcast them for access in distributed processes, so they are not recalculated in every task.
+ val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
+ val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
+
+ val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
+ val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
+
+ val indexedInteractions =
+ interactions.map { case (rowID, columns) =>
+ val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
+
+ val elements = columns.split(elementDelim)
+ val row = new RandomAccessSparseVector(numColumns)
+          for (element <- elements) {
+            val pair = element.split(columnIdStrengthDelim)
+            val columnID = columnIDDictionary_bcast.value.get(pair(0)).get
+            if (pair.size == 2) // there was a strength
+              row.setQuick(columnID, pair(1).toDouble)
+            else // no strength so set DRM value to 1.0d, this ignores 'omitScore', which is a write param
+              row.setQuick(columnID, 1.0d)
+          }
+ rowIndex -> row
+ }
+ .asInstanceOf[DrmRdd[Int]]
+
+      // wrap the DrmRdd in a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
+ val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
+
+ IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+
+ } catch {
+ case cce: ClassCastException => {
+ println(this.getClass.toString + ": Schema has illegal values")
+ throw cce
+ }
+ }
+ }
+
+ // this creates a BiMap from an ID collection. The ID points to an ordinal int
// which is used internally by Mahout as the row or column ID
- // todo: this is a non-distributed process and the BiMap is a non-rdd based object--might be a scaling problem
+  // todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a
+  // non-rdd based object--this limits the dataset to sizes where the dictionaries fit in memory;
+  // an alternative is to put the dictionaries in rdds and do joins to translate IDs
private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index'
for (entry <- entries) {
@@ -130,7 +222,7 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
- /** Read in text delimited tuples from all URIs in this comma delimited source String.
+  /** Write text delimited elements to the output destination.
*
* @param mc context for the Spark job
* @param writeSchema describes the delimiters and positions of values in the output text delimited file.
@@ -145,7 +237,7 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
try {
val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String]
- val tupleDelim = writeSchema("tupleDelim").asInstanceOf[String]
+ val elementDelim = writeSchema("elementDelim").asInstanceOf[String]
val omitScore = writeSchema("omitScore").asInstanceOf[Boolean]
//instance vars must be put into locally scoped vals when put into closures that are
//executed by Spark
@@ -177,11 +269,11 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
// first get the external rowID token
if (!vector.isEmpty){
var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
- // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
+ // for the rest of the row, construct the vector contents of elements (external column ID, strength value)
for (item <- vector) {
line += columnIDDictionary.inverse.get(item.getFirst)
if (!omitScore) line += columnIdStrengthDelim + item.getSecond
- line += tupleDelim
+ line += elementDelim
}
// drop the last delimiter, not needed to end the line
line = line.dropRight(1)
@@ -203,7 +295,7 @@ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexed
/** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
* @param readSchema describes the delimiters and position of values in the text delimited file to be read.
* @param mc Spark context for reading files
- * @note The source is supplied by Reader#readTuplesFrom .
+ * @note The source is supplied by Reader#readElementsFrom .
* */
class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
(implicit val mc: DistributedContext) extends TDIndexedDatasetReader
@@ -211,7 +303,7 @@ class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
/** Writes text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
* @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
* @param mc Spark context for reading files
- * @note the destination is supplied by Writer#writeDRMTo trait method
+ * @note the destination is supplied by Writer#writeTo trait method
* */
class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
@@ -224,7 +316,7 @@ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeS
(implicit val mc: DistributedContext)
extends TDIndexedDatasetReaderWriter
-/** A version of IndexedDataset that has it's own writeDRMTo method from a Writer trait. This is an alternative to creating
+/** A version of IndexedDataset that has its own writeTo method from a Writer trait. This is an alternative to creating
* a Writer based stand-alone class for writing. Consider it experimental allowing similar semantics to drm.writeDrm().
* Experimental because it's not clear that it is simpler or more intuitive and since IndexedDatasetTextDelimitedWriteables
* are probably short lived in terms of lines of code so complexity may be moot.
@@ -243,18 +335,20 @@ class IndexedDatasetTextDelimitedWriteable(
(implicit val mc: DistributedContext)
extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter {
- def writeTo(dest: String): Unit = {
- writeDRMTo(this, dest)
+ override def writeTo(collection: IndexedDataset = this, dest: String): Unit = {
+ super.writeTo(this, dest)
}
}
/**
- * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily used to get a secondary constructor for
- * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
- * {{{
- * val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readTuplesFrom(source))
- * }}}
- */
+ * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily
+ * used to get a secondary constructor for
+ * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a
+ * factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
+ * {{{
+ * val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readElementsFrom(source))
+ * }}}
+ */
object IndexedDatasetTextDelimitedWriteable {
/** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
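
The per-line parse that drmReader performs can be sketched without Spark. Assuming the
DefaultDRMReadSchema delimiters and a made-up input line (nothing below is part of the patch):

    // split off the row key, then split the rest into columnID:strength elements
    val line = "u1\titem1:3.5 item2"
    val fields = line.split("\t")                            // rowKeyDelim
    val rowID = fields(0)
    val elements = fields(1).split(" ").map { element =>     // elementDelim
      val pair = element.split(":")                          // columnIdStrengthDelim
      val strength = if (pair.size == 2) pair(1).toDouble else 1.0 // no score means 1.0
      (pair(0), strength)
    }
    // rowID == "u1"; elements == Array(("item1", 3.5), ("item2", 1.0))
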
http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index dedb279..54f33ef 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -62,7 +62,7 @@ object SparkEngine extends DistributedEngine {
// Fold() doesn't work with kryo still. So work around it.
.mapPartitions(iter => {
val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) =>
- v.nonZeroes().foreach { elem => acc(elem.index) += 1}
+ v.nonZeroes().foreach { elem => acc(elem.index) += 1 }
acc
}
Iterator(acc)