Posted to commits@mahout.apache.org by pa...@apache.org on 2015/03/04 22:52:32 UTC
[6/7] mahout git commit: removed o.a.m.Pair, cleaned up comments and style issues, simplified driver API, merged most of 1.2.1 changes but left on Spark v1.1.1 because of a bug in v1.2.1
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/15ee1951
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/15ee1951
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/15ee1951
Branch: refs/heads/master
Commit: 15ee1951d1ba8d8ee7d24e2510af187fd984c8cf
Parents: 43bea68
Author: pferrel <pa...@occamsmachete.com>
Authored: Mon Mar 2 13:38:05 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Mon Mar 2 13:38:05 2015 -0800
----------------------------------------------------------------------
examples/bin/get-all-examples.sh | 4 +-
math-scala/pom.xml | 25 +++-
.../apache/mahout/drivers/MahoutDriver.scala | 11 +-
.../mahout/drivers/MahoutOptionParser.scala | 78 ++++++-----
.../mahout/math/cf/SimilarityAnalysis.scala | 104 +++++++--------
.../org/apache/mahout/math/drm/package.scala | 4 +-
.../math/indexeddataset/IndexedDataset.scala | 23 ++--
.../math/indexeddataset/ReaderWriter.scala | 71 ++++++++--
.../mahout/math/indexeddataset/Schema.scala | 82 ++++++------
spark/pom.xml | 6 +-
spark/src/main/assembly/dependency-reduced.xml | 44 +++++++
spark/src/main/assembly/job.xml | 61 ---------
.../apache/mahout/common/HDFSPathSearch.scala | 17 ++-
.../mahout/drivers/ItemSimilarityDriver.scala | 78 ++++++-----
.../mahout/drivers/MahoutSparkDriver.scala | 106 ++++++++-------
.../drivers/MahoutSparkOptionParser.scala | 16 ++-
.../mahout/drivers/RowSimilarityDriver.scala | 43 +++---
.../apache/mahout/drivers/TestNBDriver.scala | 58 ++++----
.../drivers/TextDelimitedReaderWriter.scala | 132 ++++++++++---------
.../apache/mahout/drivers/TrainNBDriver.scala | 16 +--
.../mahout/sparkbindings/SparkEngine.scala | 10 +-
.../indexeddataset/IndexedDatasetSpark.scala | 17 ++-
.../apache/mahout/sparkbindings/package.scala | 6 +-
.../drivers/ItemSimilarityDriverSuite.scala | 117 ++++++++--------
.../test/DistributedSparkSuite.scala | 4 +
25 files changed, 619 insertions(+), 514 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/examples/bin/get-all-examples.sh
----------------------------------------------------------------------
diff --git a/examples/bin/get-all-examples.sh b/examples/bin/get-all-examples.sh
index a24c7fd..4128e47 100755
--- a/examples/bin/get-all-examples.sh
+++ b/examples/bin/get-all-examples.sh
@@ -26,8 +26,8 @@ echo " Solr-recommender example: "
echo " 1) imports text 'log files' of some delimited form for user preferences"
echo " 2) creates the correct Mahout files and stores distionaries to translate external Id to and from Mahout Ids"
echo " 3) it implements a prototype two actions 'cross-recommender', which takes two actions made by the same user and creates recommendations"
-echo " 4) it creates output for user->preference history CSV and and item->similar items 'indicator' matrix for use in a Solr-recommender."
-echo " To use Solr you would index the indicator matrix CSV, and use user preference history from the history CSV as a query, the result"
+echo " 4) it creates output for user->preference history CSV and and item->similar items 'similarity' matrix for use in a Solr-recommender."
+echo " To use Solr you would index the similarity matrix CSV, and use user preference history from the history CSV as a query, the result"
echo " from Solr will be an ordered list of recommendations returning the same item Ids as were input."
echo " For further description see the README.md here https://github.com/pferrel/solr-recommender"
echo " To build run 'cd solr-recommender; mvn install'"
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 66309d6..50cea7a 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -175,17 +175,36 @@
<dependency>
<groupId>com.github.scopt</groupId>
- <artifactId>scopt_2.10</artifactId>
- <version>3.2.0</version>
+ <artifactId>scopt_${scala.major}</artifactId>
+ <version>3.3.0</version>
</dependency>
<!-- scala stuff -->
<dependency>
<groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-actors</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scalap</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.major}</artifactId>
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 3d9d4e1..32515f1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -19,23 +19,24 @@ package org.apache.mahout.drivers
import org.apache.mahout.math.drm.DistributedContext
-/** Extended by a platform specific version of this class to create a Mahout CLI driver.
- */
+/** Extended by a platform specific version of this class to create a Mahout CLI driver. */
abstract class MahoutDriver {
-
implicit protected var mc: DistributedContext = _
implicit protected var parser: MahoutOptionParser = _
var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
+ /** must be overriden to setup the DistributedContext mc*/
+ protected def start() : Unit
+
/** Override (optionally) for special cleanup */
- protected def stop: Unit = {
+ protected def stop(): Unit = {
if (!_useExistingContext) mc.close
}
/** This is where you do the work, call start first, then before exiting call stop */
- protected def process: Unit
+ protected def process(): Unit
/** Parse command line and call process */
def main(args: Array[String]): Unit
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 479a8d0..3b5affd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -20,16 +20,17 @@ import scopt.OptionParser
import scala.collection.immutable
-/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
- * keep both standarized.
- * @param programName Name displayed in help message, the name by which the driver is invoked.
- * @note options are engine neutral by convention. See the engine specific extending class for
- * to add Spark or other engine options.
- * */
+/**
+ * Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+ * keep both standarized.
+ * @param programName Name displayed in help message, the name by which the driver is invoked.
+ * @note options are engine neutral by convention. See the engine specific extending class for
+ * to add Spark or other engine options.
+ */
class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
// build options from some stardard CLI param groups
- // Note: always put the driver specific options at the last so they can override and previous options!
+ // Note: always put the driver specific options at the last so they can override any previous options!
var opts = Map.empty[String, Any]
override def showUsageOnError = true
@@ -39,12 +40,14 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
note("Input, output options")
opt[String]('i', "input") required() action { (x, options) =>
options + ("input" -> x)
- } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+ } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs" +
+ " (required)")
if (numInputs == 2) {
opt[String]("input2") abbr ("i2") action { (x, options) =>
options + ("input2" -> x)
- } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+ } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" " +
+ "(optional). Default: empty.")
}
opt[String]('o', "output") required() action { (x, options) =>
@@ -53,11 +56,11 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
} else {
options + ("output" -> (x + "/"))
}
- } text ("Path for output, any local or HDFS supported URI (required)")
+ } text ("Path for output directory, any HDFS supported URI (required)")
}
- def parseGenericOptions = {
+ def parseGenericOptions() = {
opts = opts ++ MahoutOptionParser.GenericOptions
opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
options + ("randomSeed" -> x)
@@ -65,48 +68,55 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
if (x > 0) success else failure("Option --randomSeed must be > 0")
}
- //output both input DRMs
+ //output both input IndexedDatasets
opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
options + ("writeAllDatasets" -> true)
}//Hidden option, though a user might want this.
}
- def parseElementInputSchemaOptions{
+ def parseElementInputSchemaOptions() = {
//Input text file schema--not driver specific but input data specific, elements input,
- // not drms
+ // not rows of IndexedDatasets
opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
note("\nInput text file schema options:")
- opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action { (x, options) =>
- options + ("inDelim" -> x)
+ opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action {
+ (x, options) =>
+ options + ("inDelim" -> x)
}
opt[String]("filter1") abbr ("f1") action { (x, options) =>
options + ("filter1" -> x)
- } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+ } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). " +
+ "Default: no filter, all data is used")
opt[String]("filter2") abbr ("f2") action { (x, options) =>
options + ("filter2" -> x)
- } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+ } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). " +
+ "If not present no secondary dataset is collected")
opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
options + ("rowIDColumn" -> x)
- } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
- if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+ } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate {
+ x =>
+ if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
}
opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
options + ("itemIDColumn" -> x)
- } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
- if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+ } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate {
+ x =>
+ if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
}
opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
options + ("filterColumn" -> x)
- } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+ } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no " +
+ "filter") validate { x =>
if (x >= -1) success else failure("Option --filterColNum must be >= -1")
}
- note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+ note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or" +
+ " \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
//check for column consistency
checkConfig { options: Map[String, Any] =>
@@ -126,7 +136,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
}
- def parseFileDiscoveryOptions = {
+ def parseFileDiscoveryOptions() = {
//File finding strategy--not driver specific
opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
note("\nFile discovery options:")
@@ -136,12 +146,13 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
options + ("filenamePattern" -> x)
- } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+ } text ("Regex to match in determining input files (optional). Default: filename in the --input option " +
+ "or \"^part-.*\" if --input is a directory")
}
- def parseDrmFormatOptions = {
- opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
+ def parseIndexedDatasetFormatOptions() = {
+ opts = opts ++ MahoutOptionParser.TextDelimitedIndexedDatasetOptions
note("\nOutput text file schema options:")
opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
options + ("rowKeyDelim" -> x)
@@ -160,13 +171,16 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
} text ("Do not write the strength to the output files (optional), Default: false.")
note("This option is used to output indexable data for creating a search engine recommender.")
- note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+ note("\nDefault delimiters will produce output of the form: " +
+ "\"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
}
}
-/** Companion object defines default option groups for reference in any driver that needs them.
- * @note not all options are platform neutral so other platforms can add default options here if desired */
+/**
+ * Companion object defines default option groups for reference in any driver that needs them.
+ * @note not all options are platform neutral so other platforms can add default options here if desired
+ */
object MahoutOptionParser {
// set up the various default option groups
@@ -196,7 +210,7 @@ object MahoutOptionParser {
"filter2" -> null.asInstanceOf[String],
"inDelim" -> "[,\t ]")
- final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+ final val TextDelimitedIndexedDatasetOptions = immutable.HashMap[String, Any](
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"elementDelim" -> " ",
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
index e1766e8..6557ab0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -32,7 +32,7 @@ import scala.util.Random
/**
- * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * Based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
* available at http://www.mapr.com/practical-machine-learning
*
* see also "Sebastian Schelter, Christoph Boden, Volker Markl:
@@ -44,14 +44,16 @@ object SimilarityAnalysis extends Serializable {
/** Compares (Int,Double) pairs by the second value */
private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
- /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
- * and returns a list of indicator and cross-indicator matrices
- * @param drmARaw Primary interaction matrix
- * @param randomSeed when kept to a constant will make repeatable downsampling
- * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
- * @param maxNumInteractions max number of interactions after downsampling, default: 500
- * @return
- */
+ /**
+ * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+ * and returns a list of similarity and cross-similarity matrices
+ * @param drmARaw Primary interaction matrix
+ * @param randomSeed when kept to a constant will make repeatable downsampling
+ * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
+ * @param maxNumInteractions max number of interactions after downsampling, default: 500
+ * @return a list of [[org.apache.mahout.math.drm.DrmLike]] containing downsampled DRMs for cooccurrence and
+ * cross-cooccurrence
+ */
def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
@@ -69,11 +71,11 @@ object SimilarityAnalysis extends Serializable {
// Compute co-occurrence matrix A'A
val drmAtA = drmA.t %*% drmA
- // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
- val drmIndicatorsAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
- bcastInteractionsPerItemA, crossCooccurrence = false)
+ // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
+ val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing,
+ bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
- var indicatorMatrices = List(drmIndicatorsAtA)
+ var similarityMatrices = List(drmSimilarityAtA)
// Now look at cross-co-occurrences
for (drmBRaw <- drmBs) {
@@ -86,10 +88,10 @@ object SimilarityAnalysis extends Serializable {
// Compute cross-co-occurrence matrix A'B
val drmAtB = drmA.t %*% drmB
- val drmIndicatorsAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
+ val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
bcastInteractionsPerItemA, bcastInteractionsPerThingB)
- indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
+ similarityMatrices = similarityMatrices :+ drmSimilarityAtB
drmB.uncache()
}
@@ -97,19 +99,21 @@ object SimilarityAnalysis extends Serializable {
// Unpin downsampled interaction matrix
drmA.uncache()
- // Return list of indicator matrices
- indicatorMatrices
+ // Return list of similarity matrices
+ similarityMatrices
}
- /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
- * and returns a list of indicator and cross-indicator matrices
- * Somewhat easier to use method, which handles the ID dictionaries correctly
- * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
- * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
- * @param maxInterestingItemsPerThing max similarities per items
- * @param maxNumInteractions max number of input items per item
- * @return
- */
+ /**
+ * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns
+ * a list of similarity and cross-similarity matrices. Somewhat easier to use method, which handles the ID
+ * dictionaries correctly
+ * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
+ * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+ * @param maxInterestingItemsPerThing max similarities per items
+ * @param maxNumInteractions max number of input items per item
+ * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled
+ * IndexedDatasets for cooccurrence and cross-cooccurrence
+ */
def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset],
randomSeed: Int = 0xdeadbeef,
maxInterestingItemsPerThing: Int = 50,
@@ -127,13 +131,13 @@ object SimilarityAnalysis extends Serializable {
retIDSs.toList
}
- /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows
- * @param drmARaw Primary interaction matrix
- * @param randomSeed when kept to a constant will make repeatable downsampling
- * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
- * @param maxNumInteractions max number of interactions after downsampling, default: 500
- * @return
- */
+ /**
+ * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows
+ * @param drmARaw Primary interaction matrix
+ * @param randomSeed when kept to a constant will make repeatable downsampling
+ * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
+ * @param maxNumInteractions max number of interactions after downsampling, default: 500
+ */
def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
maxNumInteractions: Int = 500): DrmLike[Int] = {
@@ -152,20 +156,20 @@ object SimilarityAnalysis extends Serializable {
val drmAAt = drmA %*% drmA.t
// Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
- val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow, bcastInteractionsPerItemA,
- bcastInteractionsPerItemA, crossCooccurrence = false)
+ val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow,
+ bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
drmSimilaritiesAAt
}
- /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
- * Uses IndexedDatasets, which handle external ID dictionaries properly
- * @param indexedDataset compare each row to every other
- * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
- * @param maxInterestingSimilaritiesPerRow max elements returned in each row
- * @param maxObservationsPerRow max number of input elements to use
- * @return
- */
+ /**
+ * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
+ * Uses IndexedDatasets, which handle external ID dictionaries properly
+ * @param indexedDataset compare each row to every other
+ * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+ * @param maxInterestingSimilaritiesPerRow max elements returned in each row
+ * @param maxObservationsPerRow max number of input elements to use
+ */
def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
maxInterestingSimilaritiesPerRow: Int = 50,
maxObservationsPerRow: Int = 500):
@@ -175,10 +179,7 @@ object SimilarityAnalysis extends Serializable {
indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs)
}
- /**
- * Compute loglikelihood ratio
- * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
- */
+ /** Compute loglikelihood ratio see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details */
def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
numInteractionsWithAandB: Long, numInteractions: Long) = {
@@ -220,7 +221,7 @@ object SimilarityAnalysis extends Serializable {
val candidate = thingA -> llr
- // matches legacy hadoop code and maps values to range (0..1)
+ // legacy hadoop code maps values to range (0..1) via
// val normailizedLLR = 1.0 - (1.0 / (1.0 + llr))
// val candidate = thingA -> normailizedLLR
@@ -253,7 +254,7 @@ object SimilarityAnalysis extends Serializable {
* @param drmM matrix to downsample
* @param seed random number generator seed, keep to a constant if repeatability is neccessary
* @param maxNumInteractions number of elements in a row of the returned matrix
- * @return
+ * @return the downsampled DRM
*/
def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
@@ -269,9 +270,8 @@ object SimilarityAnalysis extends Serializable {
case (keys, block) =>
val numInteractions: Vector = bcastNumInteractions
- // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
- // don't use commons since scala's is included anyway
- // val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+ // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of
+ //failures
val random = new Random(MurmurHash.hash(keys(0), seed))
val downsampledBlock = block.like()
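
A minimal sketch of how the four counts taken by logLikelihoodRatio relate to each other; the counts below are invented purely for illustration:

  import org.apache.mahout.math.cf.SimilarityAnalysis

  val numInteractions          = 10000L // all interactions observed
  val numInteractionsWithA     = 100L   // interactions that involve item A
  val numInteractionsWithB     = 80L    // interactions that involve item B
  val numInteractionsWithAandB = 30L    // interactions where A and B co-occur

  val llr = SimilarityAnalysis.logLikelihoodRatio(
    numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
  // a larger llr means A and B co-occur more often than independence would predict,
  // which is the score computeSimilarities uses to sparsify A'A and A'B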
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 3afbecb..81f6ab1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -123,13 +123,13 @@ package object indexeddataset {
def indexedDatasetDFSRead(src: String,
schema: Schema = DefaultIndexedDatasetReadSchema,
existingRowIDs: BiMap[String, Int] = HashBiMap.create())
- (implicit ctx: DistributedContext):
+ (implicit ctx: DistributedContext):
IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs)
def indexedDatasetDFSReadElements(src: String,
schema: Schema = DefaultIndexedDatasetReadSchema,
existingRowIDs: BiMap[String, Int] = HashBiMap.create())
- (implicit ctx: DistributedContext):
+ (implicit ctx: DistributedContext):
IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs)
}
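
For reference, a sketch of how these package-level read helpers might be called from driver code; the path is hypothetical and an implicit DistributedContext is assumed to be in scope, as the drivers arrange in start():

  import org.apache.mahout.math.drm.DistributedContext
  import org.apache.mahout.math.indexeddataset._

  // element-wise read: each text line carries one (rowID, columnID, ...) datum
  def readInteractions(path: String)(implicit ctx: DistributedContext): IndexedDataset =
    indexedDatasetDFSReadElements(path, DefaultIndexedDatasetElementReadSchema)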
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
index c7eb2cb..f6811e2 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
@@ -22,15 +22,12 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
import org.apache.mahout.math.indexeddataset
/**
- * Wraps a [[org.apache.mahout.math.drm.CheckpointedDrm]] object with two [[com.google.common.collect.BiMap]]s to store
- * ID/label translation dictionaries.
- * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so
- * a user specified label or ID can be stored and mapped to and from the Mahout Int ID
- * used internal to Mahout core code.
- *
- * @todo Often no need for both or perhaps either dictionary, so save resources by allowing
- * to be not created when not needed.
- */
+ * Wrap an [[org.apache.mahout.math.drm.DrmLike]] with bidirectional ID mappings [[com.google.common.collect.BiMap]]
+ * so a user specified labels/IDs can be stored and mapped to and from the Mahout Int ID used internal to Mahout
+ * core code.
+ * @todo Often no need for both or perhaps either dictionary, so save resources by allowing to be not created
+ * when not needed.
+ */
trait IndexedDataset {
val matrix: CheckpointedDrm[Int]
@@ -39,12 +36,13 @@ trait IndexedDataset {
/**
* Write a text delimited file(s) with the row and column IDs from dictionaries.
- * @param dest
- * @param schema
+ * @param dest write location, usually a directory
+ * @param schema params to control writing
+ * @param sc the [[org.apache.mahout.math.drm.DistributedContext]] used to do a distributed write
*/
def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit
- /** Factory method, creates the extending class */
+ /** Factory method, creates the extending class and returns a new instance */
def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
IndexedDataset
@@ -52,6 +50,7 @@ trait IndexedDataset {
* Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
* No changes are made to the underlying drm.
* @param n number to use for new row cardinality, should be larger than current
+ * @return a new IndexedDataset or extending class with new cardinality
* @note should be done before any optimizer actions are performed on the matrix or you'll get unpredictable
* results.
*/
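
A corresponding write-side sketch, using the default row-per-line output schema defined in Schema.scala below; the destination is hypothetical and an implicit DistributedContext is again assumed:

  import org.apache.mahout.math.drm.DistributedContext
  import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, IndexedDataset}

  def writeSimilarities(ids: IndexedDataset, dest: String)(implicit sc: DistributedContext): Unit =
    ids.dfsWrite(dest, DefaultIndexedDatasetWriteSchema)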
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
index cf429f5..f7653ae 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
@@ -20,49 +20,98 @@ package org.apache.mahout.math.indexeddataset
import com.google.common.collect.{BiMap, HashBiMap}
import org.apache.mahout.math.drm.DistributedContext
-/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait,
- * which also defines the type to be read.
- * @tparam T type of object to read.
- */
+/**
+ * Reader trait is abstract in the sense that the elementReader and rowReader functions must be supplied by an
+ * extending trait, which also defines the type to be read.
+ * @tparam T type of object to read.
+ */
trait Reader[T]{
val mc: DistributedContext
val readSchema: Schema
+ /**
+ * Override in extending trait to supply T and perform a parallel read of collection elements
+ * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from
+ * @param readSchema map of parameters controlling formating and how the read is executed
+ * @param source list of comma delimited files to read from
+ * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already
+ * been applied to this collection--used to synchronize row IDs between several
+ * collections
+ * @return a new collection of type T
+ */
protected def elementReader(
mc: DistributedContext,
readSchema: Schema,
source: String,
existingRowIDs: BiMap[String, Int]): T
- protected def drmReader(
+ /**
+ * Override in extending trait to supply T and perform a parallel read of collection rows
+ * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from
+ * @param readSchema map of parameters controlling formating and how the read is executed
+ * @param source list of comma delimited files to read from
+ * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already
+ * been applied to this collection--used to synchronize row IDs between several
+ * collections
+ * @return a new collection of type T
+ */
+ protected def rowReader(
mc: DistributedContext,
readSchema: Schema,
source: String,
existingRowIDs: BiMap[String, Int]): T
+ /**
+ * Public method called to perform the element-wise read. Usually no need to override
+ * @param source comma delimited URIs to read from
+ * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used
+ * to synchronize all row ids is several collections
+ * @return a new collection of type T
+ */
def readElementsFrom(
source: String,
existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
elementReader(mc, readSchema, source, existingRowIDs)
- def readDRMFrom(
+ /**
+ * Public method called to perform the row-wise read. Usually no need to override.
+ * @param source comma delimited URIs to read from
+ * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used
+ * to synchronize all row ids is several collections
+ * @return a new collection of type T
+ */
+ def readRowsFrom(
source: String,
existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
- drmReader(mc, readSchema, source, existingRowIDs)
+ rowReader(mc, readSchema, source, existingRowIDs)
}
-/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
- * which also defines the type to be written.
- * @tparam T type of object to write.
- */
+/**
+ * Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
+ * which also defines the type to be written.
+ * @tparam T type of object to write, usually a matrix type thing.
+ */
trait Writer[T]{
val mc: DistributedContext
val sort: Boolean
val writeSchema: Schema
+ /**
+ * Override to provide writer method
+ * @param mc context used to do distributed write
+ * @param writeSchema map with params to control format and execution of the write
+ * @param dest root directory to write to
+ * @param collection usually a matrix like collection to write
+ * @param sort flags whether to sort the rows by value descending
+ */
protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
+ /**
+ * Call this method to perform the write, usually no need to override.
+ * @param collection what to write
+ * @param dest root directory to write to
+ */
def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
index 557b419..3b4a2e9 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
@@ -19,34 +19,33 @@ package org.apache.mahout.math.indexeddataset
import scala.collection.mutable.HashMap
-/** Syntactic sugar for mutable.HashMap[String, Any]
- *
- * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
- */
+/**
+ * Syntactic sugar for mutable.HashMap[String, Any]
+ * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+ */
class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
// note: this require a mutable HashMap, do we care?
this ++= params
- /** Constructor for copying an existing Schema
- *
- * @param schemaToClone return a copy of this Schema
- */
+ /**
+ * Constructor for copying an existing Schema
+ * @param schemaToClone return a copy of this Schema
+ */
def this(schemaToClone: Schema){
this()
this ++= schemaToClone
}
}
-/** These can be used to keep the text in and out fairly standard to Mahout, where an application specific
- * format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * , which can be used to create a Mahout DRM for DSL ops.
- */
-
+// These can be used to keep the text in and out fairly standard to Mahout, where an application specific
+// format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+// which can be used to create a Mahout DRM for DSL ops.
-/** Simple default Schema for typical text delimited element file input
- * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
- * <comma, tab, or space>here may be other ignored text...)
- */
+/**
+ * Simple default Schema for typical text delimited element file input
+ * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
+ * <comma, tab, or space>here may be other ignored text...)
+ */
final object DefaultIndexedDatasetElementReadSchema extends Schema(
"delim" -> "[,\t ]", //comma, tab or space
"filter" -> "",
@@ -54,46 +53,49 @@ final object DefaultIndexedDatasetElementReadSchema extends Schema(
"columnIDPosition" -> 1,
"filterColumn" -> -1)
-/** Default Schema for text delimited drm file output
- * This tells the writer to write a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] of the default form:
- * (rowID<tab>columnID1:score1<space>columnID2:score2...)
- */
+/**
+ * Default Schema for text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file output with
+ * one row per line.
+ * The default form:
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...)
+ */
final object DefaultIndexedDatasetWriteSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"elementDelim" -> " ",
"omitScore" -> false)
-/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file input
- * This tells the reader to input text lines of the form:
- * (rowID<tab>columnID1:score1,columnID2:score2,...)
- */
+/**
+ * Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file
+ * row-wise input. This tells the reader to input text lines of the form:
+ * (rowID<tab>columnID1:score1,columnID2:score2,...)
+ */
final object DefaultIndexedDatasetReadSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"elementDelim" -> " ")
-/** Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where
- * the score of any element is ignored,
- * all non-zeros are replaced with 1.
- * This tells the reader to input DRM lines of the form
- * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
- * Alternatively the format can be
- * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
- * output format for [[IndexedDatasetWriteBooleanSchema]]
- */
+/**
+ * Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where
+ * the score of any element is ignored.
+ * This tells the reader to input DRM lines of the form
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
+ * Alternatively the format can be
+ * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+ * output format for [[IndexedDatasetWriteBooleanSchema]]
+ */
final object IndexedDatasetReadBooleanSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"elementDelim" -> " ",
"omitScore" -> true)
-/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file write where
- * the score of a element is omitted.
- * The presence of a element means the score = 1, the absence means a score of 0.
- * This tells the writer to output [[org.apache.mahout.math.indexeddataset.IndexedDataset]] lines of the form
- * (rowID<tab>columnID1<space>columnID2...)
- */
+/**
+ * Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file output
+ * where the score of a element is omitted. This tells the writer to output
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] row of the form
+ * (rowID<tab>columnID1<space>columnID2...)
+ */
final object IndexedDatasetWriteBooleanSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index bcf9e30..c7069b6 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -164,14 +164,14 @@
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
- <id>job</id>
+ <id>dependency-reduced</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
- <descriptor>../spark/src/main/assembly/job.xml</descriptor>
+ <descriptor>src/main/assembly/dependency-reduced.xml</descriptor>
</descriptors>
</configuration>
</execution>
@@ -317,11 +317,9 @@
<scope>test</scope>
</dependency>
-
<!-- 3rd-party -->
<!-- scala stuff -->
-
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.major}</artifactId>
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/assembly/dependency-reduced.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/dependency-reduced.xml b/spark/src/main/assembly/dependency-reduced.xml
new file mode 100644
index 0000000..5dcc945
--- /dev/null
+++ b/spark/src/main/assembly/dependency-reduced.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
+ http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>dependency-reduced</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <unpack>true</unpack>
+ <unpackOptions>
+ <!-- MAHOUT-1126 -->
+ <excludes>
+ <exclude>META-INF/LICENSE</exclude>
+ </excludes>
+ </unpackOptions>
+ <scope>runtime</scope>
+ <outputDirectory>/</outputDirectory>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+ <includes>
+ <include>com.google.guava:guava</include>
+ <include>com.github.scopt</include>
+ </includes>
+ </dependencySet>
+ </dependencySets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml
deleted file mode 100644
index 2bdb3ce..0000000
--- a/spark/src/main/assembly/job.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<assembly
- xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
- http://maven.apache.org/xsd/assembly-1.1.0.xsd">
- <id>job</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <dependencySets>
- <dependencySet>
- <unpack>true</unpack>
- <unpackOptions>
- <!-- MAHOUT-1126 -->
- <excludes>
- <exclude>META-INF/LICENSE</exclude>
- </excludes>
- </unpackOptions>
- <scope>runtime</scope>
- <outputDirectory>/</outputDirectory>
- <useTransitiveFiltering>true</useTransitiveFiltering>
- <excludes>
- <exclude>org.apache.hadoop:hadoop-core</exclude>
- </excludes>
- </dependencySet>
- </dependencySets>
- <fileSets>
- <fileSet>
- <directory>${basedir}/target/classes</directory>
- <outputDirectory>/</outputDirectory>
- <excludes>
- <exclude>*.jar</exclude>
- </excludes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/target/classes</directory>
- <outputDirectory>/</outputDirectory>
- <includes>
- <include>driver.classes.default.props</include>
- </includes>
- </fileSet>
- </fileSets>
-</assembly>
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
index 42bf697..0b4130d 100644
--- a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
+++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
@@ -23,8 +23,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
/**
* Returns a [[java.lang.String]], which is comma delimited list of URIs discovered based on parameters
* in the constructor.
- * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
- *
+ * The String is formatted to be input into [[org.apache.spark.SparkContext#textFile()]]
* @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
* @param filePattern regex that must match the entire filename to have the file returned
* @param recursive true traverses the filesystem recursively, default = false
@@ -35,8 +34,10 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
val conf = new Configuration()
val fs = FileSystem.get(conf)
- /** Returns a string of comma delimited URIs matching the filePattern
- * When pattern matching dirs are never returned, only traversed. */
+ /**
+ * Returns a string of comma delimited URIs matching the filePattern
+ * When pattern matching dirs are never returned, only traversed.
+ */
def uris: String = {
if (!filePattern.isEmpty){ // have file pattern so
val pathURIs = pathURI.split(",")
@@ -51,8 +52,10 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
}
}
- /** Find matching files in the dir, recursively call self when another directory is found
- * Only files are matched, directories are traversed but never return a match */
+ /**
+ * Find matching files in the dir, recursively call self when another directory is found
+ * Only files are matched, directories are traversed but never return a match
+ */
private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
val seed = fs.getFileStatus(new Path(dir))
var f: String = files
@@ -71,7 +74,7 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
f = findFiles(fileStatus.getPath.toString, filePattern, f)
}
}
- }else{ f = dir }// was a filename not dir
+ } else { f = dir }// was a filename not dir
f
}
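
A short sketch of the intended use, mirroring how the drivers build their input lists; the URI and pattern here are hypothetical:

  import org.apache.mahout.common.HDFSPathSearch

  // comma delimited list of files under the directory whose names match ^part-.*,
  // in the form SparkContext.textFile expects
  val inFiles: String =
    HDFSPathSearch("hdfs://some-namenode/data/interactions", "^part-.*", recursive = true).uris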
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 36ba6ef..63da80f 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -24,23 +24,19 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import scala.collection.immutable.HashMap
/**
- * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]].
- * Reads text lines
- * that contain (row id, column id, ...). The IDs are user specified strings which will be
- * preserved in the
- * output. The individual elements will be accumulated into a matrix like [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]
- * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
- * matrices and calculate both the self similarity of the primary matrix and the row-wise
- * similarity of the primary
- * to the secondary. Returns one or two directories of text files formatted as specified in
- * the options.
- * The options allow flexible control of the input schema, file discovery, output schema, and control of
- * algorithm parameters.
- * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple
- * elements of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space,
- * you can specify only the input and output file and directory--all else will default to the correct values.
- * Each output line will contain the Item ID and similar items sorted by LLR strength descending.
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]. Reads text lines
+ * that contain (row id, column id, ...). The IDs are user specified strings which will be preserved in the output.
+ * The individual elements will be accumulated into a matrix like
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] and
+ * [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]] will be used to calculate row-wise
+ * self-similarity, or when using filters or two inputs, will generate two matrices and calculate both the
+ * self-similarity of the primary matrix and the row-wise similarity of the primary to the secondary. Returns one
+ * or two directories of text files formatted as specified in the options. The options allow flexible control of the
+ * input schema, file discovery, output schema, and control of algorithm parameters. To get help run
+ * {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple elements of text delimited
+ * values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space, you can specify
+ * only the input and output file and directory--all else will default to the correct values. Each output line will
+ * contain the Item ID and similar items sorted by LLR strength descending.
* @note To use with a Spark cluster see the --master option, if you run out of heap space check
* the --sparkExecutorMemory option. Other [[org.apache.spark.SparkConf]] key value pairs can be with the -D:k=v
* option.
@@ -57,6 +53,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
private var readSchema2: Schema = _
/**
+ * Entry point, not using Scala App trait
* @param args Command line args, if empty a help message is printed.
*/
override def main(args: Array[String]): Unit = {
@@ -74,52 +71,51 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
options + ("maxPrefs" -> x)
} text ("Max number of preferences to consider per user (optional). Default: " +
ItemSimilarityOptions("maxPrefs")) validate { x =>
- if (x > 0) success else failure("Option --maxPrefs must be > 0")
+ if (x > 0) success else failure("Option --maxPrefs must be > 0")
}
- /** not implemented in SimilarityAnalysis.cooccurrence
- * threshold, and minPrefs
- * todo: replacing the threshold with some % of the best values and/or a
- * confidence measure expressed in standard deviations would be nice.
- */
+ // not implemented in SimilarityAnalysis.cooccurrence
+ // threshold, and minPrefs
+ // todo: replacing the threshold with some % of the best values and/or a
+ // confidence measure expressed in standard deviations would be nice.
opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
options + ("maxSimilaritiesPerItem" -> x)
} text ("Limit the number of similarities per item to this number (optional). Default: " +
ItemSimilarityOptions("maxSimilaritiesPerItem")) validate { x =>
- if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
+ if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
}
//Driver notes--driver specific
note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
//Input text format
- parseElementInputSchemaOptions
+ parseElementInputSchemaOptions()
//How to search for input
- parseFileDiscoveryOptions
+ parseFileDiscoveryOptions()
//Drm output schema--not driver specific, drm specific
- parseDrmFormatOptions
+ parseIndexedDatasetFormatOptions()
//Spark config options--not driver specific
- parseSparkOptions
+ parseSparkOptions()
//Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
- parseGenericOptions
+ parseGenericOptions()
help("help") abbr ("h") text ("prints this usage text\n")
}
parser.parse(args, parser.opts) map { opts =>
parser.opts = opts
- process
+ process()
}
}
override protected def start() : Unit = {
- super.start
+ super.start()
readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
"filter" -> parser.opts("filter1").asInstanceOf[String],
@@ -139,12 +135,12 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
"columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
"omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
"elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
- }
+ }
private def readIndexedDatasets: Array[IndexedDataset] = {
- val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
- parser.opts("recursive").asInstanceOf[Boolean]).uris
+ val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String],
+ parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris
val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) ""
else HDFSPathSearch(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
parser.opts("recursive").asInstanceOf[Boolean]).uris
@@ -160,8 +156,8 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
// The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B
// Here we assume there is one row ID space for all interactions. To do this we calculate the
- // row cardinality only after reading in A and B (or potentially C...) We then adjust the
- // cardinality so all match, which is required for the math to work.
+ // row cardinality only after reading in A and B (or potentially C...) We then adjust the cardinality
+ // so all match, which is required for the math to work.
// Note: this may leave blank rows with no representation in any DRM. Blank rows need to
// be supported (and are at least on Spark) or the row cardinality adjustment will not work.
val datasetB = if (!inFiles2.isEmpty) {
@@ -201,19 +197,19 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
}
}
- override def process: Unit = {
- start
+ override def process(): Unit = {
+ start()
val indexedDatasets = readIndexedDatasets
val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int],
parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
// todo: allow more than one cross-similarity matrix?
- idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "indicator-matrix", schema = writeSchema)
+ idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "similarity-matrix", schema = writeSchema)
if(idss.length > 1)
- idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix", schema = writeSchema)
+ idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-similarity-matrix", schema = writeSchema)
- stop
+ stop()
}
}
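The renamed outputs above ("similarity-matrix" and "cross-similarity-matrix") are appended directly to the --output string, so a trailing "/" matters. A minimal sketch of calling this driver from compiled code follows; the wrapper object and the paths are placeholders, and --input/--output are the generic IO options defined outside this hunk.
{{{
import org.apache.mahout.drivers.ItemSimilarityDriver

// Hypothetical wrapper: only --master, --sparkExecutorMem, --maxPrefs and
// --maxSimilaritiesPerItem are option names taken from this diff; the paths
// and the generic IO options are assumptions.
object RunItemSimilaritySketch {
  def main(args: Array[String]): Unit = {
    ItemSimilarityDriver.main(Array(
      "--input", "/path/to/interactions/",
      "--output", "/path/to/output/",        // trailing "/" because "similarity-matrix" is appended
      "--master", "local[4]",
      "--sparkExecutorMem", "4g",
      "--maxPrefs", "500",
      "--maxSimilaritiesPerItem", "100"))
  }
}
}}}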
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index ab40c3a..668d70c 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -21,69 +21,81 @@ import org.apache.mahout.math.drm.DistributedContext
import org.apache.spark.SparkConf
import org.apache.mahout.sparkbindings._
-/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
- * Also define a Map of options for the command line parser. The following template may help:
- * {{{
- * object SomeDriver extends MahoutDriver {
- *
- * // define only the options specific to this driver, inherit the generic ones
- * private final val SomeOptions = HashMap[String, Any](
- * "maxThings" -> 500,
- * "minThings" -> 100,
- * "appName" -> "SomeDriver")
- *
- * override def main(args: Array[String]): Unit = {
- *
- * val parser = new MahoutOptionParser(programName = "shortname") {
- * head("somedriver", "Mahout 1.0-SNAPSHOT")
- *
- * // Input output options, non-driver specific
- * parseIOOptions
- *
- * // Algorithm specific options
- * // Add in the new options
- * opts = opts ++ SomeOptions
- * note("\nAlgorithm control options:")
- * opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
- * options + ("maxThings" -> x) ...
- * }
- * parser.parse(args, parser.opts) map { opts =>
- * parser.opts = opts
- * process
- * }
- * }
- *
- * override def process: Unit = {
- * start // override to change the default Kryo or SparkConf before the distributed context is created
- * // do the work here
- * stop
- * }
- *
- * }}}
- */
+/**
+ * Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
+ * Also define a Map of options for the command line parser. The following template may help:
+ * {{{
+ * object SomeDriver extends MahoutDriver {
+ *
+ * // define only the options specific to this driver, inherit the generic ones
+ * private final val SomeOptions = HashMap[String, Any](
+ * "maxThings" -> 500,
+ * "minThings" -> 100,
+ * "appName" -> "SomeDriver")
+ *
+ * override def main(args: Array[String]): Unit = {
+ *
+ * val parser = new MahoutOptionParser(programName = "shortname") {
+ * head("somedriver", "Mahout 1.0-SNAPSHOT")
+ *
+ * // Input output options, non-driver specific
+ * parseIOOptions()
+ *
+ * // Algorithm specific options
+ * // Add in the new options
+ * opts = opts ++ SomeOptions
+ * note("\nAlgorithm control options:")
+ * opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
+ * options + ("maxThings" -> x) ...
+ * }
+ * parser.parse(args, parser.opts) map { opts =>
+ * parser.opts = opts
+ * process()
+ * }
+ * }
+ *
+ * override def process(): Unit = {
+ * start() // override to change the default Kryo or SparkConf before the distributed context is created
+ * // do the work here
+ * stop()
+ * }
+ *
+ * }}}
+ */
abstract class MahoutSparkDriver extends MahoutDriver {
- implicit protected var sparkConf = new SparkConf()
+ implicit var sparkConf = new SparkConf()
- /** Creates a Spark context to run the job inside.
- * Override to set the SparkConf values specific to the job,
- * these must be set before the context is created.
- * */
- protected def start() : Unit = {
+ /**
+ * Creates a Spark context to run the job inside.
+ * Override to set the SparkConf values specific to the job,
+ * these must be set before the context is created.
+ */
+ override protected def start() : Unit = {
if (!_useExistingContext) {
+ /* hack around SPARK-6069 Spark 1.2.1 deserialization of HashBiMap throwing ClassNotFound--doesn't seem to work
+ sparkConf.set("spark.files.userClassPathFirst", "true")
+ sparkConf.set("spark.executor.userClassPathFirst", "true")
+ */
+
sparkConf.set("spark.kryo.referenceTracking", "false")
.set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
//else leave as set in Spark config
- mc = mahoutSparkContext(masterUrl = parser.opts("master").asInstanceOf[String],
+ mc = mahoutSparkContext(
+ masterUrl = parser.opts("master").asInstanceOf[String],
appName = parser.opts("appName").asInstanceOf[String],
sparkConf = sparkConf)
}
}
+ /**
+ * Call this before start() to use an existing context, for example when running multiple drivers from a scalatest suite.
+ * @param context An already set up context to run against
+ */
def useContext(context: DistributedContext): Unit = {
_useExistingContext = true
mc = context
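The useContext hook documented above is all a test harness needs to avoid creating a second Spark context per driver run. A minimal sketch, assuming the context is created elsewhere (for example by a scalatest suite); only useContext and main are part of the driver API shown here.
{{{
import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.drivers.ItemSimilarityDriver

object SharedContextRunner {
  // `existing` would typically be the context a test suite sets up once.
  def run(existing: DistributedContext, args: Array[String]): Unit = {
    ItemSimilarityDriver.useContext(existing) // must be called before main(), which calls start()
    ItemSimilarityDriver.main(args)
  }
}
}}}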
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
index a46d2ee..b3a1ec2 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
@@ -18,26 +18,30 @@ package org.apache.mahout.drivers
import org.apache.spark.SparkConf
+/** Adds parsing of Spark-specific options to the option parser */
class MahoutSparkOptionParser(programName: String) extends MahoutOptionParser(programName: String){
- def parseSparkOptions(implicit sparkConf: SparkConf) = {
+ def parseSparkOptions()(implicit sparkConf: SparkConf) = {
opts = opts ++ MahoutOptionParser.SparkOptions
opts = opts + ("appName" -> programName)
note("\nSpark config options:")
- opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
- options + ("master" -> x)
+ opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can " +
+ "specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+ options + ("master" -> x)
}
- opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
- options + ("sparkExecutorMem" -> x)
+ opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each " +
+ "node (optional). Default: as Spark config specifies") action { (x, options) =>
+ options + ("sparkExecutorMem" -> x)
}
opt[(String, String)]("define") abbr ("D") unbounded() foreach { case (k, v) =>
sparkConf.set(k, v)
} validate { x =>
if (x._2 != "") success else failure("Value <sparkConfValue> must be non-blank")
- } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before creating this job's Spark context (optional)")
+ } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before " +
+ "creating this job's Spark context (optional)")
}
}
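To make the implicit SparkConf plumbing concrete: parseSparkOptions() resolves the conf implicitly, so every -D:<sparkConfKey>=<sparkConfValue> pair is set on the same object the driver later hands to mahoutSparkContext. An illustrative sketch, with the object and program name as placeholders:
{{{
import org.apache.spark.SparkConf
import org.apache.mahout.drivers.MahoutSparkOptionParser

object ConfPlumbingSketch {
  implicit val conf: SparkConf = new SparkConf() // the conf the driver will pass to mahoutSparkContext

  val parser = new MahoutSparkOptionParser(programName = "sketch") {
    parseSparkOptions() // picks up the implicit `conf`; each -D pair calls conf.set(k, v) while parsing
  }
}
}}}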
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 8c1bce4..3b47452 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -26,12 +26,11 @@ import scala.collection.immutable.HashMap
/**
* Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]].
* Reads a text delimited file containing rows of a [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * with domain specific IDS of the form
- * (row id, column id: strength, ...). The IDs will be preserved in the
+ * with domain-specific IDs of the form (row id, column id: strength, ...). The IDs will be preserved in the
* output. The rows define a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]]
- * will be used to calculate row-wise similarity using log-likelihood
- * The options allow control of the input schema, file discovery, output schema, and control of
- * algorithm parameters.
+ * will be used to calculate row-wise similarity using log-likelihood. The options allow control of the input
+ * schema, file discovery, output schema, and control of algorithm parameters.
+ *
* To get help run {{{mahout spark-rowsimilarity}}} for a full explanation of options. The default
* values for formatting will read (rowID<tab>columnID1:strength1<space>columnID2:strength2....)
* and write (rowID<tab>rowID1:strength1<space>rowID2:strength2....)
@@ -49,6 +48,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
private var readWriteSchema: Schema = _
/**
+ * Entry point, not using Scala App trait
* @param args Command line args, if empty a help message is printed.
*/
override def main(args: Array[String]): Unit = {
@@ -67,35 +67,34 @@ object RowSimilarityDriver extends MahoutSparkDriver {
options + ("maxObservations" -> x)
} text ("Max number of observations to consider per row (optional). Default: " +
RowSimilarityOptions("maxObservations")) validate { x =>
- if (x > 0) success else failure("Option --maxObservations must be > 0")
+ if (x > 0) success else failure("Option --maxObservations must be > 0")
}
opt[Int]('m', "maxSimilaritiesPerRow") action { (x, options) =>
options + ("maxSimilaritiesPerRow" -> x)
} text ("Limit the number of similarities per item to this number (optional). Default: " +
RowSimilarityOptions("maxSimilaritiesPerRow")) validate { x =>
- if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0")
+ if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0")
}
- /** --threshold not implemented in SimilarityAnalysis.rowSimilarity
- * todo: replacing the threshold with some % of the best values and/or a
- * confidence measure expressed in standard deviations would be nice.
- */
+ // --threshold not implemented in SimilarityAnalysis.rowSimilarity
+ // todo: replacing the threshold with some % of the best values and/or a
+ // confidence measure expressed in standard deviations would be nice.
//Driver notes--driver specific
note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
//Drm output schema--not driver specific, drm specific
- parseDrmFormatOptions
+ parseIndexedDatasetFormatOptions()
//How to search for input
- parseFileDiscoveryOptions
+ parseFileDiscoveryOptions()
//Spark config options--not driver specific
- parseSparkOptions
+ parseSparkOptions()
//Jar inclusion, this option can be set when executing the driver from compiled code, not from the CLI
- parseGenericOptions
+ parseGenericOptions()
help("help") abbr ("h") text ("prints this usage text\n")
@@ -106,9 +105,9 @@ object RowSimilarityDriver extends MahoutSparkDriver {
}
}
- override protected def start() : Unit = {
+ override protected def start(): Unit = {
- super.start
+ super.start()
readWriteSchema = new Schema(
"rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String],
@@ -120,8 +119,8 @@ object RowSimilarityDriver extends MahoutSparkDriver {
private def readIndexedDataset: IndexedDataset = {
- val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
- parser.opts("recursive").asInstanceOf[Boolean]).uris
+ val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String],
+ parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris
if (inFiles.isEmpty) {
null.asInstanceOf[IndexedDataset]
@@ -132,8 +131,8 @@ object RowSimilarityDriver extends MahoutSparkDriver {
}
}
- override def process: Unit = {
- start
+ override def process(): Unit = {
+ start()
val indexedDataset = readIndexedDataset
@@ -144,7 +143,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], readWriteSchema)
- stop
+ stop()
}
}
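A hypothetical programmatic run of the row-similarity driver, mirroring the options above; --maxObservations and --maxSimilaritiesPerRow are option names from this diff, while the paths and the generic --input/--output options are assumptions. The input line format is the default described in the class doc.
{{{
import org.apache.mahout.drivers.RowSimilarityDriver

object RunRowSimilaritySketch {
  def main(args: Array[String]): Unit = {
    RowSimilarityDriver.main(Array(
      "--input", "/path/to/rows.tsv",          // rowID<tab>columnID1:strength1<space>columnID2:strength2...
      "--output", "/path/to/row-similarities/",
      "--maxObservations", "500",
      "--maxSimilaritiesPerRow", "50",
      "--master", "local[2]"))
  }
}
}}}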
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
index 368ee89..8531a0a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
@@ -58,56 +58,56 @@ object TestNBDriver extends MahoutSparkDriver {
//How to search for input
- parseFileDiscoveryOptions
+ parseFileDiscoveryOptions()
- //Drm output schema--not driver specific, drm specific
- parseDrmFormatOptions
+ //IndexedDataset output schema--not driver specific, IndexedDataset specific
+ parseIndexedDatasetFormatOptions()
//Spark config options--not driver specific
- parseSparkOptions
+ parseSparkOptions()
//Jar inclusion, this option can be set when executing the driver from compiled code, not from the CLI
- parseGenericOptions
+ parseGenericOptions()
help("help") abbr ("h") text ("prints this usage text\n")
}
parser.parse(args, parser.opts) map { opts =>
parser.opts = opts
- process
+ process()
}
}
-/** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
-private def readTestSet: DrmLike[_] = {
- val inputPath = parser.opts("input").asInstanceOf[String]
- val trainingSet= drm.drmDfsRead(inputPath)
- trainingSet
-}
+ /** Read the test set from inputPath/part-x-00000 sequence file of the form <Text,VectorWritable> */
+ private def readTestSet: DrmLike[_] = {
+ val inputPath = parser.opts("input").asInstanceOf[String]
+ val trainingSet = drm.drmDfsRead(inputPath)
+ trainingSet
+ }
-/** read the model from pathToModel using NBModel.DfsRead(...) */
-private def readModel: NBModel = {
- val inputPath = parser.opts("pathToModel").asInstanceOf[String]
- val model= NBModel.dfsRead(inputPath)
- model
-}
+ /** Read the model from pathToModel using NBModel.dfsRead(...) */
+ private def readModel: NBModel = {
+ val inputPath = parser.opts("pathToModel").asInstanceOf[String]
+ val model = NBModel.dfsRead(inputPath)
+ model
+ }
-override def process: Unit = {
- start()
+ override def process(): Unit = {
+ start()
- val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
- val outputPath = parser.opts("output").asInstanceOf[String]
+ val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
+ val outputPath = parser.opts("output").asInstanceOf[String]
- // todo: get the -ow option in to check for a model in the path and overwrite if flagged.
+ // todo: get the -ow option in to check for a model in the path and overwrite if flagged.
- val testSet = readTestSet
- val model = readModel
- val analyzer= NaiveBayes.test(model, testSet, testComplementary)
+ val testSet = readTestSet
+ val model = readModel
+ val analyzer = NaiveBayes.test(model, testSet, testComplementary)
- println(analyzer)
+ println(analyzer)
- stop
-}
+ stop()
+ }
}
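The reindented process() above reduces to three calls. A condensed sketch of the same flow outside the driver, with the import paths and the implicit distributed context assumed rather than taken from this hunk:
{{{
import org.apache.mahout.math.drm._                                   // drmDfsRead, DistributedContext
import org.apache.mahout.classifier.naivebayes.{NBModel, NaiveBayes}  // package path assumed

object TestNBSketch {
  def run()(implicit dc: DistributedContext): Unit = {
    val testSet  = drmDfsRead("/path/to/test-vectors")     // sequence file of <Text,VectorWritable>
    val model    = NBModel.dfsRead("/path/to/model")       // model written earlier by TrainNBDriver
    val analyzer = NaiveBayes.test(model, testSet, false)  // false = standard, not complementary, test
    println(analyzer)
  }
}
}}}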