Posted to commits@mahout.apache.org by dl...@apache.org on 2014/08/08 20:54:44 UTC
[5/6] git commit: MAHOUT-1541, MAHOUT-1568, MAHOUT-1569: refactored the options parser
and option defaults to DRY up individual driver code by putting more in base classes;
tightened up the test suite with a better way of comparing actual with correct results
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a8097403
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a8097403
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a8097403
Branch: refs/heads/spark-1.0.x
Commit: a80974037853c5227f9e5ef1c384a1fca134746e
Parents: 00c0149
Author: pferrel <pa...@occamsmachete.com>
Authored: Wed Aug 6 16:28:37 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Wed Aug 6 16:28:37 2014 -0700
----------------------------------------------------------------------
.../mahout/math/cf/CooccurrenceAnalysis.scala | 220 ++++++++++
.../apache/mahout/cf/CooccurrenceAnalysis.scala | 218 ----------
.../apache/mahout/drivers/IndexedDataset.scala | 25 +-
.../mahout/drivers/ItemSimilarityDriver.scala | 293 +++++--------
.../apache/mahout/drivers/MahoutDriver.scala | 28 +-
.../mahout/drivers/MahoutOptionParser.scala | 185 +++++++-
.../apache/mahout/drivers/ReaderWriter.scala | 30 +-
.../org/apache/mahout/drivers/Schema.scala | 54 ++-
.../drivers/TextDelimitedReaderWriter.scala | 107 +++--
.../drm/CheckpointedDrmSpark.scala | 1 -
.../io/MahoutKryoRegistrator.scala | 6 +-
.../mahout/cf/CooccurrenceAnalysisSuite.scala | 49 ++-
.../drivers/ItemSimilarityDriverSuite.scala | 422 +++++++++++++++++--
13 files changed, 1114 insertions(+), 524 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
new file mode 100644
index 0000000..181b729
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.cf
+
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+
+/**
+ * based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object CooccurrenceAnalysis extends Serializable {
+
+ /** Compares (Int,Double) pairs by the second value */
+ private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
+
+ def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+ maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+ implicit val distributedContext = drmARaw.context
+
+ // Apply selective downsampling, pin resulting matrix
+ val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+ // num users, which equals the maximum number of interactions per item
+ val numUsers = drmA.nrow.toInt
+
+ // Compute & broadcast the number of interactions per thing in A
+ val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
+
+ // Compute co-occurrence matrix A'A
+ val drmAtA = drmA.t %*% drmA
+
+ // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
+ val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
+ bcastInteractionsPerItemA, crossCooccurrence = false)
+
+ var indicatorMatrices = List(drmIndicatorsAtA)
+
+ // Now look at cross-co-occurrences
+ for (drmBRaw <- drmBs) {
+ // Down-sample and pin other interaction matrix
+ val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+ // Compute & broadcast the number of interactions per thing in B
+ val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
+
+ // Compute cross-co-occurrence matrix A'B
+ // pferrel: this used to be val drmBtA = drmB.t %*% drmA, which is the wrong order;
+ // changing it to A'B is a big change, so you know who to blame
+ val drmAtB = drmA.t %*% drmB
+
+ val drmIndicatorsAtB = computeIndicators(drmAtB, numUsers, maxInterestingItemsPerThing,
+ bcastInteractionsPerItemA, bcastInteractionsPerThingB)
+
+ indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
+
+ drmB.uncache()
+ }
+
+ // Unpin downsampled interaction matrix
+ drmA.uncache()
+
+ // Return list of indicator matrices
+ indicatorMatrices
+ }
+
+ /**
+ * Compute loglikelihood ratio
+ * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+ **/
+ def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+ numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+ val k11 = numInteractionsWithAandB
+ val k12 = numInteractionsWithA - numInteractionsWithAandB
+ val k21 = numInteractionsWithB - numInteractionsWithAandB
+ val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+ LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+
+ }
+
+ def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+ bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+ crossCooccurrence: Boolean = true) = {
+ drmBtA.mapBlock() {
+ case (keys, block) =>
+
+ val llrBlock = block.like()
+ val numInteractionsB: Vector = bcastNumInteractionsB
+ val numInteractionsA: Vector = bcastNumInteractionsA
+
+ for (index <- 0 until keys.size) {
+
+ val thingB = keys(index)
+
+ // PriorityQueue to select the top-k items
+ val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+ block(index, ::).nonZeroes().foreach { elem =>
+ val thingA = elem.index
+ val cooccurrences = elem.get
+
+ // exclude co-occurrences of the item with itself
+ if (crossCooccurrence || thingB != thingA) {
+ // Compute loglikelihood ratio
+ val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+ cooccurrences.toLong, numUsers)
+
+ val candidate = thingA -> llr
+
+ // matches legacy hadoop code and maps values to range (0..1)
+ // val tLLR = 1.0 - (1.0 / (1.0 + llr))
+ //val candidate = thingA -> tLLR
+
+ // Enqueue item with score, if belonging to the top-k
+ if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+ topItemsPerThing.enqueue(candidate)
+ } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+ topItemsPerThing.dequeue()
+ topItemsPerThing.enqueue(candidate)
+ }
+ }
+ }
+
+ // Add top-k interesting items to the output matrix
+ topItemsPerThing.dequeueAll.foreach {
+ case (otherThing, llrScore) =>
+ llrBlock(index, otherThing) = llrScore
+ }
+ }
+
+ keys -> llrBlock
+ }
+ }
+
+ /**
+ * Selectively downsample users and things with an anomalous amount of interactions, inspired by
+ * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+ *
+ * additionally binarizes the input matrix, as we're only interested in knowing whether interactions happened or not
+ */
+ def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+ implicit val distributedContext = drmM.context
+
+ // Pin raw interaction matrix
+ val drmI = drmM.checkpoint()
+
+ // Broadcast vector containing the number of interactions with each thing
+ val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
+
+ val downSampledDrmI = drmI.mapBlock() {
+ case (keys, block) =>
+ val numInteractions: Vector = bcastNumInteractions
+
+ // Use a hash of the unique first key to seed the RNG; this makes the computation repeatable in case of failures
+ val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+ val downsampledBlock = block.like()
+
+ // Downsample the interaction vector of each user
+ for (userIndex <- 0 until keys.size) {
+
+ val interactionsOfUser = block(userIndex, ::)
+
+ val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
+
+ val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
+
+ interactionsOfUser.nonZeroes().foreach { elem =>
+ val numInteractionsWithThing = numInteractions(elem.index)
+ val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+ if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
+ // We ignore the original interaction value and create a binary 0-1 matrix
+ // as we only consider whether interactions happened or did not happen
+ downsampledBlock(userIndex, elem.index) = 1
+ }
+ }
+ }
+
+ keys -> downsampledBlock
+ }
+
+ // Unpin raw interaction matrix
+ drmI.uncache()
+
+ downSampledDrmI
+ }
+}
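To make the contingency table inside logLikelihoodRatio concrete, here is a minimal self-contained sketch. The interaction counts are made up for illustration; it builds the same four cells as the method above and delegates to the existing LogLikelihood.logLikelihoodRatio:

import org.apache.mahout.math.stats.LogLikelihood

object LlrSketch extends App {
  // Hypothetical counts: 1000 total interactions, 100 involving item A,
  // 80 involving item B, and 40 involving both A and B
  val numInteractions = 1000L
  val numWithA = 100L
  val numWithB = 80L
  val numWithAandB = 40L

  // The same 2x2 contingency table built by logLikelihoodRatio above
  val k11 = numWithAandB                                         // A and B together
  val k12 = numWithA - numWithAandB                              // A without B
  val k21 = numWithB - numWithAandB                              // B without A
  val k22 = numInteractions - numWithA - numWithB + numWithAandB // neither A nor B

  println(LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22))
}

A large value here means A and B co-occur far more often than independence would predict, which is what computeIndicators thresholds with its top-k queue.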
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
deleted file mode 100644
index 14cc9d5..0000000
--- a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf
-
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import drm._
-import RLikeDrmOps._
-import scala.collection.JavaConversions._
-import org.apache.mahout.math.stats.LogLikelihood
-import collection._
-import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.math.function.{VectorFunction, Functions}
-
-
-/**
- * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
- * available at http://www.mapr.com/practical-machine-learning
- *
- * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
- * Scalable Similarity-Based Neighborhood Methods with MapReduce
- * ACM Conference on Recommender Systems 2012"
- */
-object CooccurrenceAnalysis extends Serializable {
-
- /** Compares (Int,Double) pairs by the second value */
- private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
-
- def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
- maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
-
- implicit val distributedContext = drmARaw.context
-
- // Apply selective downsampling, pin resulting matrix
- val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
-
- // num users, which equals the maximum number of interactions per item
- val numUsers = drmA.nrow.toInt
-
- // Compute & broadcast the number of interactions per thing in A
- val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
-
- // Compute co-occurrence matrix A'A
- val drmAtA = drmA.t %*% drmA
-
- // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
- val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
- bcastInteractionsPerItemA, crossCooccurrence = false)
-
- var indicatorMatrices = List(drmIndicatorsAtA)
-
- // Now look at cross-co-occurrences
- for (drmBRaw <- drmBs) {
- // Down-sample and pin other interaction matrix
- val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
-
- // Compute & broadcast the number of interactions per thing in B
- val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
-
- // Compute cross-co-occurrence matrix B'A
- val drmBtA = drmB.t %*% drmA
-
- val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing,
- bcastInteractionsPerThingB, bcastInteractionsPerItemA)
-
- indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA
-
- drmB.uncache()
- }
-
- // Unpin downsampled interaction matrix
- drmA.uncache()
-
- // Return list of indicator matrices
- indicatorMatrices
- }
-
- /**
- * Compute loglikelihood ratio
- * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
- **/
- def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
- numInteractionsWithAandB: Long, numInteractions: Long) = {
-
- val k11 = numInteractionsWithAandB
- val k12 = numInteractionsWithA - numInteractionsWithAandB
- val k21 = numInteractionsWithB - numInteractionsWithAandB
- val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
-
- LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
-
- }
-
- def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
- bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
- crossCooccurrence: Boolean = true) = {
- drmBtA.mapBlock() {
- case (keys, block) =>
-
- val llrBlock = block.like()
- val numInteractionsB: Vector = bcastNumInteractionsB
- val numInteractionsA: Vector = bcastNumInteractionsA
-
- for (index <- 0 until keys.size) {
-
- val thingB = keys(index)
-
- // PriorityQueue to select the top-k items
- val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
-
- block(index, ::).nonZeroes().foreach { elem =>
- val thingA = elem.index
- val cooccurrences = elem.get
-
- // exclude co-occurrences of the item with itself
- if (crossCooccurrence || thingB != thingA) {
- // Compute loglikelihood ratio
- val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
- cooccurrences.toLong, numUsers)
-
- val candidate = thingA -> llr
-
- // matches legacy hadoop code and maps values to range (0..1)
- // val tLLR = 1.0 - (1.0 / (1.0 + llr))
- //val candidate = thingA -> tLLR
-
- // Enqueue item with score, if belonging to the top-k
- if (topItemsPerThing.size < maxInterestingItemsPerThing) {
- topItemsPerThing.enqueue(candidate)
- } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
- topItemsPerThing.dequeue()
- topItemsPerThing.enqueue(candidate)
- }
- }
- }
-
- // Add top-k interesting items to the output matrix
- topItemsPerThing.dequeueAll.foreach {
- case (otherThing, llrScore) =>
- llrBlock(index, otherThing) = llrScore
- }
- }
-
- keys -> llrBlock
- }
- }
-
- /**
- * Selectively downsample users and things with an anomalous amount of interactions, inspired by
- * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
- *
- * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not
- */
- def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
-
- implicit val distributedContext = drmM.context
-
- // Pin raw interaction matrix
- val drmI = drmM.checkpoint()
-
- // Broadcast vector containing the number of interactions with each thing
- val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
-
- val downSampledDrmI = drmI.mapBlock() {
- case (keys, block) =>
- val numInteractions: Vector = bcastNumInteractions
-
- // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
- val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
-
- val downsampledBlock = block.like()
-
- // Downsample the interaction vector of each user
- for (userIndex <- 0 until keys.size) {
-
- val interactionsOfUser = block(userIndex, ::)
-
- val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
-
- val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
-
- interactionsOfUser.nonZeroes().foreach { elem =>
- val numInteractionsWithThing = numInteractions(elem.index)
- val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
-
- if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
- // We ignore the original interaction value and create a binary 0-1 matrix
- // as we only consider whether interactions happened or did not happen
- downsampledBlock(userIndex, elem.index) = 1
- }
- }
- }
-
- keys -> downsampledBlock
- }
-
- // Unpin raw interaction matrix
- drmI.uncache()
-
- downSampledDrmI
- }
-}
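The most consequential change in this file move is the cross-co-occurrence order: the deleted code computed B'A while the new code computes A'B. A small in-core sketch (matrix shapes invented for illustration, using the same scalabindings imports as the file) shows why the orientation matters:

import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._

object OrderSketch extends App {
  // A: 3 users x 2 primary items; B: 3 users x 4 secondary items
  val a = dense((1, 0), (0, 1), (1, 1))
  val b = dense((1, 0, 0, 1), (0, 1, 1, 0), (1, 1, 0, 0))

  val atb = a.t %*% b // 2 x 4: rows keyed by primary items, columns by secondary items
  val bta = b.t %*% a // 4 x 2: the old, transposed orientation

  println(s"A'B is ${atb.nrow} x ${atb.ncol}, B'A is ${bta.nrow} x ${bta.ncol}")
}

Only the A'B orientation lines up with computeIndicators, which broadcasts the per-item interaction counts of A for the row keys and those of B for the columns.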
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
index 0d8c160..41622a8 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
@@ -19,6 +19,8 @@ package org.apache.mahout.drivers
import com.google.common.collect.BiMap
import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
+import org.apache.mahout.sparkbindings._
/**
* Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries.
@@ -39,14 +41,33 @@ import org.apache.mahout.math.drm.CheckpointedDrm
* to be not created when not needed.
*/
-case class IndexedDataset(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
+case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
+
+ // we must allow the row dimension to be adjusted in the case where the data read in is incomplete and we
+ // learn this afterwards
+
+ /**
+ * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
+ * No physical changes are made to the underlying drm.
+ * @param n number to use for row cardinality, should be larger than the current value
+ * @note should be done before any BLAS optimizer actions are performed on the matrix or you'll get unpredictable
+ * results.
+ */
+ def newRowCardinality(n: Int): IndexedDataset = {
+ assert(n > -1)
+ assert(n >= matrix.nrow)
+ val drmRdd = matrix.asInstanceOf[CheckpointedDrmSpark[Int]].rdd
+ val ncol = matrix.ncol
+ val newMatrix = drmWrap[Int](drmRdd, n, ncol)
+ new IndexedDataset(newMatrix, rowIDs, columnIDs)
+ }
}
/**
* Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary constructor for
* making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like [[org.apache.mahout.drivers.IndexedDatasetStore]]
* {{{
- * val indexedDataset = IndexedDataset(indexedDatasetReader.readFrom(source))
+ * val indexedDataset = IndexedDataset(indexedDatasetReader.readTuplesFrom(source))
* }}}
*/
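The mutable matrix field and newRowCardinality above exist so that two datasets read from separate inputs can be made to conform before A'B is computed. A condensed sketch of the pattern ItemSimilarityDriver uses further down (the helper name is invented; it assumes both datasets were read with a shared row-ID dictionary, e.g. reader2.readTuplesFrom(inFiles2, existingRowIDs = datasetA.rowIDs)):

def alignRowCardinality(datasetA: IndexedDataset, datasetB: IndexedDataset): Array[IndexedDataset] = {
  // The shared dictionary was filled by both reads, so its size is the authoritative row count
  val rowCardinality = datasetB.rowIDs.size()

  // Only the logical row count changes; no rows are materialized
  val alignedA =
    if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality)
    else datasetA
  val alignedB =
    if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality)
    else datasetB

  Array(alignedA, alignedB)
}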
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 71d36c9..e0eaabc 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -17,7 +17,8 @@
package org.apache.mahout.drivers
-import org.apache.mahout.cf.CooccurrenceAnalysis
+import org.apache.mahout.math.cf.CooccurrenceAnalysis
+import scala.collection.immutable.HashMap
/**
* Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]].
@@ -25,7 +26,7 @@ import org.apache.mahout.cf.CooccurrenceAnalysis
* that contain (row id, column id, ...). The IDs are user specified strings which will be
* preserved in the
* output. The individual tuples will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]
- * will be used to calculate row-wise self-similarity, or when using filters, will generate two
+ * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
* matrices and calculate both the self similarity of the primary matrix and the row-wise
* similarity of the primary
* to the secondary. Returns one or two directories of text files formatted as specified in
@@ -35,14 +36,21 @@ import org.apache.mahout.cf.CooccurrenceAnalysis
* To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple
* tuples of text delimited values (userID,itemID) with or without strengths and with a separator of tab, comma, or space,
* you can specify only the input and output file and directory--all else will default to the correct values.
- * @note To use with a Spark cluster see the --masterUrl option, if you run out of heap space check
+ * Each output line will contain the Item ID and similar items sorted by LLR strength descending.
+ * @note To use with a Spark cluster see the --master option; if you run out of heap space check
* the --sparkExecutorMemory option.
*/
object ItemSimilarityDriver extends MahoutDriver {
- //todo: Should also take two input streams and do cross similarity with no filter required.
- // required for examples
+ // define only the options specific to ItemSimilarity
+ private final val ItemSimilarityOptions = HashMap[String, Any](
+ "maxPrefs" -> 500,
+ "maxSimilaritiesPerItem" -> 100,
+ "appName" -> "ItemSimilarityDriver")
+
+ // build options from some standard CLI param groups
+ // Note: always put the driver-specific options last so they can override any previous options!
+ private var options: Map[String, Any] = null
- private var options: Options = _
private var reader1: TextDelimitedIndexedDatasetReader = _
private var reader2: TextDelimitedIndexedDatasetReader = _
private var writer: TextDelimitedIndexedDatasetWriter = _
@@ -52,190 +60,103 @@ object ItemSimilarityDriver extends MahoutDriver {
* @param args Command line args, if empty a help message is printed.
*/
override def main(args: Array[String]): Unit = {
- val parser = new MahoutOptionParser[Options]("spark-itemsimilarity") {
+ options = MahoutOptionParser.GenericOptions ++ MahoutOptionParser.SparkOptions ++
+ MahoutOptionParser.FileIOOptions ++ MahoutOptionParser.TextDelimitedTuplesOptions ++
+ MahoutOptionParser.TextDelimitedDRMOptions ++ ItemSimilarityOptions
+
+ val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
//Input output options, non-driver specific
- note("Input, output options")
- opt[String]('i', "input") required() action { (x, options) =>
- options.copy(input = x)
- } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
-
- opt[String]('o', "output") required() action { (x, options) =>
- if (x.endsWith("/")) // todo: check to see if HDFS allows MS-Windows backslashes locally?
- options.copy(output = x)
- else
- options.copy(output = x + "/")
- } text ("Path for output, any local or HDFS supported URI (required).")
+ parseIOOptions
//Algorithm control options--driver specific
note("\nAlgorithm control options:")
- opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
- options.copy(master = x)
- }
-
opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) =>
- options.copy(maxPrefs = x)
- } text ("Max number of preferences to consider per user (optional). Default: 500") validate { x =>
+ options + ("maxPrefs" -> x)
+ } text ("Max number of preferences to consider per user (optional). Default: " +
+ ItemSimilarityOptions("maxPrefs")) validate { x =>
if (x > 0) success else failure("Option --maxPrefs must be > 0")
}
/** not implemented in CooccurrenceAnalysis.cooccurrence
opt[Int]("minPrefs") abbr ("mp") action { (x, options) =>
- options.copy(minPrefs = x)
+ options.put("minPrefs", x)
+ options
} text ("Ignore users with less preferences than this (optional). Default: 1") validate { x =>
if (x > 0) success else failure("Option --minPrefs must be > 0")
}
*/
opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
- options.copy(maxSimilaritiesPerItem = x)
- } text ("Limit the number of similarities per item to this number (optional). Default: 100") validate { x =>
+ options + ("maxSimilaritiesPerItem" -> x)
+ } text ("Limit the number of similarities per item to this number (optional). Default: " +
+ ItemSimilarityOptions("maxSimilaritiesPerItem")) validate { x =>
if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
}
- opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
- options.copy(randomSeed = x)
- } text ("Int to seed random number generator (optional). Default: Uses time to generate a seed") validate { x =>
- if (x > 0) success else failure("Option --randomSeed must be > 0")
- }
-
- //Input text file schema--not driver specific but input data specific, tuples input,
- // not drms
- note("\nInput text file schema options:")
- opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
- options.copy(inDelim = x)
- }
-
- opt[String]("filter1") abbr ("f1") action { (x, options) =>
- options.copy(filter1 = x)
- } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
-
- opt[String]("filter2") abbr ("f2") action { (x, options) =>
- options.copy(filter2 = x)
- } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected.")
-
- opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) =>
- options.copy(rowIDPosition = x)
- } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
- if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
- }
-
- opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) =>
- options.copy(itemIDPosition = x)
- } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
- if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
- }
-
- opt[Int]("filterPosition") abbr ("fc") action { (x, options) =>
- options.copy(filterPosition = x)
- } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
- if (x >= -1) success else failure("Option --filterColNum must be >= -1")
- }
-
- note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+ //Driver notes--driver specific
+ note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
- //File finding strategy--not driver specific
- note("\nFile discovery options:")
- opt[Unit]('r', "recursive") action { (_, options) =>
- options.copy(recursive = true)
- } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+ //Input text format
+ parseInputSchemaOptions
- opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
- options.copy(filenamePattern = x)
- } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+ //How to search for input
+ parseFileDiscoveryOptions
//Drm output schema--not driver specific, drm specific
- note("\nOutput text file schema options:")
- opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
- options.copy(rowKeyDelim = x)
- } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
-
- opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
- options.copy(columnIdStrengthDelim = x)
- } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
-
- opt[String]("tupleDelim") abbr ("td") action { (x, options) =>
- options.copy(tupleDelim = x)
- } text ("Separates vector tuple values in the values list (optional). Default: \",\"")
-
- opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
- options.copy(omitStrength = true)
- } text ("Do not write the strength to the output files (optional), Default: false.")
- note("This option is used to output indexable data for creating a search engine recommender.")
+ parseDrmFormatOptions
//Spark config options--not driver specific
- note("\nSpark config options:")
- opt[String]("sparkExecutorMem") abbr ("sem") action { (x, options) =>
- options.copy(sparkExecutorMem = x)
- } text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g")
+ parseSparkOptions
- note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2,itemID10:value10...\"")
-
- //Jar inclusion, this option can be set when executing the driver from compiled code
- opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) =>
- options.copy(dontAddMahoutJars = true) //set the value MahoutDriver so the context will be created correctly
- }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly
-
- //Driver notes--driver specific
- note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.\n")
+ //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
+ parseGenericOptions
help("help") abbr ("h") text ("prints this usage text\n")
- checkConfig { c =>
- if (c.filterPosition == c.itemIDPosition
- || c.filterPosition == c.rowIDPosition
- || c.rowIDPosition == c.itemIDPosition)
- failure("The row, item, and filter positions must be unique.") else success
- }
-
- //check for option consistency, probably driver specific
- checkConfig { c =>
- if (c.filter1 != null && c.filter2 != null && c.filter1 == c.filter2) failure("If" +
- " using filters they must be unique.") else success
- }
-
}
-
- //repeated code, should this be put base MahoutDriver somehow?
- parser.parse(args, Options()) map { opts =>
+ parser.parse(args, options) map { opts =>
options = opts
process
}
-
}
- override def start(masterUrl: String = options.master,
- appName: String = options.appName, dontAddMahoutJars: Boolean = options.dontAddMahoutJars):
+ override def start(masterUrl: String = options("master").asInstanceOf[String],
+ appName: String = options("appName").asInstanceOf[String],
+ dontAddMahoutJars: Boolean = options("dontAddMahoutJars").asInstanceOf[Boolean]):
Unit = {
+ // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
+ // MahoutKryoRegistrator; it should be added to the register list here so it
+ // will be specific to this job only.
sparkConf.set("spark.kryo.referenceTracking", "false")
.set("spark.kryoserializer.buffer.mb", "200")
- .set("spark.executor.memory", options.sparkExecutorMem)
+ .set("spark.executor.memory", options("sparkExecutorMem").asInstanceOf[String])
super.start(masterUrl, appName, dontAddMahoutJars)
- val readSchema1 = new Schema("delim" -> options.inDelim, "filter" -> options.filter1,
- "rowIDPosition" -> options.rowIDPosition,
- "columnIDPosition" -> options.itemIDPosition,
- "filterPosition" -> options.filterPosition)
+ val readSchema1 = new Schema("delim" -> options("inDelim").asInstanceOf[String],
+ "filter" -> options("filter1").asInstanceOf[String],
+ "rowIDPosition" -> options("rowIDPosition").asInstanceOf[Int],
+ "columnIDPosition" -> options("itemIDPosition").asInstanceOf[Int],
+ "filterPosition" -> options("filterPosition").asInstanceOf[Int])
reader1 = new TextDelimitedIndexedDatasetReader(readSchema1)
- if (options.filterPosition != -1 && options.filter2 != null) {
- val readSchema2 = new Schema("delim" -> options.inDelim, "filter" -> options.filter2,
- "rowIDPosition" -> options.rowIDPosition,
- "columnIDPosition" -> options.itemIDPosition,
- "filterPosition" -> options.filterPosition)
+ if ((options("filterPosition").asInstanceOf[Int] != -1 && options("filter2").asInstanceOf[String] != null)
+ || (options("input2").asInstanceOf[String] != null && !options("input2").asInstanceOf[String].isEmpty )){
+ // only need to change the filter used compared to readSchema1
+ val readSchema2 = new Schema(readSchema1) += ("filter" -> options("filter2").asInstanceOf[String])
reader2 = new TextDelimitedIndexedDatasetReader(readSchema2)
}
writeSchema = new Schema(
- "rowKeyDelim" -> options.rowKeyDelim,
- "columnIdStrengthDelim" -> options.columnIdStrengthDelim,
- "omitScore" -> options.omitStrength,
- "tupleDelim" -> options.tupleDelim)
+ "rowKeyDelim" -> options("rowKeyDelim").asInstanceOf[String],
+ "columnIdStrengthDelim" -> options("columnIdStrengthDelim").asInstanceOf[String],
+ "omitScore" -> options("omitStrength").asInstanceOf[Boolean],
+ "tupleDelim" -> options("tupleDelim").asInstanceOf[String])
writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
@@ -243,24 +164,60 @@ object ItemSimilarityDriver extends MahoutDriver {
private def readIndexedDatasets: Array[IndexedDataset] = {
- val inFiles = FileSysUtils(options.input, options.filenamePattern, options.recursive).uris
+ val inFiles = FileSysUtils(options("input").asInstanceOf[String], options("filenamePattern").asInstanceOf[String],
+ options("recursive").asInstanceOf[Boolean]).uris
+ val inFiles2 = if (options("input2") == null || options("input2").asInstanceOf[String].isEmpty) ""
+ else FileSysUtils(options("input2").asInstanceOf[String], options("filenamePattern").asInstanceOf[String],
+ options("recursive").asInstanceOf[Boolean]).uris
if (inFiles.isEmpty) {
Array()
} else {
- val selfSimilarityDataset = IndexedDataset(reader1.readFrom(inFiles))
+ val datasetA = IndexedDataset(reader1.readTuplesFrom(inFiles))
+ if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetA,
+ options("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
+
+ // The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B
+ // Here we assume there is one row ID space for all interactions. To do this we calculate the
+ // row cardinality only after reading in A and B (or potentially C...) We then adjust the
+ // cardinality so all match, which is required for the math to work.
+ // Note: this may leave blank rows with no representation in any DRM. Blank rows need to
+ // be supported (and are at least on Spark) or the row cardinality fix will not work.
+ val datasetB = if (!inFiles2.isEmpty) {
+ // get cross-cooccurrence interactions from separate files
+ val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
+
+ datasetB
+
+ } else if (options("filterPosition").asInstanceOf[Int] != -1
+ && options("filter2").asInstanceOf[String] != null) {
+
+ // get cross-cooccurrences interactions by using two filters on a single set of files
+ val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles, existingRowIDs = datasetA.rowIDs))
+
+ datasetB
- if (options.filterPosition != -1 && options.filter2 != null) {
- // todo: needs to support more than one cross-similarity indicator
- val crossSimilarityDataset1 = IndexedDataset(reader2.readFrom(inFiles))
- Array(selfSimilarityDataset, crossSimilarityDataset1)
} else {
- Array(selfSimilarityDataset)
+ null.asInstanceOf[IndexedDataset]
}
+ if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc
+ // true row cardinality is the size of the row id index, which was calculated from all rows of A and B
+ val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality
- }
+ // todo: how expensive is nrow? We could make assumptions about .rowIds that don't rely on
+ // its calculation
+ val returnedA = if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality)
+ else datasetA // this guarantees matching cardinality
+ val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality)
+ else datasetB // this guarantees matching cardinality
+
+ if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetB, options("output") + "../input-datasets/secondary-interactions")
+
+ Array(returnedA, returnedB)
+ } else Array(datasetA)
+ }
}
override def process: Unit = {
@@ -271,57 +228,29 @@ object ItemSimilarityDriver extends MahoutDriver {
// todo: allow more than one cross-similarity matrix?
val indicatorMatrices = {
if (indexedDatasets.length > 1) {
- CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs, Array(indexedDatasets(1).matrix))
+ CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int],
+ options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int],
+ Array(indexedDatasets(1).matrix))
} else {
- CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs)
+ CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int],
+ options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int])
}
}
- // self similarity
- // the next two lines write the drm using a Writer class
- // val selfIndicatorDataset = new IndexedDataset(indicatorMatrices(0), indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs)
- // writeStore.writeTo(selfIndicatorDataset, options.output + "indicator-matrix")
-
// an alternative is to create a version of IndexedDataset that knows how to write itself
val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
indexedDatasets(0).columnIDs, writeSchema)
- selfIndicatorDataset.writeTo(options.output + "indicator-matrix")
+ selfIndicatorDataset.writeTo(options("output").asInstanceOf[String] + "indicator-matrix")
- // todo: needs to support more than one cross-similarity indicator
+ // todo: would be nice to support more than one cross-similarity indicator
if (indexedDatasets.length > 1) {
val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
- writer.writeTo(crossIndicatorDataset, options.output + "cross-indicator-matrix")
+ writer.writeDRMTo(crossIndicatorDataset, options("output").asInstanceOf[String] + "cross-indicator-matrix")
}
stop
}
- // Default values go here, any "_" or null should be "required" in the Parser or flags an unused option
- // todo: support two input streams for cross-similarity, maybe assume one schema for all inputs
- case class Options(
- master: String = "local",
- sparkExecutorMem: String = "2g",
- appName: String = "ItemSimilarityJob",
- randomSeed: Int = System.currentTimeMillis().toInt,
- recursive: Boolean = false,
- input: String = null,
- output: String = null,
- filenamePattern: String = "^part-.*",
- maxSimilaritiesPerItem: Int = 100,
- maxPrefs: Int = 500,
- minPrefs: Int = 1,
- rowIDPosition: Int = 0,
- itemIDPosition: Int = 1,
- filterPosition: Int = -1,
- filter1: String = null,
- filter2: String = null,
- inDelim: String = "[,\t ]",
- rowKeyDelim: String = "\t",
- columnIdStrengthDelim: String = ":",
- tupleDelim: String = ",",
- omitStrength: Boolean = false,
- dontAddMahoutJars: Boolean = false)
-
}
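The replacement of the Options case class with an immutable Map[String, Any] relies on ++ being right-biased, which is why the driver-specific map goes last. A tiny self-contained sketch of that override behavior, using values taken from the maps above:

object OptionOverrideSketch extends App {
  val sparkOptions = Map[String, Any](
    "master" -> "local",
    "appName" -> "Generic Spark App, Change this.")

  val itemSimilarityOptions = Map[String, Any]("appName" -> "ItemSimilarityDriver")

  // ++ is right-biased: later keys win, so the driver-specific appName overrides the generic one
  val options = sparkOptions ++ itemSimilarityOptions
  assert(options("appName").asInstanceOf[String] == "ItemSimilarityDriver")
  assert(options("master").asInstanceOf[String] == "local")
}

The price of losing the case class is the asInstanceOf cast at every read, as seen throughout start and process.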
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 0c579d4..796a66a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -21,17 +21,26 @@ import org.apache.mahout.math.drm.DistributedContext
import org.apache.spark.SparkConf
import org.apache.mahout.sparkbindings._
+import scala.collection.immutable
+
/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
- * Also define a command line parser and default options or fill in the following template:
+ * Also define a Map of options for the command line parser. The following template may help:
* {{{
* object SomeDriver extends MahoutDriver {
+ * // build options from some standard CLI param groups
+ * // Note: always put the driver-specific options last so they can override any previous options!
+ * private var options = GenericOptions ++ SparkOptions ++ FileIOOptions ++ TextDelimitedTuplesOptions ++
+ * TextDelimitedDRMOptions ++ ItemSimilarityOptions
+ *
* override def main(args: Array[String]): Unit = {
- * val parser = new MahoutOptionParser[Options]("Job Name") {
- * head("Job Name", "Spark")
- * note("Various CLI options")
- * //see https://github.com/scopt/scopt for a good Scala option parser, which MahoutOptionParser extends
+ * val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
+ * head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
+ *
+ * //Several standard option groups are usually non-driver specific so use the MahoutOptionParser methods
+ * parseGenericOptions
+ * ...
* }
- * parser.parse(args, Options()) map { opts =>
+ * parser.parse(args, options) map { opts =>
* options = opts
* process
* }
@@ -42,15 +51,12 @@ import org.apache.mahout.sparkbindings._
* //don't just stand there do something
* stop
* }
- *
- * //Default values go here, any '_' or null should be 'required' in the Parser or flags an unused option
- * case class Options(
- * appName: String = "Job Name", ...
- * )
* }
* }}}
*/
abstract class MahoutDriver {
+
+
implicit var mc: DistributedContext = _
implicit val sparkConf = new SparkConf()
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 8a337f5..ba4ca1d 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -17,8 +17,189 @@
package org.apache.mahout.drivers
import scopt.OptionParser
+import scala.collection.immutable
-/** Modifies default [[scopt.OptionParser]] to output long help-like usage + error message */
-class MahoutOptionParser[C](programName: String) extends OptionParser[C](programName: String) {
+/** Companion object defines default option groups for reference in any driver that needs them */
+object MahoutOptionParser {
+ // set up the various default option groups
+ final val GenericOptions = immutable.HashMap[String, Any](
+ "randomSeed" -> System.currentTimeMillis().toInt,
+ "dontAddMahoutJars" -> false,
+ "writeAllDatasets" -> false)
+
+ final val SparkOptions = immutable.HashMap[String, Any](
+ "master" -> "local",
+ "sparkExecutorMem" -> "2g",
+ "appName" -> "Generic Spark App, Change this.")
+
+ final val FileIOOptions = immutable.HashMap[String, Any](
+ "recursive" -> false,
+ "input" -> null.asInstanceOf[String],
+ "input2" -> null.asInstanceOf[String],
+ "output" -> null.asInstanceOf[String],
+ "filenamePattern" -> "^part-.*")
+
+ final val TextDelimitedTuplesOptions = immutable.HashMap[String, Any](
+ "rowIDPosition" -> 0,
+ "itemIDPosition" -> 1,
+ "filterPosition" -> -1,
+ "filter1" -> null.asInstanceOf[String],
+ "filter2" -> null.asInstanceOf[String],
+ "inDelim" -> "[,\t ]")
+
+ final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "tupleDelim" -> " ",
+ "omitStrength" -> false)
+}
+/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+ * keep both standardized.
+ * @param programName Name displayed in help message, the name by which the driver is invoked.
+ * */
+class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
override def showUsageOnError = true
+
+ def parseIOOptions = {
+ note("Input, output options")
+ opt[String]('i', "input") required() action { (x, options) =>
+ options + ("input" -> x)
+ } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+
+ opt[String]("input2") abbr ("i2") action { (x, options) =>
+ options + ("input2" -> x)
+ } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+
+ opt[String]('o', "output") required() action { (x, options) =>
+ // todo: check to see if HDFS allows MS-Windows backslashes locally?
+ if (x.endsWith("/")) {
+ options + ("output" -> x)
+ } else {
+ options + ("output" -> (x + "/"))
+ }
+ } text ("Path for output, any local or HDFS supported URI (required)")
+
+ }
+
+ def parseSparkOptions = {
+ note("\nSpark config options:")
+
+ opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+ options + ("master" -> x)
+ }
+
+ opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g") action { (x, options) =>
+ options + ("sparkExecutorMem" -> x)
+ }
+
+ }
+
+ def parseGenericOptions = {
+ note("\nGeneral config options:")
+ opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
+ options + ("randomSeed" -> x)
+ } validate { x =>
+ if (x > 0) success else failure("Option --randomSeed must be > 0")
+ }
+
+ opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) =>
+ options + ("dontAddMahoutJars" -> true)
+ }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly
+
+ //output both input DRMs
+ opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
+ options + ("writeAllDatasets" -> true)
+ }//Hidden option, though a user might want this.
+ }
+
+ def parseInputSchemaOptions = {
+ //Input text file schema--not driver specific but input data specific, tuples input,
+ // not drms
+ note("\nInput text file schema options:")
+ opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
+ options + ("inDelim" -> x)
+ }
+
+ opt[String]("filter1") abbr ("f1") action { (x, options) =>
+ options + ("filter1" -> x)
+ } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+
+ opt[String]("filter2") abbr ("f2") action { (x, options) =>
+ options + ("filter2" -> x)
+ } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+
+ opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) =>
+ options + ("rowIDPosition" -> x)
+ } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
+ if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+ }
+
+ opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) =>
+ options + ("itemIDPosition" -> x)
+ } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
+ if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+ }
+
+ opt[Int]("filterPosition") abbr ("fc") action { (x, options) =>
+ options + ("filterPosition" -> x)
+ } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+ if (x >= -1) success else failure("Option --filterColNum must be >= -1")
+ }
+
+ note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+ checkConfig { options: Map[String, Any] =>
+ if (options("filterPosition").asInstanceOf[Int] == options("itemIDPosition").asInstanceOf[Int]
+ || options("filterPosition").asInstanceOf[Int] == options("rowIDPosition").asInstanceOf[Int]
+ || options("rowIDPosition").asInstanceOf[Int] == options("itemIDPosition").asInstanceOf[Int])
+ failure("The row, item, and filter positions must be unique.") else success
+ }
+
+ //check for option consistency, probably driver specific
+ checkConfig { options: Map[String, Any] =>
+ if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
+ && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
+ && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
+ failure ("If using filters they must be unique.") else success
+ }
+
+ }
+
+ def parseFileDiscoveryOptions = {
+ //File finding strategy--not driver specific
+ note("\nFile discovery options:")
+ opt[Unit]('r', "recursive") action { (_, options) =>
+ options + ("recursive" -> true)
+ } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+
+ opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
+ options + ("filenamePattern" -> x)
+ } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+
+ }
+
+ def parseDrmFormatOptions = {
+ note("\nOutput text file schema options:")
+ opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
+ options + ("rowKeyDelim" -> x)
+ } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+ opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
+ options + ("columnIdStrengthDelim" -> x)
+ } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+ opt[String]("tupleDelim") abbr ("td") action { (x, options) =>
+ options + ("tupleDelim" -> x)
+ } text ("Separates vector tuple values in the values list (optional). Default: \" \"")
+
+ opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
+ options + ("omitStrength" -> true)
+ } text ("Do not write the strength to the output files (optional), Default: false.")
+ note("This option is used to output indexable data for creating a search engine recommender.")
+
+ note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+ }
+
}
+
+
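For reference, here is a minimal stand-alone sketch of the scopt pattern the helpers above standardize: the parser's state type is Map[String, Any], each action returns the map with one key replaced, and parse folds the command line into the defaults. The program name and keys are illustrative only:

import scopt.OptionParser

object ParserSketch extends App {
  val defaults = Map[String, Any]("input" -> null.asInstanceOf[String], "randomSeed" -> 0)

  val parser = new OptionParser[Map[String, Any]]("sketch") {
    opt[String]('i', "input") required() action { (x, options) =>
      options + ("input" -> x)
    }
    opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
      options + ("randomSeed" -> x)
    }
  }

  parser.parse(Array("-i", "hdfs://some/path", "--randomSeed", "42"), defaults) map { options =>
    println(options("input").asInstanceOf[String])   // hdfs://some/path
    println(options("randomSeed").asInstanceOf[Int]) // 42
  }
}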
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
index c5b7385..e2bb49c 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
@@ -17,25 +17,39 @@
package org.apache.mahout.drivers
+import com.google.common.collect.{HashBiMap, BiMap}
import org.apache.mahout.math.drm.DistributedContext
-/** Reader trait is abstract in the sense that the reader function must be defined by an extending trait, which also defines the type to be read.
- * @tparam T type of object read, usually supplied by an extending trait.
- * @todo the reader need not create both dictionaries but does at present. There are cases where one or the other dictionary is never used so saving the memory for a very large dictionary may be worth the optimization to specify which dictionaries are created.
+/** Reader trait is abstract in the sense that the tupleReader function must be defined by an extending trait, which also defines the type to be read.
+ * @tparam T type of object read.
*/
trait Reader[T]{
+
val mc: DistributedContext
val readSchema: Schema
- protected def reader(mc: DistributedContext, readSchema: Schema, source: String): T
- def readFrom(source: String): T = reader(mc, readSchema, source)
+
+ protected def tupleReader(
+ mc: DistributedContext,
+ readSchema: Schema,
+ source: String,
+ existingRowIDs: BiMap[String, Int]): T
+
+ def readTuplesFrom(
+ source: String,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+ tupleReader(mc, readSchema, source, existingRowIDs)
}
/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
- * @tparam T
+ * @tparam T type of object to write.
*/
trait Writer[T]{
+
val mc: DistributedContext
+ val sort: Boolean
val writeSchema: Schema
- protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T): Unit
- def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection)
+
+ protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
+
+ def writeDRMTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
}
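Reader and Writer are template-method traits: readTuplesFrom and writeDRMTo fix the calling protocol while the protected tupleReader/writer hooks come from extending traits such as the one behind TextDelimitedIndexedDatasetReader. A stripped-down sketch of the Reader side (the trait name and element type are stand-ins; the real trait also threads a DistributedContext and Schema through):

import com.google.common.collect.{BiMap, HashBiMap}

// Illustrative only; see Reader[T] above for the real shape
trait LineReader {
  protected def tupleReader(source: String, existingRowIDs: BiMap[String, Int]): Seq[String]

  // A fresh dictionary by default; pass datasetA.rowIDs to accumulate a shared row-ID space
  def readTuplesFrom(source: String,
      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): Seq[String] =
    tupleReader(source, existingRowIDs)
}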
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
index 7735b83..edff92d 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
@@ -20,21 +20,30 @@ package org.apache.mahout.drivers
import scala.collection.mutable
import scala.collection.mutable.HashMap
-/** Syntactic sugar for HashMap[String, Any]
+/** Syntactic sugar for mutable.HashMap[String, Any]
*
* @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
*/
class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
  // note: this requires a mutable HashMap; do we care?
this ++= params
- if (!this.contains("omitScore")) this += ("omitScore" -> false)
+
+ /** Constructor for copying an existing Schema
+ *
+   * @param schemaToClone the Schema to copy into this new instance
+ */
+ def this(schemaToClone: Schema){
+ this()
+ this ++= schemaToClone
+ }
}
-// These can be used to keep the text in and out fairly standard to Mahout, where an application specific format is not
-// required.
+// These can be used to keep the text in and out fairly standard to Mahout, where an application specific
+// format is not required.
/** Simple default Schema for typical text delimited tuple file input
- * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID<comma, tab, or space>etc...)
+ * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID
+ * <comma, tab, or space>there may be other ignored text...)
*/
class DefaultTupleReadSchema extends Schema(
"delim" -> "[,\t ]", //comma, tab or space
@@ -43,44 +52,47 @@ class DefaultTupleReadSchema extends Schema(
"columnIDPosition" -> 1,
"filterPosition" -> -1)
-/** Simple default Schema for typical text delimited drm file output
- * This tells the writer to write a DRM of the default
- * (rowID<tab>columnID1:score1,columnID2:score2,...)
+/** Default Schema for text delimited drm file output
+ * This tells the writer to write a DRM of the default form:
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...)
*/
class DefaultDRMWriteSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
- "tupleDelim" -> ",")
+ "tupleDelim" -> " ",
+ "omitScore" -> false)
-/** Simple default Schema for typical text delimited drm file output
- * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID<comma, tab, or space>etc...)
+/** Default Schema for typical text delimited drm file input
+ * This tells the reader to input text lines of the form:
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...)
*/
class DefaultDRMReadSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
- "tupleDelim" -> ",")
+ "tupleDelim" -> " ")
-/** Simple default Schema for reading a text delimited drm file where the score of any tuple is ignored,
+/** Default Schema for reading a text delimited drm file where the score of any tuple is ignored,
* all non-zeros are replaced with 1.
* This tells the reader to input DRM lines of the form
- * (rowID<tab>columnID1:score1,columnID2:score2,...) remember the score is ignored. Alternatively the format can be
- * (rowID<tab>columnID1,columnID2,...) where presence indicates a score of 1. This is the default output format for
- * [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...) note that the score is ignored.
+ * Alternatively the format can be
+ * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+ * output format for [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
*/
class DRMReadBooleanSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
- "tupleDelim" -> ",",
+ "tupleDelim" -> " ",
"omitScore" -> true)
-/** Simple default Schema for typical text delimited drm file write where the score of a tuple is omitted.
+/** Default Schema for typical text delimited drm file write where the score of a tuple is omitted.
* The presence of a tuple means the score = 1, the absence means a score of 0.
- * This tells the reader to input DRM lines of the form
- * (rowID<tab>columnID1,columnID2,...)
+ * This tells the writer to output DRM lines of the form
+ * (rowID<tab>columnID1<space>columnID2...)
*/
class DRMWriteBooleanSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
- "tupleDelim" -> ",",
+ "tupleDelim" -> " ",
"omitScore" -> true)
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index ae78d59..11d647b 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -17,14 +17,12 @@
package org.apache.mahout.drivers
-import scala.collection.JavaConversions._
import org.apache.spark.SparkContext._
import org.apache.mahout.math.RandomAccessSparseVector
import com.google.common.collect.{BiMap, HashBiMap}
-import scala.collection.JavaConversions._
import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
import org.apache.mahout.sparkbindings._
-
+import scala.collection.JavaConversions._
/** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
*/
@@ -36,7 +34,11 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
* @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
* @return
*/
- protected def reader(mc: DistributedContext, readSchema: Schema, source: String): IndexedDataset = {
+ protected def tupleReader(
+ mc: DistributedContext,
+ readSchema: Schema,
+ source: String,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = {
try {
val delimiter = readSchema("delim").asInstanceOf[String]
val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int]
@@ -51,6 +53,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
})
var columns = mc.textFile(source).map { line => line.split(delimiter) }
// -1 means no filter in the input text, take them all
if(filterPosition != -1) {
@@ -59,7 +62,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
}
// get row and column IDs
- val m = columns.collect
val interactions = columns.map { tokens =>
tokens(rowIDPosition) -> tokens(columnIDPosition)
}
@@ -75,10 +78,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
// create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
// broadcast them for access in distributed processes, so they are not recalculated in every task.
- val rowIDDictionary = asOrderedDictionary(rowIDs)
+ val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
- val columnIDDictionary = asOrderedDictionary(columnIDs)
+ val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
val indexedInteractions =
@@ -113,11 +116,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
// this creates a BiMap from an ID collection. The ID points to an ordinal int
// which is used internal to Mahout as the row or column ID
// todo: this is a non-distributed process and the BiMap is a non-rdd based object--might be a scaling problem
- private def asOrderedDictionary(entries: Array[String]): BiMap[String, Int] = {
- var dictionary: BiMap[String, Int] = HashBiMap.create()
- var index = 0
+ private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
+    var index = dictionary.size() // if a dictionary is supplied, append new entries starting at the next Mahout id
for (entry <- entries) {
- dictionary.forcePut(entry, index)
-      index += 1
+      if (!dictionary.contains(entry)) {
+        dictionary.put(entry, index)
+        index += 1
+      }
}
dictionary
@@ -125,13 +127,21 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
}
trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+
  /** Write an [[org.apache.mahout.drivers.IndexedDataset]] as text delimited tuples to the destination directory.
*
* @param mc context for the Spark job
* @param writeSchema describes the delimiters and positions of values in the output text delimited file.
* @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]]
*/
- protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, indexedDataset: IndexedDataset): Unit = {
+ protected def writer(
+ mc: DistributedContext,
+ writeSchema: Schema,
+ dest: String,
+ indexedDataset: IndexedDataset,
+ sort: Boolean = true): Unit = {
try {
val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String]
@@ -140,8 +150,14 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
//instance vars must be put into locally scoped vals when put into closures that are
    //executed by Spark
- assert (indexedDataset != null, {println(this.getClass.toString+": has no indexedDataset to write"); throw new IllegalArgumentException })
- assert (!dest.isEmpty, {println(this.getClass.toString+": has no destination or indextedDataset to write"); throw new IllegalArgumentException})
+ assert(indexedDataset != null, {
+ println(this.getClass.toString + ": has no indexedDataset to write")
+ throw new IllegalArgumentException
+ })
+ assert(!dest.isEmpty, {
+        println(this.getClass.toString + ": has no destination to write to")
+ throw new IllegalArgumentException
+ })
val matrix = indexedDataset.matrix
val rowIDDictionary = indexedDataset.rowIDs
@@ -149,18 +165,29 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
matrix.rdd.map { case (rowID, itemVector) =>
- // each line is created of non-zero values with schema specified delimiters and original row and column ID tokens
- // first get the external rowID token
- var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
-
- // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
- for (item <- itemVector.nonZeroes()) {
- line += columnIDDictionary.inverse.get(item.index)
- if (!omitScore) line += columnIdStrengthDelim + item.get
- line += tupleDelim
+        // turn non-zero elements into a list of (columnID, strength) pairs for sorting
+        val itemList = new collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]]
+        for (ve <- itemVector.nonZeroes) {
+          itemList += new org.apache.mahout.common.Pair[Integer, Double](ve.index, ve.get)
}
- // drop the last delimiter, not needed to end the line
- line.dropRight(1)
+      // sort by strength, highest value first (the negation gives descending order)
+ val vector = if (sort) itemList.sortBy(-_.getSecond) else itemList
+
+ // first get the external rowID token
+ if (!vector.isEmpty){
+ var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
+ // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
+ for (item <- vector) {
+ line += columnIDDictionary.inverse.get(item.getFirst)
+ if (!omitScore) line += columnIdStrengthDelim + item.getSecond
+ line += tupleDelim
+ }
+ // drop the last delimiter, not needed to end the line
+ line.dropRight(1)
+      } else { // no items, so write a line with the id but no values and no delimiters
+        rowIDDictionary.inverse.get(rowID)
+      } // the "if" expression yields the line of text, so it must be last in the closure
}
.saveAsTextFile(dest)
@@ -176,25 +203,28 @@ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexed
/** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
* @param readSchema describes the delimiters and position of values in the text delimited file to be read.
* @param mc Spark context for reading files
- * @note The source is supplied by Reader#readFrom .
+  * @note The source is supplied by Reader#readTuplesFrom.
* */
-class TextDelimitedIndexedDatasetReader(val readSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReader
+class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
+ (implicit val mc: DistributedContext) extends TDIndexedDatasetReader
/** Writes text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
* @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
* @param mc Spark context for reading files
- * @note the destination is supplied by Writer#writeTo trait method
+  * @note The destination is supplied by the Writer#writeDRMTo trait method.
* */
-class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)
+    (implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
/** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor.
* @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
* @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
* @param mc Spark context for reading the files, may be implicitly defined.
* */
-class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReaderWriter
+class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema, val sort: Boolean = true)
+ (implicit val mc: DistributedContext)
+ extends TDIndexedDatasetReaderWriter
-/** A version of IndexedDataset that has it's own writeTo method from a Writer trait. This is an alternative to creating
+/** A version of IndexedDataset that has its own writeDRMTo method from a Writer trait. This is an alternative to creating
* a Writer based stand-alone class for writing. Consider it experimental allowing similar semantics to drm.writeDrm().
* Experimental because it's not clear that it is simpler or more intuitive and since IndexedDatasetTextDelimitedWriteables
* are probably short lived in terms of lines of code so complexity may be moot.
@@ -204,12 +234,17 @@ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeS
* @param writeSchema contains params for the schema/format or the written text delimited file.
* @param mc mahout distributed context (DistributedContext) may be implicitly defined.
* */
-class IndexedDatasetTextDelimitedWriteable(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int],
- val writeSchema: Schema)(implicit val mc: DistributedContext)
+class IndexedDatasetTextDelimitedWriteable(
+ matrix: CheckpointedDrm[Int],
+ rowIDs: BiMap[String,Int],
+ columnIDs: BiMap[String,Int],
+ val writeSchema: Schema,
+ val sort: Boolean = true)
+ (implicit val mc: DistributedContext)
extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter {
def writeTo(dest: String): Unit = {
- writeTo(this, dest)
+ writeDRMTo(this, dest)
}
}
@@ -217,11 +252,11 @@ class IndexedDatasetTextDelimitedWriteable(matrix: CheckpointedDrm[Int], rowIDs:
* Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily used to get a secondary constructor for
* making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
* {{{
- * val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readFrom(source))
+ * val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readTuplesFrom(source))
* }}}
*/
object IndexedDatasetTextDelimitedWriteable {
  /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] */
- def apply(id2: IndexedDatasetTextDelimitedWriteable) = new IndexedDatasetTextDelimitedWriteable(id2.matrix, id2.rowIDs, id2.columnIDs, id2.writeSchema)(id2.mc)
+  def apply(id2: IndexedDatasetTextDelimitedWriteable, sort: Boolean = true) = new IndexedDatasetTextDelimitedWriteable(id2.matrix, id2.rowIDs, id2.columnIDs, id2.writeSchema, sort)(id2.mc)
}
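
The per-row ordering the writer now applies is just a descending sort of (columnID, strength) pairs before they are joined with the schema delimiters. A standalone sketch of that formatting step, with hard-coded delimiters and toy values:

  val columnIdStrengthDelim = ":"
  val tupleDelim = " "

  // toy non-zero (columnID, strength) pairs for one row
  val tuples = Seq((3, 0.67), (7, 1.72), (1, 4.49))

  val line = tuples
    .sortBy(-_._2) // highest strength first, matching the writer's sort
    .map { case (id, strength) => id.toString + columnIdStrengthDelim + strength }
    .mkString(tupleDelim)

  println(line) // 1:4.49 7:1.72 3:0.67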
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 1c5546b..cc5ebf2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -65,7 +65,6 @@ class CheckpointedDrmSpark[K: ClassTag](
private var cached: Boolean = false
override val context: DistributedContext = rdd.context
-
/**
* Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
* and writing down Spark graph lineage since last checkpointed DRM.
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
index 22e31cc..61f37e4 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
@@ -23,6 +23,10 @@ import com.google.common.collect.HashBiMap
import org.apache.mahout.math._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.mahout.sparkbindings._
+import org.apache.mahout.common.Pair
+import org.apache.mahout.math.Vector.Element
+
+import scala.collection.immutable.List
/** Kryo serialization registrator for Mahout */
class MahoutKryoRegistrator extends KryoRegistrator {
@@ -32,6 +36,6 @@ class MahoutKryoRegistrator extends KryoRegistrator {
kryo.addDefaultSerializer(classOf[Vector], new WritableKryoSerializer[Vector, VectorWritable])
kryo.addDefaultSerializer(classOf[DenseVector], new WritableKryoSerializer[Vector, VectorWritable])
kryo.addDefaultSerializer(classOf[Matrix], new WritableKryoSerializer[Matrix, MatrixWritable])
- kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer());
+ kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer())
}
}
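
For this registrator to take effect the Spark job must be told to use Kryo and to load the class; a typical configuration sketch (the app name is arbitrary, the keys are standard Spark settings):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("item-similarity-sketch")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")

Registering HashBiMap with JavaSerializer, as above, sidesteps Kryo's default field serialization for the broadcast Guava dictionaries.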
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
index 938dc33..642e90a 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -17,6 +17,7 @@
package org.apache.mahout.cf
+import org.apache.mahout.math.cf.CooccurrenceAnalysis
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.{MatrixOps, _}
import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
@@ -48,13 +49,19 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
(0.0, 0.0, 0.0, 0.0, 0.0))
// correct cross-cooccurrence with LLR
- final val matrixLLRCoocBtAControl = dense(
+ final val m = dense(
(1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
(1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
(1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
(1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
(0.0, 0.0, 0.0, 0.0, 4.498681156950466))
+ final val matrixLLRCoocBtAControl = dense(
+ (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+ (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+ (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+ (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+ (0.0, 0.0, 0.6795961471815897, 0.0, 4.498681156950466))
test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
@@ -150,6 +157,46 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
n should be < 1E-10
}
+  test("cooccurrence two matrices with different numbers of columns") {
+ val a = dense(
+ (1, 1, 0, 0, 0),
+ (0, 0, 1, 1, 0),
+ (0, 0, 0, 0, 1),
+ (1, 0, 0, 1, 0))
+
+ val b = dense(
+ (0, 1, 1, 0),
+ (1, 1, 1, 0),
+ (0, 0, 1, 0),
+ (1, 1, 0, 1))
+
+ val matrixLLRCoocBtANonSymmetric = dense(
+ (0.0, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+ (0.0, 0.6795961471815897, 0.6795961471815897, 0.0),
+ (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
+ (5.545177444479561, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+ (0.0, 0.0, 0.6795961471815897, 0.0))
+
+ val drmA = drmParallelize(m = a, numPartitions = 2)
+ val drmB = drmParallelize(m = b, numPartitions = 2)
+
+ //self similarity
+ val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+ val matrixSelfCooc = drmCooc(0).checkpoint().collect
+ val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+ var n = (new MatrixOps(m = diffMatrix)).norm
+ n should be < 1E-10
+
+ //cross similarity
+ val matrixCrossCooc = drmCooc(1).checkpoint().collect
+ val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+
+    // cooccurrence without LLR is just A'B
+    //val inCoreAtB = a.transpose().times(b)
+ }
+
test("LLR calc") {
val A = dense(
(1, 1, 0, 0, 0),