Posted to commits@mahout.apache.org by pa...@apache.org on 2015/03/04 22:52:27 UTC
[1/7] mahout git commit: simplified driver and made required changes to all, note: left job assembly untouched
Repository: mahout
Updated Branches:
refs/heads/master fde08a9a5 -> b0ee8e265
simplified driver and made required changes to all, note: left job assembly untouched
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/d0f64205
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/d0f64205
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/d0f64205
Branch: refs/heads/master
Commit: d0f64205a116853aa471dd1361a635167da15fcc
Parents: 0f037cb
Author: pferrel <pa...@occamsmachete.com>
Authored: Sat Dec 27 15:43:41 2014 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Sat Dec 27 15:43:41 2014 -0800
----------------------------------------------------------------------
.../apache/mahout/drivers/MahoutDriver.scala | 2 +-
spark/pom.xml | 13 ++--
spark/src/main/assembly/job.xml | 17 +++++-
.../mahout/drivers/ItemSimilarityDriver.scala | 12 +---
.../mahout/drivers/MahoutSparkDriver.scala | 20 +++---
.../mahout/drivers/RowSimilarityDriver.scala | 8 +--
.../apache/mahout/drivers/TestNBDriver.scala | 64 +++++++-------------
.../apache/mahout/drivers/TrainNBDriver.scala | 18 ------
8 files changed, 61 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 8c1f8cf..3d9d4e1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -25,7 +25,7 @@ abstract class MahoutDriver {
implicit protected var mc: DistributedContext = _
- protected var parser: MahoutOptionParser = _
+ implicit protected var parser: MahoutOptionParser = _
var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index f61f988..bcf9e30 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -157,8 +157,8 @@
</executions>
</plugin>
- <!-- create job jar to include CLI driver deps-->
- <!-- leave this in even though there are no hadoop mapreduce jobs in this module -->
+ <!-- create an all dependencies job.jar -->
+ <!-- todo: before release we need a better way to do this MAHOUT-1636 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
@@ -171,13 +171,14 @@
</goals>
<configuration>
<descriptors>
- <descriptor>src/main/assembly/job.xml</descriptor>
+ <descriptor>../spark/src/main/assembly/job.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
+
</plugins>
</build>
<!--
@@ -319,12 +320,6 @@
<!-- 3rd-party -->
- <dependency>
- <groupId>com.github.scopt</groupId>
- <artifactId>scopt_2.10</artifactId>
- <version>3.2.0</version>
- </dependency>
-
<!-- scala stuff -->
<dependency>
http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml
index 0c41f3d..2bdb3ce 100644
--- a/spark/src/main/assembly/job.xml
+++ b/spark/src/main/assembly/job.xml
@@ -42,5 +42,20 @@
</excludes>
</dependencySet>
</dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/classes</directory>
+ <outputDirectory>/</outputDirectory>
+ <excludes>
+ <exclude>*.jar</exclude>
+ </excludes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/classes</directory>
+ <outputDirectory>/</outputDirectory>
+ <includes>
+ <include>driver.classes.default.props</include>
+ </includes>
+ </fileSet>
+ </fileSets>
</assembly>
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 01a18c9..36ba6ef 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -117,15 +117,9 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
}
}
- override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
- appName: String = parser.opts("appName").asInstanceOf[String]):
- Unit = {
+ override protected def start() : Unit = {
- if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
- sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
- //else leave as set in Spark config
-
- super.start(masterUrl, appName)
+ super.start
readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
"filter" -> parser.opts("filter1").asInstanceOf[String],
@@ -208,7 +202,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
}
override def process: Unit = {
- start()
+ start
val indexedDatasets = readIndexedDatasets
val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int],
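The readSchema1 assignment above shows the Schema construction pattern these drivers share: a Schema is a small bag of named key/value settings that the text-delimited reader and writer consult. A minimal sketch of that pattern, with made-up values; the key names are the ones visible in these diffs, and the import path is taken from the later commits in this push:

    import org.apache.mahout.math.indexeddataset.Schema   // path per the later commits in this push

    object SchemaSketch {
      // a read schema for tab-delimited input, keeping only "purchase" actions
      val readSchema = new Schema(
        "delim"  -> "\t",         // input field delimiter (the inDelim option above)
        "filter" -> "purchase")   // value the filter column must match

      // RowSimilarityDriver below builds its write schema the same way, e.g. "rowKeyDelim" -> "\t"
    }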
http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index e6299fd..ab40c3a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -34,7 +34,6 @@ import org.apache.mahout.sparkbindings._
*
* override def main(args: Array[String]): Unit = {
*
- *
* val parser = new MahoutOptionParser(programName = "shortname") {
* head("somedriver", "Mahout 1.0-SNAPSHOT")
*
@@ -55,7 +54,7 @@ import org.apache.mahout.sparkbindings._
* }
*
* override def process: Unit = {
- * start()
+ * start // override to change the default Kryo or SparkConf before the distributed context is created
* // do the work here
* stop
* }
@@ -70,15 +69,18 @@ abstract class MahoutSparkDriver extends MahoutDriver {
/** Creates a Spark context to run the job inside.
* Override to set the SparkConf values specific to the job,
* these must be set before the context is created.
- * @param masterUrl Spark master URL
- * @param appName Name to display in Spark UI
* */
- protected def start(masterUrl: String, appName: String) : Unit = {
- sparkConf.set("spark.kryo.referenceTracking", "false")
- .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
-
+ protected def start() : Unit = {
if (!_useExistingContext) {
- mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
+ sparkConf.set("spark.kryo.referenceTracking", "false")
+ .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
+
+ if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
+ sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+ //else leave as set in Spark config
+ mc = mahoutSparkContext(masterUrl = parser.opts("master").asInstanceOf[String],
+ appName = parser.opts("appName").asInstanceOf[String],
+ sparkConf = sparkConf)
}
}
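With this change all of the generic Spark setup (the Kryo settings, --sparkExecutorMem, --master and --appName) lives in the no-argument start(), so a concrete driver only overrides it when it needs extra SparkConf tweaks before the context exists. A hedged sketch of that pattern, mirroring the ItemSimilarityDriver and RowSimilarityDriver overrides in this commit; the driver name and buffer value are illustrative only:

    import org.apache.mahout.drivers.MahoutSparkDriver

    object SketchSimilarityDriver extends MahoutSparkDriver {

      // anything that must be set before the DistributedContext is created goes before super.start
      override protected def start(): Unit = {
        sparkConf.set("spark.kryoserializer.buffer.mb", "300")  // illustrative value, not a recommendation
        super.start
        // option-driven setup (schemas, paths) can read parser.opts here, after parsing
      }

      override def process: Unit = {
        start
        // ... read inputs, run the computation, write outputs ...
        stop
      }

      def main(args: Array[String]): Unit = {
        // build a MahoutSparkOptionParser, then: parser.parse(args, parser.opts) map { opts => parser.opts = opts; process }
      }
    }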
http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 9b44b95..8c1bce4 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -106,11 +106,9 @@ object RowSimilarityDriver extends MahoutSparkDriver {
}
}
- override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
- appName: String = parser.opts("appName").asInstanceOf[String]):
- Unit = {
+ override protected def start() : Unit = {
- super.start(masterUrl, appName)
+ super.start
readWriteSchema = new Schema(
"rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String],
@@ -135,7 +133,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
}
override def process: Unit = {
- start()
+ start
val indexedDataset = readIndexedDataset
http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
index 7d0738c..368ee89 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
@@ -78,54 +78,36 @@ object TestNBDriver extends MahoutSparkDriver {
}
}
- override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
- appName: String = parser.opts("appName").asInstanceOf[String]):
- Unit = {
-
- // will be only specific to this job.
- // Note: set a large spark.kryoserializer.buffer.mb if using DSL MapBlock else leave as default
-
- if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
- sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
-
- // Note: set a large akka frame size for DSL NB (20)
- //sparkConf.set("spark.akka.frameSize","20") // don't need this for Spark optimized NaiveBayes..
- //else leave as set in Spark config
-
- super.start(masterUrl, appName)
-
- }
-
- /** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
- private def readTestSet: DrmLike[_] = {
- val inputPath = parser.opts("input").asInstanceOf[String]
- val trainingSet= drm.drmDfsRead(inputPath)
- trainingSet
- }
+/** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
+private def readTestSet: DrmLike[_] = {
+ val inputPath = parser.opts("input").asInstanceOf[String]
+ val trainingSet= drm.drmDfsRead(inputPath)
+ trainingSet
+}
- /** read the model from pathToModel using NBModel.DfsRead(...) */
- private def readModel: NBModel = {
- val inputPath = parser.opts("pathToModel").asInstanceOf[String]
- val model= NBModel.dfsRead(inputPath)
- model
- }
+/** read the model from pathToModel using NBModel.DfsRead(...) */
+private def readModel: NBModel = {
+ val inputPath = parser.opts("pathToModel").asInstanceOf[String]
+ val model= NBModel.dfsRead(inputPath)
+ model
+}
- override def process: Unit = {
- start()
+override def process: Unit = {
+ start()
- val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
- val outputPath = parser.opts("output").asInstanceOf[String]
+ val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
+ val outputPath = parser.opts("output").asInstanceOf[String]
- // todo: get the -ow option in to check for a model in the path and overwrite if flagged.
+ // todo: get the -ow option in to check for a model in the path and overwrite if flagged.
- val testSet = readTestSet
- val model = readModel
- val analyzer= NaiveBayes.test(model, testSet, testComplementary)
+ val testSet = readTestSet
+ val model = readModel
+ val analyzer= NaiveBayes.test(model, testSet, testComplementary)
- println(analyzer)
+ println(analyzer)
- stop
- }
+ stop
+}
}
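For orientation, a sketch of driving the class above from compiled code, in the style the test suites in this push use. Only --input, --output and --master are flag names confirmed elsewhere in these diffs; the others are assumptions inferred from the parser.opts keys visible above:

    // hypothetical invocation; --pathToModel and the paths are assumptions, not confirmed flags/values
    TestNBDriver.main(Array(
      "--input", "hdfs:///data/nb-test-set/",         // sequence file(s) of <Text, VectorWritable>
      "--pathToModel", "hdfs:///models/nb-model/",    // assumption: matches the "pathToModel" option key
      "--output", "hdfs:///data/nb-test-results/",
      "--master", "local[2]"))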
http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index 35ff90b..3d03c1d 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -72,24 +72,6 @@ object TrainNBDriver extends MahoutSparkDriver {
}
}
- override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
- appName: String = parser.opts("appName").asInstanceOf[String]):
- Unit = {
-
- // will be only specific to this job.
- // Note: set a large spark.kryoserializer.buffer.mb if using DSL MapBlock else leave as default
-
- if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
- sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
-
- // Note: set a large akka frame size for DSL NB (20)
- // sparkConf.set("spark.akka.frameSize","20") // don't need this for Spark optimized NaiveBayes..
- // else leave as set in Spark config
-
- super.start(masterUrl, appName)
-
- }
-
/** Read the training set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
private def readTrainingSet: DrmLike[_]= {
val inputPath = parser.opts("input").asInstanceOf[String]
[5/7] mahout git commit: removed o.a.m.Pair, cleaned up comments and style issues, simplified driver API, merged most of 1.2.1 changes but left on Spark v1.1.1 because of a bug in v1.2.1
Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index 895bd01..a90e672 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -26,18 +26,21 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
import org.apache.mahout.sparkbindings._
import scala.collection.JavaConversions._
-/** Extends Reader trait to supply the [[IndexedDatasetSpark]] as the type read and a reader function for reading text delimited files as described in the [[Schema]]
- */
+/**
+ * Extends Reader trait to supply the [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] as
+ * the type read and a element and row reader functions for reading text delimited files as described in the
+ * [[org.apache.mahout.math.indexeddataset.Schema]]
+ */
trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
- /** Read in text delimited elements from all URIs in the comma delimited source String and return
- * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is
- * no strength value in the element, assume it's presence means a strength of 1.
- *
- * @param mc context for the Spark job
- * @param readSchema describes the delimiters and positions of values in the text delimited file.
- * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]]
- * @return
- */
+ /**
+ * Read in text delimited elements from all URIs in the comma delimited source String and return
+ * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is
+ * no strength value in the element, assume it's presence means a strength of 1.
+ * @param mc context for the Spark job
+ * @param readSchema describes the delimiters and positions of values in the text delimited file.
+ * @param source comma delimited URIs of text files to be read from
+ * @return a new [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
+ */
protected def elementReader(
mc: DistributedContext,
readSchema: Schema,
@@ -116,16 +119,16 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
}
}
- /** Read in text delimited rows from all URIs in this comma delimited source String and return
- * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is
- * no strength value in the element, assume it's presence means a strength of 1.
- *
- * @param mc context for the Spark job
- * @param readSchema describes the delimiters and positions of values in the text delimited file.
- * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]]
- * @return
- */
- protected def drmReader(
+ /**
+ * Read in text delimited rows from all URIs in this comma delimited source String and return
+ * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is
+ * no strength value in the element, assume it's presence means a strength of 1.
+ * @param mc context for the Spark job
+ * @param readSchema describes the delimiters and positions of values in the text delimited file.
+ * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]]
+ * @return a new [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
+ */
+ protected def rowReader(
mc: DistributedContext,
readSchema: Schema,
source: String,
@@ -205,33 +208,36 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
}
}
- // this creates a BiMap from an ID collection. The ID points to an ordinal int
- // which is used internal to Mahout as the row or column ID
- // todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a
- // non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit
- // in-memory, the option is to put the dictionaries in rdds and do joins to translate IDs
- private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
+ /**
+ * Creates a BiMap from an ID collection. The ID points to an ordinal in which is used internal to Mahout
+ * as the row or column ID
+ * todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a
+ * non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit
+ * in-memory, the option is to put the dictionaries in rdds and do joins to translate IDs
+ */
+ private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(),
+ entries: Array[String]):
+ BiMap[String, Int] = {
var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index'
for (entry <- entries) {
if (!dictionary.contains(entry)){
dictionary.put(entry, index)
index += 1
- }// the dictionary should never contain an entry since they are supposed to be distinct but for some reason they do
+ }// the dictionary should never contain an entry since they are supposed to be distinct but for some reason
+ // they do
}
dictionary
}
}
+/** Extends the Writer trait to supply the type being written and supplies the writer function */
trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
-
- private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
-
- /** Read in text delimited elements from all URIs in this comma delimited source String.
- *
- * @param mc context for the Spark job
- * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
- * @param dest directory to write text delimited version of [[IndexedDatasetSpark]]
- */
+ /**
+ * Read in text delimited elements from all URIs in this comma delimited source String.
+ * @param mc context for the Spark job
+ * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
+ * @param dest directory to write text delimited version of [[IndexedDatasetSpark]]
+ */
protected def writer(
mc: DistributedContext,
writeSchema: Schema,
@@ -262,21 +268,20 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
matrix.rdd.map { case (rowID, itemVector) =>
// turn non-zeros into list for sorting
- val itemList: collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]] = new collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]]
+ var itemList = List[(Int, Double)]()
for (ve <- itemVector.nonZeroes) {
- val item: org.apache.mahout.common.Pair[Integer, Double] = new org.apache.mahout.common.Pair[Integer, Double](ve.index, ve.get)
- itemList += item
+ itemList = itemList :+ (ve.index, ve.get)
}
//sort by highest value descending(-)
- val vector = if (sort) itemList.sortBy(-_.getSecond) else itemList
+ val vector = if (sort) itemList.sortBy { elem => -elem._2 } else itemList
// first get the external rowID token
if (!vector.isEmpty){
var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
// for the rest of the row, construct the vector contents of elements (external column ID, strength value)
for (item <- vector) {
- line += columnIDDictionary.inverse.get(item.getFirst)
- if (!omitScore) line += columnIdStrengthDelim + item.getSecond
+ line += columnIDDictionary.inverse.get(item._1)
+ if (!omitScore) line += columnIdStrengthDelim + item._2
line += elementDelim
}
// drop the last delimiter, not needed to end the line
@@ -296,26 +301,33 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
/** A combined trait that reads and writes */
trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexedDatasetWriter
-/** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
- * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
- * @param mc Spark context for reading files
- * @note The source is supplied to Reader#readElementsFrom .
- * */
+/**
+ * Reads text delimited files into an IndexedDataset. Classes can be used to supply trait params in their constructor.
+ * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
+ * @param mc Spark context for reading files
+ * @note The source is supplied to Reader#readElementsFrom .
+ */
class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
(implicit val mc: DistributedContext) extends TDIndexedDatasetReader
-/** Writes text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
- * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
- * @param mc Spark context for reading files
- * @note the destination is supplied to Writer#writeTo
- * */
-class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
-
-/** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor.
- * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
- * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
- * @param mc Spark context for reading the files, may be implicitly defined.
- * */
+/**
+ * Writes text delimited files into an IndexedDataset. Classes can be used to supply trait params in their
+ * constructor.
+ * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
+ * @param mc Spark context for reading files
+ * @note the destination is supplied to Writer#writeTo
+ */
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)
+ (implicit val mc: DistributedContext)
+ extends TDIndexedDatasetWriter
+
+/**
+ * Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in
+ * their constructor.
+ * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
+ * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
+ * @param mc Spark context for reading the files, may be implicitly defined.
+ */
class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema, val sort: Boolean = true)
(implicit val mc: DistributedContext)
extends TDIndexedDatasetReaderWriter
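The writer hunk above is where o.a.m.Pair actually disappears: non-zero vector elements are now collected as plain Scala tuples and sorted by descending strength before being rendered. A tiny standalone illustration of that sort, with made-up values:

    // (column ordinal, strength) pairs as the writer now builds them
    val elems = List((3, 0.6795961471815897), (1, 1.7260924347106847), (7, 0.1))

    // strongest first, as itemList.sortBy { elem => -elem._2 } does above
    val sorted = elems.sortBy { case (_, strength) => -strength }
    // sorted == List((1, 1.726...), (3, 0.679...), (7, 0.1)); each tuple is then rendered as
    // "<externalColumnId>:<strength>" via columnIDDictionary.inverse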
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index 3d03c1d..4f88c13 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -52,23 +52,23 @@ object TrainNBDriver extends MahoutSparkDriver {
//How to search for input
- parseFileDiscoveryOptions
+ parseFileDiscoveryOptions()
- //Drm output schema--not driver specific, drm specific
- parseDrmFormatOptions
+ //IndexedDataset output schema--not driver specific, IndexedDataset specific
+ parseIndexedDatasetFormatOptions()
//Spark config options--not driver specific
- parseSparkOptions
+ parseSparkOptions()
//Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
- parseGenericOptions
+ parseGenericOptions()
help("help") abbr ("h") text ("prints this usage text\n")
}
parser.parse(args, parser.opts) map { opts =>
parser.opts = opts
- process
+ process()
}
}
@@ -79,7 +79,7 @@ object TrainNBDriver extends MahoutSparkDriver {
trainingSet
}
- override def process: Unit = {
+ override def process(): Unit = {
start()
val complementary = parser.opts("trainComplementary").asInstanceOf[Boolean]
@@ -91,7 +91,7 @@ object TrainNBDriver extends MahoutSparkDriver {
model.dfsWrite(outputPath)
- stop
+ stop()
}
}
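Taken with the driver changes earlier in this push, a driver's main now just composes the shared option groups and hands the parsed option map to process(). A hedged sketch of that shape; the four group methods and the parse-then-process idiom are taken from the diff above, while the MahoutSparkOptionParser constructor line and the omission of the input/output group are assumptions made for brevity:

    import org.apache.mahout.drivers.{MahoutSparkDriver, MahoutSparkOptionParser}

    object SketchNBDriver extends MahoutSparkDriver {

      override def process(): Unit = { start(); /* train and dfsWrite the model */ stop() }

      def main(args: Array[String]): Unit = {
        parser = new MahoutSparkOptionParser(programName = "sketch-nb-driver") {
          head("sketch-nb-driver", "Mahout 1.0-SNAPSHOT")

          // shared option groups, exactly as invoked above (input/output group omitted here)
          parseFileDiscoveryOptions()           // how to search for input
          parseIndexedDatasetFormatOptions()    // IndexedDataset output schema
          parseSparkOptions()                   // --master, --sparkExecutorMem, ...
          parseGenericOptions()                 // --randomSeed, jar inclusion, ...

          help("help") abbr ("h") text ("prints this usage text\n")
        }

        parser.parse(args, parser.opts) map { opts =>
          parser.opts = opts
          process()
        }
      }
    }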
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index c0d36c6..47eb40b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -252,10 +252,10 @@ object SparkEngine extends DistributedEngine {
}
/**
- * reads an IndexedDatasetSpark from default text delimited files
+ * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+ * delimited files. Reads a vector per row.
* @param src a comma separated list of URIs to read from
* @param schema how the text file is formatted
- * @return
*/
def indexedDatasetDFSRead(src: String,
schema: Schema = DefaultIndexedDatasetReadSchema,
@@ -263,15 +263,15 @@ object SparkEngine extends DistributedEngine {
(implicit sc: DistributedContext):
IndexedDatasetSpark = {
val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
- val ids = reader.readDRMFrom(src, existingRowIDs)
+ val ids = reader.readRowsFrom(src, existingRowIDs)
ids
}
/**
- * reads an IndexedDatasetSpark from default text delimited files
+ * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+ * delimited files. Reads an element per row.
* @param src a comma separated list of URIs to read from
* @param schema how the text file is formatted
- * @return
*/
def indexedDatasetDFSReadElements(src: String,
schema: Schema = DefaultIndexedDatasetElementReadSchema,
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
index d3aa0a8..30b32ad 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -26,8 +26,12 @@ import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema,
/**
* Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific
* dfsWrite method
+ * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
+ * @param rowIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
+ * @param columnIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
*/
-class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int])
+class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int],
+ val columnIDs: BiMap[String,Int])
extends IndexedDataset {
/** Secondary constructor enabling immutability */
@@ -35,14 +39,19 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[St
this(id2.matrix, id2.rowIDs, id2.columnIDs)
}
- /** Factory method used to create this extending class when the interface of
- * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is known. */
+ /**
+ * Factory method used to create this extending class when the interface of
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is known.
+ */
override def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
IndexedDatasetSpark = {
new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
}
- /** implements the core method [[indexeddataset.IndexedDataset#dfsWrite]]*/
+ /**
+ * Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
+ * replace the writer to change how it is written.
+ */
override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema)
(implicit sc: DistributedContext):
Unit = {
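Per the constructor and dfsWrite signature above, wrapping a DRM with its two ID dictionaries and writing it out is short. A hedged sketch, assuming an implicit DistributedContext is in scope and leaving the DRM and dictionaries as parameters that a reader or SimilarityAnalysis would normally supply:

    import com.google.common.collect.BiMap
    import org.apache.mahout.math.drm.{CheckpointedDrm, DistributedContext}
    import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark

    object IndexedDatasetSparkSketch {
      def writeSketch(drm: CheckpointedDrm[Int],
                      userDict: BiMap[String, Int],
                      itemDict: BiMap[String, Int])(implicit sc: DistributedContext): Unit = {
        val ids = new IndexedDatasetSpark(drm, userDict, itemDict)
        ids.dfsWrite("hdfs:///tmp/similarity-matrix/")   // DefaultIndexedDatasetWriteSchema by default
      }
    }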
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index c441716..8199708 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -223,7 +223,11 @@ package object sparkbindings {
// During maven tests, the maven classpath also creeps in for some reason
!n.matches(".*/.m2/.*")
)
-
+ /* verify jar passed to context
+ log.info("\n\n\n")
+ mcjars.foreach(j => log.info(j))
+ log.info("\n\n\n")
+ */
mcjars
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index 76c3553..ea6b40f 100644
--- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -63,7 +63,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
"iphone\tipad:1.7260924347106847",
"surface")
- val CrossIndicatorLines = Iterable(
+ val CrossSimilarityLines = Iterable(
"iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
"ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
"nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
@@ -78,49 +78,45 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
"iphone\tipad:1.7260924347106847",
"surface"))
- val CrossIndicatorTokens = tokenize(Iterable(
+ val CrossSimilarityTokens = tokenize(Iterable(
"iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
"ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
"nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
"galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
"surface\tsurface:4.498681156950466 nexus:0.6795961471815897"))
- // now in MahoutSuite
- // final val TmpDir = "tmp/" // all IO going to whatever the default HDFS config is pointing to
-
/*
//Clustered Spark and HDFS, not a good everyday build test
ItemSimilarityDriver.main(Array(
"--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
- "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
+ "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/similarityMatrices/",
"--master", "spark://occam4:7077",
"--filter1", "purchase",
"--filter2", "view",
"--inDelim", ",",
"--itemIDColumn", "2",
"--rowIDColumn", "0",
- "--filterColumn", "1"
- ))
-*/
+ "--filterColumn", "1"))
+ */
// local multi-threaded Spark with HDFS using large dataset
// not a good build test.
- /* ItemSimilarityDriver.main(Array(
+ /*
+ ItemSimilarityDriver.main(Array(
"--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt",
- "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/",
+ "--output", "hdfs://occam4:54310/user/pat/xrsj/similarityMatrices/",
"--master", "local[4]",
"--filter1", "purchase",
"--filter2", "view",
"--inDelim", ",",
"--itemIDColumn", "2",
"--rowIDColumn", "0",
- "--filterColumn", "1"
- ))
+ "--filterColumn", "1"))
*/
test("ItemSimilarityDriver, non-full-spec CSV") {
val InFile = TmpDir + "in-file.csv/" //using part files, not single file
- val OutPath = TmpDir + "indicator-matrices/"
+ val OutPath = TmpDir + "similarity-matrices/"
val lines = Array(
"u1,purchase,iphone",
@@ -163,10 +159,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
// todo: these comparisons rely on a sort producing the same lines, which could possibly
// fail since the sort is on value and these can be the same for all items in a vector
- val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
- tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
- val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
- tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
+ val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+ tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+ val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+ tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens
}
@@ -174,7 +170,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
test("ItemSimilarityDriver TSV ") {
val InFile = TmpDir + "in-file.tsv/"
- val OutPath = TmpDir + "indicator-matrices/"
+ val OutPath = TmpDir + "similarity-matrices/"
val lines = Array(
"u1\tpurchase\tiphone",
@@ -216,17 +212,17 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
// todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
// some error cases
- val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
- tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
- val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
- tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
+ val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+ tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+ val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+ tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens
}
test("ItemSimilarityDriver log-ish files") {
val InFile = TmpDir + "in-file.log/"
- val OutPath = TmpDir + "indicator-matrices/"
+ val OutPath = TmpDir + "similarity-matrices/"
val lines = Array(
"2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
@@ -267,10 +263,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
"--filterColumn", "2"))
- val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
- tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
- val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
- tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
+ val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+ tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+ val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+ tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens
}
@@ -280,7 +276,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
val InFilename = "in-file.tsv"
val InPath = InDir + InFilename
- val OutPath = TmpDir + "indicator-matrices"
+ val OutPath = TmpDir + "similarity-matrices"
val lines = Array(
"0,0,1",
@@ -312,8 +308,8 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
"--output", OutPath,
"--master", masterUrl))
- val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
- tokenize(indicatorLines) should contain theSameElementsAs Answer
+ val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+ tokenize(similarityLines) should contain theSameElementsAs Answer
}
@@ -323,7 +319,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
val InFilename = "in-file.tsv"
val InPath = InDir + InFilename
- val OutPath = TmpDir + "indicator-matrices"
+ val OutPath = TmpDir + "similarity-matrices"
val lines = Array(
"0,0,1",
@@ -356,8 +352,8 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
"--master", masterUrl,
"--omitStrength"))
- val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
- tokenize(indicatorLines) should contain theSameElementsAs Answer
+ val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+ tokenize(similarityLines) should contain theSameElementsAs Answer
}
@@ -397,7 +393,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
val InPathM2 = InDirM2 + InFilenameM2
val InPathStart = TmpDir + "data/"
- val OutPath = TmpDir + "indicator-matrices"
+ val OutPath = TmpDir + "similarity-matrices"
// this creates one part-0000 file in the directory
mahoutCtx.parallelize(M1Lines).coalesce(1, shuffle = true).saveAsTextFile(InDirM1)
@@ -429,10 +425,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
"--filenamePattern", "m..tsv",
"--recursive"))
- val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
- tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
- val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
- tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
+ val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+ tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+ val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+ tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens
}
@@ -440,7 +436,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
- val OutPath = TmpDir + "indicator-matrices/"
+ val OutPath = TmpDir + "similarity-matrices/"
val lines = Array(
"u1,purchase,iphone",
@@ -482,10 +478,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
"--rowIDColumn", "0",
"--filterColumn", "1"))
- val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
- tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
- val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
- tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
+ val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+ tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+ val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+ tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens
}
@@ -493,7 +489,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
- val OutPath = TmpDir + "indicator-matrices/"
+ val OutPath = TmpDir + "similarity-matrices/"
val lines = Array(
"u1,purchase,iphone",
@@ -549,10 +545,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
"--rowIDColumn", "0",
"--filterColumn", "1"))
- val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
- val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
- tokenize(indicatorLines) should contain theSameElementsAs UnequalDimensionsSelfSimilarity
- tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarity
+ val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+ val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+ tokenize(similarityLines) should contain theSameElementsAs UnequalDimensionsSelfSimilarity
+ tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarity
}
@@ -566,7 +562,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
*/
val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
- val OutPath = TmpDir + "indicator-matrices/"
+ val OutPath = TmpDir + "similarity-matrices/"
val lines = Array(
"u1,purchase,iphone",
@@ -590,7 +586,8 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
"iphone\tmobile_acc:1.7260924347106847 soap:1.7260924347106847 phones:1.7260924347106847",
"surface\tmobile_acc:0.6795961471815897",
"nexus\ttablets:1.7260924347106847 mobile_acc:0.6795961471815897 phones:0.6795961471815897",
- "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 mobile_acc:1.7260924347106847",
+ "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 " +
+ "mobile_acc:1.7260924347106847",
"ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897"))
// this will create multiple part-xxxxx files in the InFile dir but other tests will
@@ -612,10 +609,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
"--filterColumn", "1",
"--writeAllDatasets"))
- val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
- val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
- tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
- tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+ val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+ val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+ tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+ tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
}
@@ -673,7 +670,7 @@ removed ==> u3 0 0 1 0
*/
val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
- val OutPath = TmpDir + "indicator-matrices/"
+ val OutPath = TmpDir + "similarity-matrices/"
val lines = Array(
"u1,purchase,iphone",
@@ -719,10 +716,10 @@ removed ==> u3 0 0 1 0
"--filterColumn", "1",
"--writeAllDatasets"))
- val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
- val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
- tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
- tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+ val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+ val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+ tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+ tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
}
// convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable'
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
index 29c8bea..f18ec70 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
@@ -57,6 +57,10 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat
// initContext()
}
+ override protected def afterAll(configMap: ConfigMap): Unit = {
+ super.afterAll(configMap)
+ resetContext()
+ }
override protected def beforeAll(configMap: ConfigMap): Unit = {
super.beforeAll(configMap)
[2/7] mahout git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/mahout
Posted by pa...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/mahout
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e005b28b
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e005b28b
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e005b28b
Branch: refs/heads/master
Commit: e005b28b263ce5eeab6f105ead660a3fe89cb756
Parents: d0f6420 1ca1ef3
Author: pferrel <pa...@occamsmachete.com>
Authored: Thu Feb 12 10:22:38 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Thu Feb 12 10:22:38 2015 -0800
----------------------------------------------------------------------
.../main/java/org/apache/mahout/h2obindings/H2OHelper.java | 9 ++++-----
pom.xml | 2 +-
2 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
[6/7] mahout git commit: removed o.a.m.Pair, cleaned up comments and style issues, simplified driver API, merged most of 1.2.1 changes but left on Spark v1.1.1 because of a bug in v1.2.1
Posted by pa...@apache.org.
removed o.a.m.Pair, cleaned up comments and style issues, simplified driver API, merged most of 1.2.1 changes but left on Spark v1.1.1 because of a bug in v1.2.1
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/15ee1951
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/15ee1951
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/15ee1951
Branch: refs/heads/master
Commit: 15ee1951d1ba8d8ee7d24e2510af187fd984c8cf
Parents: 43bea68
Author: pferrel <pa...@occamsmachete.com>
Authored: Mon Mar 2 13:38:05 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Mon Mar 2 13:38:05 2015 -0800
----------------------------------------------------------------------
examples/bin/get-all-examples.sh | 4 +-
math-scala/pom.xml | 25 +++-
.../apache/mahout/drivers/MahoutDriver.scala | 11 +-
.../mahout/drivers/MahoutOptionParser.scala | 78 ++++++-----
.../mahout/math/cf/SimilarityAnalysis.scala | 104 +++++++--------
.../org/apache/mahout/math/drm/package.scala | 4 +-
.../math/indexeddataset/IndexedDataset.scala | 23 ++--
.../math/indexeddataset/ReaderWriter.scala | 71 ++++++++--
.../mahout/math/indexeddataset/Schema.scala | 82 ++++++------
spark/pom.xml | 6 +-
spark/src/main/assembly/dependency-reduced.xml | 44 +++++++
spark/src/main/assembly/job.xml | 61 ---------
.../apache/mahout/common/HDFSPathSearch.scala | 17 ++-
.../mahout/drivers/ItemSimilarityDriver.scala | 78 ++++++-----
.../mahout/drivers/MahoutSparkDriver.scala | 106 ++++++++-------
.../drivers/MahoutSparkOptionParser.scala | 16 ++-
.../mahout/drivers/RowSimilarityDriver.scala | 43 +++---
.../apache/mahout/drivers/TestNBDriver.scala | 58 ++++----
.../drivers/TextDelimitedReaderWriter.scala | 132 ++++++++++---------
.../apache/mahout/drivers/TrainNBDriver.scala | 16 +--
.../mahout/sparkbindings/SparkEngine.scala | 10 +-
.../indexeddataset/IndexedDatasetSpark.scala | 17 ++-
.../apache/mahout/sparkbindings/package.scala | 6 +-
.../drivers/ItemSimilarityDriverSuite.scala | 117 ++++++++--------
.../test/DistributedSparkSuite.scala | 4 +
25 files changed, 619 insertions(+), 514 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/examples/bin/get-all-examples.sh
----------------------------------------------------------------------
diff --git a/examples/bin/get-all-examples.sh b/examples/bin/get-all-examples.sh
index a24c7fd..4128e47 100755
--- a/examples/bin/get-all-examples.sh
+++ b/examples/bin/get-all-examples.sh
@@ -26,8 +26,8 @@ echo " Solr-recommender example: "
echo " 1) imports text 'log files' of some delimited form for user preferences"
echo " 2) creates the correct Mahout files and stores distionaries to translate external Id to and from Mahout Ids"
echo " 3) it implements a prototype two actions 'cross-recommender', which takes two actions made by the same user and creates recommendations"
-echo " 4) it creates output for user->preference history CSV and and item->similar items 'indicator' matrix for use in a Solr-recommender."
-echo " To use Solr you would index the indicator matrix CSV, and use user preference history from the history CSV as a query, the result"
+echo " 4) it creates output for user->preference history CSV and and item->similar items 'similarity' matrix for use in a Solr-recommender."
+echo " To use Solr you would index the similarity matrix CSV, and use user preference history from the history CSV as a query, the result"
echo " from Solr will be an ordered list of recommendations returning the same item Ids as were input."
echo " For further description see the README.md here https://github.com/pferrel/solr-recommender"
echo " To build run 'cd solr-recommender; mvn install'"
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 66309d6..50cea7a 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -175,17 +175,36 @@
<dependency>
<groupId>com.github.scopt</groupId>
- <artifactId>scopt_2.10</artifactId>
- <version>3.2.0</version>
+ <artifactId>scopt_${scala.major}</artifactId>
+ <version>3.3.0</version>
</dependency>
<!-- scala stuff -->
<dependency>
<groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-actors</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scalap</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.major}</artifactId>
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 3d9d4e1..32515f1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -19,23 +19,24 @@ package org.apache.mahout.drivers
import org.apache.mahout.math.drm.DistributedContext
-/** Extended by a platform specific version of this class to create a Mahout CLI driver.
- */
+/** Extended by a platform specific version of this class to create a Mahout CLI driver. */
abstract class MahoutDriver {
-
implicit protected var mc: DistributedContext = _
implicit protected var parser: MahoutOptionParser = _
var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
+ /** must be overriden to setup the DistributedContext mc*/
+ protected def start() : Unit
+
/** Override (optionally) for special cleanup */
- protected def stop: Unit = {
+ protected def stop(): Unit = {
if (!_useExistingContext) mc.close
}
/** This is where you do the work, call start first, then before exiting call stop */
- protected def process: Unit
+ protected def process(): Unit
/** Parse command line and call process */
def main(args: Array[String]): Unit
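A minimal sketch of the contract this abstract class now spells out: start() is responsible for assigning the DistributedContext mc, process() brackets the work with start()/stop(), and stop() leaves the context open when _useExistingContext is set (as the test suites do). Context creation is engine specific and left as a placeholder here:

    import org.apache.mahout.drivers.MahoutDriver

    object SketchDriver extends MahoutDriver {

      // start() must set up the DistributedContext `mc`; how is up to the engine binding
      override protected def start(): Unit =
        if (!_useExistingContext) mc = ???        // placeholder: create a real context here

      override def process(): Unit = {
        start()
        // ... the actual work goes here ...
        stop()                                    // closes mc unless _useExistingContext is true
      }

      def main(args: Array[String]): Unit = process()  // real drivers parse CLI options first
    }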
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 479a8d0..3b5affd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -20,16 +20,17 @@ import scopt.OptionParser
import scala.collection.immutable
-/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
- * keep both standarized.
- * @param programName Name displayed in help message, the name by which the driver is invoked.
- * @note options are engine neutral by convention. See the engine specific extending class for
- * to add Spark or other engine options.
- * */
+/**
+ * Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+ * keep both standardized.
+ * @param programName Name displayed in help message, the name by which the driver is invoked.
+ * @note options are engine neutral by convention. See the engine specific extending class for how
+ * to add Spark or other engine options.
+ */
class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
// build options from some standard CLI param groups
- // Note: always put the driver specific options at the last so they can override and previous options!
+ // Note: always put the driver specific options at the last so they can override any previous options!
var opts = Map.empty[String, Any]
override def showUsageOnError = true
@@ -39,12 +40,14 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
note("Input, output options")
opt[String]('i', "input") required() action { (x, options) =>
options + ("input" -> x)
- } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+ } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs" +
+ " (required)")
if (numInputs == 2) {
opt[String]("input2") abbr ("i2") action { (x, options) =>
options + ("input2" -> x)
- } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+ } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" " +
+ "(optional). Default: empty.")
}
opt[String]('o', "output") required() action { (x, options) =>
@@ -53,11 +56,11 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
} else {
options + ("output" -> (x + "/"))
}
- } text ("Path for output, any local or HDFS supported URI (required)")
+ } text ("Path for output directory, any HDFS supported URI (required)")
}
- def parseGenericOptions = {
+ def parseGenericOptions() = {
opts = opts ++ MahoutOptionParser.GenericOptions
opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
options + ("randomSeed" -> x)
@@ -65,48 +68,55 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
if (x > 0) success else failure("Option --randomSeed must be > 0")
}
- //output both input DRMs
+ //output both input IndexedDatasets
opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
options + ("writeAllDatasets" -> true)
}//Hidden option, though a user might want this.
}
- def parseElementInputSchemaOptions{
+ def parseElementInputSchemaOptions() = {
//Input text file schema--not driver specific but input data specific, elements input,
- // not drms
+ // not rows of IndexedDatasets
opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
note("\nInput text file schema options:")
- opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action { (x, options) =>
- options + ("inDelim" -> x)
+ opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action {
+ (x, options) =>
+ options + ("inDelim" -> x)
}
opt[String]("filter1") abbr ("f1") action { (x, options) =>
options + ("filter1" -> x)
- } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+ } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). " +
+ "Default: no filter, all data is used")
opt[String]("filter2") abbr ("f2") action { (x, options) =>
options + ("filter2" -> x)
- } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+ } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). " +
+ "If not present no secondary dataset is collected")
opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
options + ("rowIDColumn" -> x)
- } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
- if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+ } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate {
+ x =>
+ if (x >= 0) success else failure("Option --rowIDColumn must be >= 0")
}
opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
options + ("itemIDColumn" -> x)
- } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
- if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+ } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate {
+ x =>
+ if (x >= 0) success else failure("Option --itemIDColumn must be >= 0")
}
opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
options + ("filterColumn" -> x)
- } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+ } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no " +
+ "filter") validate { x =>
if (x >= -1) success else failure("Option --filterColNum must be >= -1")
}
- note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+ note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or" +
+ " \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
//check for column consistency
checkConfig { options: Map[String, Any] =>
@@ -126,7 +136,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
}
- def parseFileDiscoveryOptions = {
+ def parseFileDiscoveryOptions() = {
//File finding strategy--not driver specific
opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
note("\nFile discovery options:")
@@ -136,12 +146,13 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
options + ("filenamePattern" -> x)
- } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+ } text ("Regex to match in determining input files (optional). Default: filename in the --input option " +
+ "or \"^part-.*\" if --input is a directory")
}
- def parseDrmFormatOptions = {
- opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
+ def parseIndexedDatasetFormatOptions() = {
+ opts = opts ++ MahoutOptionParser.TextDelimitedIndexedDatasetOptions
note("\nOutput text file schema options:")
opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
options + ("rowKeyDelim" -> x)
@@ -160,13 +171,16 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
} text ("Do not write the strength to the output files (optional), Default: false.")
note("This option is used to output indexable data for creating a search engine recommender.")
- note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+ note("\nDefault delimiters will produce output of the form: " +
+ "\"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
}
}
-/** Companion object defines default option groups for reference in any driver that needs them.
- * @note not all options are platform neutral so other platforms can add default options here if desired */
+/**
+ * Companion object defines default option groups for reference in any driver that needs them.
+ * @note not all options are platform neutral so other platforms can add default options here if desired
+ */
object MahoutOptionParser {
// set up the various default option groups
@@ -196,7 +210,7 @@ object MahoutOptionParser {
"filter2" -> null.asInstanceOf[String],
"inDelim" -> "[,\t ]")
- final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+ final val TextDelimitedIndexedDatasetOptions = immutable.HashMap[String, Any](
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"elementDelim" -> " ",
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
index e1766e8..6557ab0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -32,7 +32,7 @@ import scala.util.Random
/**
- * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * Based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
* available at http://www.mapr.com/practical-machine-learning
*
* see also "Sebastian Schelter, Christoph Boden, Volker Markl:
@@ -44,14 +44,16 @@ object SimilarityAnalysis extends Serializable {
/** Compares (Int,Double) pairs by the second value */
private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
- /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
- * and returns a list of indicator and cross-indicator matrices
- * @param drmARaw Primary interaction matrix
- * @param randomSeed when kept to a constant will make repeatable downsampling
- * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
- * @param maxNumInteractions max number of interactions after downsampling, default: 500
- * @return
- */
+ /**
+ * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+ * and returns a list of similarity and cross-similarity matrices
+ * @param drmARaw Primary interaction matrix
+ * @param randomSeed when kept to a constant will make repeatable downsampling
+ * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
+ * @param maxNumInteractions max number of interactions after downsampling, default: 500
+ * @return a list of [[org.apache.mahout.math.drm.DrmLike]] containing downsampled DRMs for cooccurrence and
+ * cross-cooccurrence
+ */
def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
@@ -69,11 +71,11 @@ object SimilarityAnalysis extends Serializable {
// Compute co-occurrence matrix A'A
val drmAtA = drmA.t %*% drmA
- // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
- val drmIndicatorsAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
- bcastInteractionsPerItemA, crossCooccurrence = false)
+ // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
+ val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing,
+ bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
- var indicatorMatrices = List(drmIndicatorsAtA)
+ var similarityMatrices = List(drmSimilarityAtA)
// Now look at cross-co-occurrences
for (drmBRaw <- drmBs) {
@@ -86,10 +88,10 @@ object SimilarityAnalysis extends Serializable {
// Compute cross-co-occurrence matrix A'B
val drmAtB = drmA.t %*% drmB
- val drmIndicatorsAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
+ val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
bcastInteractionsPerItemA, bcastInteractionsPerThingB)
- indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
+ similarityMatrices = similarityMatrices :+ drmSimilarityAtB
drmB.uncache()
}
@@ -97,19 +99,21 @@ object SimilarityAnalysis extends Serializable {
// Unpin downsampled interaction matrix
drmA.uncache()
- // Return list of indicator matrices
- indicatorMatrices
+ // Return list of similarity matrices
+ similarityMatrices
}
- /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
- * and returns a list of indicator and cross-indicator matrices
- * Somewhat easier to use method, which handles the ID dictionaries correctly
- * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
- * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
- * @param maxInterestingItemsPerThing max similarities per items
- * @param maxNumInteractions max number of input items per item
- * @return
- */
+ /**
+ * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns
+ * a list of similarity and cross-similarity matrices. Somewhat easier to use method, which handles the ID
+ * dictionaries correctly
+ * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
+ * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+ * @param maxInterestingItemsPerThing max similarities per items
+ * @param maxNumInteractions max number of input items per item
+ * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled
+ * IndexedDatasets for cooccurrence and cross-cooccurrence
+ */
def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset],
randomSeed: Int = 0xdeadbeef,
maxInterestingItemsPerThing: Int = 50,
@@ -127,13 +131,13 @@ object SimilarityAnalysis extends Serializable {
retIDSs.toList
}
- /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows
- * @param drmARaw Primary interaction matrix
- * @param randomSeed when kept to a constant will make repeatable downsampling
- * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
- * @param maxNumInteractions max number of interactions after downsampling, default: 500
- * @return
- */
+ /**
+ * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows
+ * @param drmARaw Primary interaction matrix
+ * @param randomSeed when kept to a constant will make repeatable downsampling
+ * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
+ * @param maxNumInteractions max number of interactions after downsampling, default: 500
+ */
def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
maxNumInteractions: Int = 500): DrmLike[Int] = {
@@ -152,20 +156,20 @@ object SimilarityAnalysis extends Serializable {
val drmAAt = drmA %*% drmA.t
// Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
- val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow, bcastInteractionsPerItemA,
- bcastInteractionsPerItemA, crossCooccurrence = false)
+ val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow,
+ bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
drmSimilaritiesAAt
}
- /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
- * Uses IndexedDatasets, which handle external ID dictionaries properly
- * @param indexedDataset compare each row to every other
- * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
- * @param maxInterestingSimilaritiesPerRow max elements returned in each row
- * @param maxObservationsPerRow max number of input elements to use
- * @return
- */
+ /**
+ * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
+ * Uses IndexedDatasets, which handle external ID dictionaries properly
+ * @param indexedDataset compare each row to every other
+ * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+ * @param maxInterestingSimilaritiesPerRow max elements returned in each row
+ * @param maxObservationsPerRow max number of input elements to use
+ */
def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
maxInterestingSimilaritiesPerRow: Int = 50,
maxObservationsPerRow: Int = 500):
@@ -175,10 +179,7 @@ object SimilarityAnalysis extends Serializable {
indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs)
}
- /**
- * Compute loglikelihood ratio
- * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
- */
+ /** Compute loglikelihood ratio, see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details */
def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
numInteractionsWithAandB: Long, numInteractions: Long) = {
@@ -220,7 +221,7 @@ object SimilarityAnalysis extends Serializable {
val candidate = thingA -> llr
- // matches legacy hadoop code and maps values to range (0..1)
+ // legacy hadoop code maps values to range (0..1) via
// val normailizedLLR = 1.0 - (1.0 / (1.0 + llr))
// val candidate = thingA -> normailizedLLR
@@ -253,7 +254,7 @@ object SimilarityAnalysis extends Serializable {
* @param drmM matrix to downsample
* @param seed random number generator seed, keep to a constant if repeatability is necessary
* @param maxNumInteractions number of elements in a row of the returned matrix
- * @return
+ * @return the downsampled DRM
*/
def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
@@ -269,9 +270,8 @@ object SimilarityAnalysis extends Serializable {
case (keys, block) =>
val numInteractions: Vector = bcastNumInteractions
- // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
- // don't use commons since scala's is included anyway
- // val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+ // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of
+ // failures
val random = new Random(MurmurHash.hash(keys(0), seed))
val downsampledBlock = block.like()
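For readers new to the statistic used above, the log-likelihood ratio for a pair of items reduces to the G-test on a
2x2 contingency table of interaction counts, computed from unnormalized entropies as described in the linked Ted
Dunning post. The following is a self-contained sketch of that computation (not the project's own LogLikelihood
implementation), with made-up counts and the optional (0..1) normalization mentioned in the comment:

  object LlrSketch extends App {

    // x * ln(x), with the convention 0 * ln(0) = 0
    private def xLogX(x: Long): Double = if (x == 0) 0.0 else x * math.log(x)

    // unnormalized Shannon entropy of a set of counts
    private def entropy(counts: Long*): Double = xLogX(counts.sum) - counts.map(xLogX).sum

    /** G-test / log-likelihood ratio for a 2x2 contingency table of counts. */
    def logLikelihoodRatio(k11: Long, k12: Long, k21: Long, k22: Long): Double = {
      val rowEntropy    = entropy(k11 + k12, k21 + k22)
      val columnEntropy = entropy(k11 + k21, k12 + k22)
      val matrixEntropy = entropy(k11, k12, k21, k22)
      math.max(0.0, 2.0 * (rowEntropy + columnEntropy - matrixEntropy)) // guard against round-off
    }

    // illustrative counts: 1000 interactions, item A seen 100 times, item B 80 times, both together 40 times
    val numInteractions = 1000L
    val withA = 100L
    val withB = 80L
    val withAandB = 40L

    val llr = logLikelihoodRatio(
      withAandB,                                   // k11: A and B
      withA - withAandB,                           // k12: A without B
      withB - withAandB,                           // k21: B without A
      numInteractions - withA - withB + withAandB) // k22: neither

    // the legacy normalization mentioned in the comment maps the score into (0..1)
    val normalizedLlr = 1.0 - (1.0 / (1.0 + llr))

    println(f"llr = $llr%.3f, normalized = $normalizedLlr%.3f")
  }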
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 3afbecb..81f6ab1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -123,13 +123,13 @@ package object indexeddataset {
def indexedDatasetDFSRead(src: String,
schema: Schema = DefaultIndexedDatasetReadSchema,
existingRowIDs: BiMap[String, Int] = HashBiMap.create())
- (implicit ctx: DistributedContext):
+ (implicit ctx: DistributedContext):
IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs)
def indexedDatasetDFSReadElements(src: String,
schema: Schema = DefaultIndexedDatasetReadSchema,
existingRowIDs: BiMap[String, Int] = HashBiMap.create())
- (implicit ctx: DistributedContext):
+ (implicit ctx: DistributedContext):
IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
index c7eb2cb..f6811e2 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
@@ -22,15 +22,12 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
import org.apache.mahout.math.indexeddataset
/**
- * Wraps a [[org.apache.mahout.math.drm.CheckpointedDrm]] object with two [[com.google.common.collect.BiMap]]s to store
- * ID/label translation dictionaries.
- * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so
- * a user specified label or ID can be stored and mapped to and from the Mahout Int ID
- * used internal to Mahout core code.
- *
- * @todo Often no need for both or perhaps either dictionary, so save resources by allowing
- * to be not created when not needed.
- */
+ * Wrap an [[org.apache.mahout.math.drm.DrmLike]] with bidirectional ID mappings [[com.google.common.collect.BiMap]]
+ * so a user specified labels/IDs can be stored and mapped to and from the Mahout Int ID used internal to Mahout
+ * core code.
+ * @todo Often no need for both or perhaps either dictionary, so save resources by allowing to be not created
+ * when not needed.
+ */
trait IndexedDataset {
val matrix: CheckpointedDrm[Int]
@@ -39,12 +36,13 @@ trait IndexedDataset {
/**
* Write a text delimited file(s) with the row and column IDs from dictionaries.
- * @param dest
- * @param schema
+ * @param dest write location, usually a directory
+ * @param schema params to control writing
+ * @param sc the [[org.apache.mahout.math.drm.DistributedContext]] used to do a distributed write
*/
def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit
- /** Factory method, creates the extending class */
+ /** Factory method, creates the extending class and returns a new instance */
def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
IndexedDataset
@@ -52,6 +50,7 @@ trait IndexedDataset {
* Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
* No changes are made to the underlying drm.
* @param n number to use for new row cardinality, should be larger than current
+ * @return a new IndexedDataset or extending class with new cardinality
* @note should be done before any optimizer actions are performed on the matrix or you'll get unpredictable
* results.
*/
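As a usage illustration of the trait above, writing one dataset in both default output forms might look like the
hypothetical helper below; it assumes an already constructed IndexedDataset, a DistributedContext in scope, and the
default write schemas defined in Schema.scala:

  import org.apache.mahout.math.drm.DistributedContext
  import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, IndexedDataset,
    IndexedDatasetWriteBooleanSchema}

  // hypothetical helper: writes one dataset twice, with and without strengths
  def writeBothForms(ids: IndexedDataset, outDir: String)(implicit ctx: DistributedContext): Unit = {
    // rowID<tab>columnID1:score1<space>columnID2:score2...
    ids.dfsWrite(outDir + "with-strengths/", DefaultIndexedDatasetWriteSchema)
    // rowID<tab>columnID1<space>columnID2... (presence means score = 1)
    ids.dfsWrite(outDir + "boolean/", IndexedDatasetWriteBooleanSchema)
  }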
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
index cf429f5..f7653ae 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
@@ -20,49 +20,98 @@ package org.apache.mahout.math.indexeddataset
import com.google.common.collect.{BiMap, HashBiMap}
import org.apache.mahout.math.drm.DistributedContext
-/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait,
- * which also defines the type to be read.
- * @tparam T type of object to read.
- */
+/**
+ * Reader trait is abstract in the sense that the elementReader and rowReader functions must be supplied by an
+ * extending trait, which also defines the type to be read.
+ * @tparam T type of object to read.
+ */
trait Reader[T]{
val mc: DistributedContext
val readSchema: Schema
+ /**
+ * Override in extending trait to supply T and perform a parallel read of collection elements
+ * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from
+ * @param readSchema map of parameters controlling formatting and how the read is executed
+ * @param source list of comma delimited files to read from
+ * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already
+ * been applied to this collection--used to synchronize row IDs between several
+ * collections
+ * @return a new collection of type T
+ */
protected def elementReader(
mc: DistributedContext,
readSchema: Schema,
source: String,
existingRowIDs: BiMap[String, Int]): T
- protected def drmReader(
+ /**
+ * Override in extending trait to supply T and perform a parallel read of collection rows
+ * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from
+ * @param readSchema map of parameters controlling formatting and how the read is executed
+ * @param source list of comma delimited files to read from
+ * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already
+ * been applied to this collection--used to synchronize row IDs between several
+ * collections
+ * @return a new collection of type T
+ */
+ protected def rowReader(
mc: DistributedContext,
readSchema: Schema,
source: String,
existingRowIDs: BiMap[String, Int]): T
+ /**
+ * Public method called to perform the element-wise read. Usually no need to override
+ * @param source comma delimited URIs to read from
+ * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used
+ * to synchronize all row ids in several collections
+ * @return a new collection of type T
+ */
def readElementsFrom(
source: String,
existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
elementReader(mc, readSchema, source, existingRowIDs)
- def readDRMFrom(
+ /**
+ * Public method called to perform the row-wise read. Usually no need to override.
+ * @param source comma delimited URIs to read from
+ * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used
+ * to synchronize all row ids in several collections
+ * @return a new collection of type T
+ */
+ def readRowsFrom(
source: String,
existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
- drmReader(mc, readSchema, source, existingRowIDs)
+ rowReader(mc, readSchema, source, existingRowIDs)
}
-/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
- * which also defines the type to be written.
- * @tparam T type of object to write.
- */
+/**
+ * Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
+ * which also defines the type to be written.
+ * @tparam T type of object to write, usually a matrix type thing.
+ */
trait Writer[T]{
val mc: DistributedContext
val sort: Boolean
val writeSchema: Schema
+ /**
+ * Override to provide writer method
+ * @param mc context used to do distributed write
+ * @param writeSchema map with params to control format and execution of the write
+ * @param dest root directory to write to
+ * @param collection usually a matrix like collection to write
+ * @param sort flags whether to sort the rows by value descending
+ */
protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
+ /**
+ * Call this method to perform the write, usually no need to override.
+ * @param collection what to write
+ * @param dest root directory to write to
+ */
def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
}
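The existingRowIDs parameter in the reader methods above is what keeps row IDs aligned when several collections are
read: the same Guava BiMap is handed to each read, so an external ID already assigned a Mahout Int in the first
dataset keeps that Int in the second. A small sketch of the dictionary behaviour on its own, using only Guava and
made-up IDs (the idFor helper is hypothetical, not a Mahout API):

  import com.google.common.collect.{BiMap, HashBiMap}

  object RowIdDictionarySketch extends App {
    val rowIDs: BiMap[String, Int] = HashBiMap.create()

    // assign the next free Mahout Int to an external ID, reusing any existing assignment
    def idFor(externalId: String): Int =
      if (rowIDs.containsKey(externalId)) rowIDs.get(externalId)
      else { val next = rowIDs.size(); rowIDs.put(externalId, next); next }

    println(idFor("user-a")) // 0
    println(idFor("user-b")) // 1
    println(idFor("user-a")) // 0 again -- a shared dictionary keeps IDs in sync across datasets

    // the BiMap gives the reverse lookup needed when writing external IDs back out
    println(rowIDs.inverse().get(1)) // user-b
  }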
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
index 557b419..3b4a2e9 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
@@ -19,34 +19,33 @@ package org.apache.mahout.math.indexeddataset
import scala.collection.mutable.HashMap
-/** Syntactic sugar for mutable.HashMap[String, Any]
- *
- * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
- */
+/**
+ * Syntactic sugar for mutable.HashMap[String, Any]
+ * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+ */
class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
// note: this requires a mutable HashMap, do we care?
this ++= params
- /** Constructor for copying an existing Schema
- *
- * @param schemaToClone return a copy of this Schema
- */
+ /**
+ * Constructor for copying an existing Schema
+ * @param schemaToClone return a copy of this Schema
+ */
def this(schemaToClone: Schema){
this()
this ++= schemaToClone
}
}
-/** These can be used to keep the text in and out fairly standard to Mahout, where an application specific
- * format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * , which can be used to create a Mahout DRM for DSL ops.
- */
-
+// These can be used to keep the text in and out fairly standard to Mahout, where an application specific
+// format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+// which can be used to create a Mahout DRM for DSL ops.
-/** Simple default Schema for typical text delimited element file input
- * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
- * <comma, tab, or space>here may be other ignored text...)
- */
+/**
+ * Simple default Schema for typical text delimited element file input
+ * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
+ * <comma, tab, or space>here may be other ignored text...)
+ */
final object DefaultIndexedDatasetElementReadSchema extends Schema(
"delim" -> "[,\t ]", //comma, tab or space
"filter" -> "",
@@ -54,46 +53,49 @@ final object DefaultIndexedDatasetElementReadSchema extends Schema(
"columnIDPosition" -> 1,
"filterColumn" -> -1)
-/** Default Schema for text delimited drm file output
- * This tells the writer to write a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] of the default form:
- * (rowID<tab>columnID1:score1<space>columnID2:score2...)
- */
+/**
+ * Default Schema for text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file output with
+ * one row per line.
+ * The default form:
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...)
+ */
final object DefaultIndexedDatasetWriteSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"elementDelim" -> " ",
"omitScore" -> false)
-/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file input
- * This tells the reader to input text lines of the form:
- * (rowID<tab>columnID1:score1,columnID2:score2,...)
- */
+/**
+ * Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file
+ * row-wise input. This tells the reader to input text lines of the form:
+ * (rowID<tab>columnID1:score1,columnID2:score2,...)
+ */
final object DefaultIndexedDatasetReadSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"elementDelim" -> " ")
-/** Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where
- * the score of any element is ignored,
- * all non-zeros are replaced with 1.
- * This tells the reader to input DRM lines of the form
- * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
- * Alternatively the format can be
- * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
- * output format for [[IndexedDatasetWriteBooleanSchema]]
- */
+/**
+ * Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where
+ * the score of any element is ignored.
+ * This tells the reader to input DRM lines of the form
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
+ * Alternatively the format can be
+ * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+ * output format for [[IndexedDatasetWriteBooleanSchema]]
+ */
final object IndexedDatasetReadBooleanSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"elementDelim" -> " ",
"omitScore" -> true)
-/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file write where
- * the score of a element is omitted.
- * The presence of a element means the score = 1, the absence means a score of 0.
- * This tells the writer to output [[org.apache.mahout.math.indexeddataset.IndexedDataset]] lines of the form
- * (rowID<tab>columnID1<space>columnID2...)
- */
+/**
+ * Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file output
+ * where the score of an element is omitted. This tells the writer to output
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] rows of the form
+ * (rowID<tab>columnID1<space>columnID2...)
+ */
final object IndexedDatasetWriteBooleanSchema extends Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index bcf9e30..c7069b6 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -164,14 +164,14 @@
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
- <id>job</id>
+ <id>dependency-reduced</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
- <descriptor>../spark/src/main/assembly/job.xml</descriptor>
+ <descriptor>src/main/assembly/dependency-reduced.xml</descriptor>
</descriptors>
</configuration>
</execution>
@@ -317,11 +317,9 @@
<scope>test</scope>
</dependency>
-
<!-- 3rd-party -->
<!-- scala stuff -->
-
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.major}</artifactId>
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/assembly/dependency-reduced.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/dependency-reduced.xml b/spark/src/main/assembly/dependency-reduced.xml
new file mode 100644
index 0000000..5dcc945
--- /dev/null
+++ b/spark/src/main/assembly/dependency-reduced.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
+ http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>dependency-reduced</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <unpack>true</unpack>
+ <unpackOptions>
+ <!-- MAHOUT-1126 -->
+ <excludes>
+ <exclude>META-INF/LICENSE</exclude>
+ </excludes>
+ </unpackOptions>
+ <scope>runtime</scope>
+ <outputDirectory>/</outputDirectory>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+ <includes>
+ <include>com.google.guava:guava</include>
+ <include>com.github.scopt</include>
+ </includes>
+ </dependencySet>
+ </dependencySets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml
deleted file mode 100644
index 2bdb3ce..0000000
--- a/spark/src/main/assembly/job.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<assembly
- xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
- http://maven.apache.org/xsd/assembly-1.1.0.xsd">
- <id>job</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <dependencySets>
- <dependencySet>
- <unpack>true</unpack>
- <unpackOptions>
- <!-- MAHOUT-1126 -->
- <excludes>
- <exclude>META-INF/LICENSE</exclude>
- </excludes>
- </unpackOptions>
- <scope>runtime</scope>
- <outputDirectory>/</outputDirectory>
- <useTransitiveFiltering>true</useTransitiveFiltering>
- <excludes>
- <exclude>org.apache.hadoop:hadoop-core</exclude>
- </excludes>
- </dependencySet>
- </dependencySets>
- <fileSets>
- <fileSet>
- <directory>${basedir}/target/classes</directory>
- <outputDirectory>/</outputDirectory>
- <excludes>
- <exclude>*.jar</exclude>
- </excludes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/target/classes</directory>
- <outputDirectory>/</outputDirectory>
- <includes>
- <include>driver.classes.default.props</include>
- </includes>
- </fileSet>
- </fileSets>
-</assembly>
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
index 42bf697..0b4130d 100644
--- a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
+++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
@@ -23,8 +23,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
/**
* Returns a [[java.lang.String]], which is comma delimited list of URIs discovered based on parameters
* in the constructor.
- * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
- *
+ * The String is formatted to be input into [[org.apache.spark.SparkContext#textFile()]]
* @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
* @param filePattern regex that must match the entire filename to have the file returned
* @param recursive true traverses the filesystem recursively, default = false
@@ -35,8 +34,10 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
val conf = new Configuration()
val fs = FileSystem.get(conf)
- /** Returns a string of comma delimited URIs matching the filePattern
- * When pattern matching dirs are never returned, only traversed. */
+ /**
+ * Returns a string of comma delimited URIs matching the filePattern.
+ * When pattern matching, dirs are never returned, only traversed.
+ */
def uris: String = {
if (!filePattern.isEmpty){ // have file pattern so
val pathURIs = pathURI.split(",")
@@ -51,8 +52,10 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
}
}
- /** Find matching files in the dir, recursively call self when another directory is found
- * Only files are matched, directories are traversed but never return a match */
+ /**
+ * Find matching files in the dir, recursively call self when another directory is found
+ * Only files are matched, directories are traversed but never return a match
+ */
private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
val seed = fs.getFileStatus(new Path(dir))
var f: String = files
@@ -71,7 +74,7 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
f = findFiles(fileStatus.getPath.toString, filePattern, f)
}
}
- }else{ f = dir }// was a filename not dir
+ } else { f = dir }// was a filename not dir
f
}
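As a usage sketch of the class above (the namenode URI and path are placeholders), building the comma delimited input
string for SparkContext.textFile from a directory tree might look like:

  import org.apache.mahout.common.HDFSPathSearch

  object PathSearchSketch extends App {
    // find every part file under the (hypothetical) root, recursively
    val inputUris: String = HDFSPathSearch(
      pathURI = "hdfs://namenode:8020/data/interactions/",
      filePattern = "^part-.*",
      recursive = true).uris

    println(inputUris) // comma delimited list, ready for SparkContext.textFile
  }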
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 36ba6ef..63da80f 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -24,23 +24,19 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import scala.collection.immutable.HashMap
/**
- * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]].
- * Reads text lines
- * that contain (row id, column id, ...). The IDs are user specified strings which will be
- * preserved in the
- * output. The individual elements will be accumulated into a matrix like [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]
- * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
- * matrices and calculate both the self similarity of the primary matrix and the row-wise
- * similarity of the primary
- * to the secondary. Returns one or two directories of text files formatted as specified in
- * the options.
- * The options allow flexible control of the input schema, file discovery, output schema, and control of
- * algorithm parameters.
- * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple
- * elements of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space,
- * you can specify only the input and output file and directory--all else will default to the correct values.
- * Each output line will contain the Item ID and similar items sorted by LLR strength descending.
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]. Reads text lines
+ * that contain (row id, column id, ...). The IDs are user specified strings which will be preserved in the output.
+ * The individual elements will be accumulated into a matrix like
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] and
+ * [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]] will be used to calculate row-wise
+ * self-similarity, or when using filters or two inputs, will generate two matrices and calculate both the
+ * self-similarity of the primary matrix and the row-wise similarity of the primary to the secondary. Returns one
+ * or two directories of text files formatted as specified in the options. The options allow flexible control of the
+ * input schema, file discovery, output schema, and control of algorithm parameters. To get help run
+ * {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple elements of text delimited
+ * values (userID,itemID) with or without strengths and with a separator of tab, comma, or space, you can specify
+ * only the input and output file and directory--all else will default to the correct values. Each output line will
+ * contain the Item ID and similar items sorted by LLR strength descending.
* @note To use with a Spark cluster see the --master option, if you run out of heap space check
* the --sparkExecutorMemory option. Other [[org.apache.spark.SparkConf]] key value pairs can be with the -D:k=v
* option.
@@ -57,6 +53,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
private var readSchema2: Schema = _
/**
+ * Entry point, not using Scala App trait
* @param args Command line args, if empty a help message is printed.
*/
override def main(args: Array[String]): Unit = {
@@ -74,52 +71,51 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
options + ("maxPrefs" -> x)
} text ("Max number of preferences to consider per user (optional). Default: " +
ItemSimilarityOptions("maxPrefs")) validate { x =>
- if (x > 0) success else failure("Option --maxPrefs must be > 0")
+ if (x > 0) success else failure("Option --maxPrefs must be > 0")
}
- /** not implemented in SimilarityAnalysis.cooccurrence
- * threshold, and minPrefs
- * todo: replacing the threshold with some % of the best values and/or a
- * confidence measure expressed in standard deviations would be nice.
- */
+ // not implemented in SimilarityAnalysis.cooccurrence
+ // threshold, and minPrefs
+ // todo: replacing the threshold with some % of the best values and/or a
+ // confidence measure expressed in standard deviations would be nice.
opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
options + ("maxSimilaritiesPerItem" -> x)
} text ("Limit the number of similarities per item to this number (optional). Default: " +
ItemSimilarityOptions("maxSimilaritiesPerItem")) validate { x =>
- if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
+ if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
}
//Driver notes--driver specific
note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
//Input text format
- parseElementInputSchemaOptions
+ parseElementInputSchemaOptions()
//How to search for input
- parseFileDiscoveryOptions
+ parseFileDiscoveryOptions()
//Drm output schema--not driver specific, drm specific
- parseDrmFormatOptions
+ parseIndexedDatasetFormatOptions()
//Spark config options--not driver specific
- parseSparkOptions
+ parseSparkOptions()
//Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
- parseGenericOptions
+ parseGenericOptions()
help("help") abbr ("h") text ("prints this usage text\n")
}
parser.parse(args, parser.opts) map { opts =>
parser.opts = opts
- process
+ process()
}
}
override protected def start() : Unit = {
- super.start
+ super.start()
readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
"filter" -> parser.opts("filter1").asInstanceOf[String],
@@ -139,12 +135,12 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
"columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
"omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
"elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
- }
+ }
private def readIndexedDatasets: Array[IndexedDataset] = {
- val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
- parser.opts("recursive").asInstanceOf[Boolean]).uris
+ val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String],
+ parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris
val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) ""
else HDFSPathSearch(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
parser.opts("recursive").asInstanceOf[Boolean]).uris
@@ -160,8 +156,8 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
// The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B
// Here we assume there is one row ID space for all interactions. To do this we calculate the
- // row cardinality only after reading in A and B (or potentially C...) We then adjust the
- // cardinality so all match, which is required for the math to work.
+ // row cardinality only after reading in A and B (or potentially C...) We then adjust the cardinality
+ // so all match, which is required for the math to work.
// Note: this may leave blank rows with no representation in any DRM. Blank rows need to
// be supported (and are at least on Spark) or the row cardinality adjustment will not work.
val datasetB = if (!inFiles2.isEmpty) {
@@ -201,19 +197,19 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
}
}
- override def process: Unit = {
- start
+ override def process(): Unit = {
+ start()
val indexedDatasets = readIndexedDatasets
val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int],
parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
// todo: allow more than one cross-similarity matrix?
- idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "indicator-matrix", schema = writeSchema)
+ idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "similarity-matrix", schema = writeSchema)
if(idss.length > 1)
- idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix", schema = writeSchema)
+ idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-similarity-matrix", schema = writeSchema)
- stop
+ stop()
}
}
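Putting the driver together: a run over simple (userID,itemID) tuples needs only input and output locations, with
everything else falling back to the defaults described in the scaladoc above. A hedged sketch of a programmatic
invocation with placeholder paths (the same flags apply on the command line via mahout spark-itemsimilarity):

  import org.apache.mahout.drivers.ItemSimilarityDriver

  object ItemSimilaritySketch extends App {
    // input lines of the form:  userID<tab>itemID  (tab, comma, or space delimited)
    // output lines of the form: itemID1<tab>itemID2:value2<space>itemID10:value10...
    ItemSimilarityDriver.main(Array(
      "--input",  "hdfs://namenode:8020/data/interactions/",   // placeholder URI
      "--output", "hdfs://namenode:8020/data/similarities/",   // similarity-matrix/ is written here;
                                                               // cross-similarity-matrix/ too when a second
                                                               // input or filters are used
      "--master", "local[4]"))
  }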
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index ab40c3a..668d70c 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -21,69 +21,81 @@ import org.apache.mahout.math.drm.DistributedContext
import org.apache.spark.SparkConf
import org.apache.mahout.sparkbindings._
-/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
- * Also define a Map of options for the command line parser. The following template may help:
- * {{{
- * object SomeDriver extends MahoutDriver {
- *
- * // define only the options specific to this driver, inherit the generic ones
- * private final val SomeOptions = HashMap[String, Any](
- * "maxThings" -> 500,
- * "minThings" -> 100,
- * "appName" -> "SomeDriver")
- *
- * override def main(args: Array[String]): Unit = {
- *
- * val parser = new MahoutOptionParser(programName = "shortname") {
- * head("somedriver", "Mahout 1.0-SNAPSHOT")
- *
- * // Input output options, non-driver specific
- * parseIOOptions
- *
- * // Algorithm specific options
- * // Add in the new options
- * opts = opts ++ SomeOptions
- * note("\nAlgorithm control options:")
- * opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
- * options + ("maxThings" -> x) ...
- * }
- * parser.parse(args, parser.opts) map { opts =>
- * parser.opts = opts
- * process
- * }
- * }
- *
- * override def process: Unit = {
- * start // override to change the default Kryo or SparkConf before the distributed context is created
- * // do the work here
- * stop
- * }
- *
- * }}}
- */
+/**
+ * Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
+ * Also define a Map of options for the command line parser. The following template may help:
+ * {{{
+ * object SomeDriver extends MahoutDriver {
+ *
+ * // define only the options specific to this driver, inherit the generic ones
+ * private final val SomeOptions = HashMap[String, Any](
+ * "maxThings" -> 500,
+ * "minThings" -> 100,
+ * "appName" -> "SomeDriver")
+ *
+ * override def main(args: Array[String]): Unit = {
+ *
+ * val parser = new MahoutOptionParser(programName = "shortname") {
+ * head("somedriver", "Mahout 1.0-SNAPSHOT")
+ *
+ * // Input output options, non-driver specific
+ * parseIOOptions()
+ *
+ * // Algorithm specific options
+ * // Add in the new options
+ * opts = opts ++ SomeOptions
+ * note("\nAlgorithm control options:")
+ * opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
+ * options + ("maxThings" -> x) ...
+ * }
+ * parser.parse(args, parser.opts) map { opts =>
+ * parser.opts = opts
+ * process()
+ * }
+ * }
+ *
+ * override def process(): Unit = {
+ * start() // override to change the default Kryo or SparkConf before the distributed context is created
+ * // do the work here
+ * stop()
+ * }
+ *
+ * }}}
+ */
abstract class MahoutSparkDriver extends MahoutDriver {
- implicit protected var sparkConf = new SparkConf()
+ implicit var sparkConf = new SparkConf()
- /** Creates a Spark context to run the job inside.
- * Override to set the SparkConf values specific to the job,
- * these must be set before the context is created.
- * */
- protected def start() : Unit = {
+ /**
+ * Creates a Spark context to run the job inside.
+ * Override to set the SparkConf values specific to the job,
+ * these must be set before the context is created.
+ */
+ override protected def start() : Unit = {
if (!_useExistingContext) {
+ /* hack around SPARK-6069 Spark 1.2.1 deserialization of HashBiMap throwing ClassNotFound--doesn't seem to work
+ sparkConf.set("spark.files.userClassPathFirst", "true")
+ sparkConf.set("spark.executor.userClassPathFirst", "true")
+ */
+
sparkConf.set("spark.kryo.referenceTracking", "false")
.set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
//else leave as set in Spark config
- mc = mahoutSparkContext(masterUrl = parser.opts("master").asInstanceOf[String],
+ mc = mahoutSparkContext(
+ masterUrl = parser.opts("master").asInstanceOf[String],
appName = parser.opts("appName").asInstanceOf[String],
sparkConf = sparkConf)
}
}
+ /**
+ * Call this before start to use an existing context as when running multiple drivers from a scalatest suite.
+ * @param context An already set up context to run against
+ */
def useContext(context: DistributedContext): Unit = {
_useExistingContext = true
mc = context
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
index a46d2ee..b3a1ec2 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
@@ -18,26 +18,30 @@ package org.apache.mahout.drivers
import org.apache.spark.SparkConf
+/** Adds parsing of Spark specific options to the option parser */
class MahoutSparkOptionParser(programName: String) extends MahoutOptionParser(programName: String){
- def parseSparkOptions(implicit sparkConf: SparkConf) = {
+ def parseSparkOptions()(implicit sparkConf: SparkConf) = {
opts = opts ++ MahoutOptionParser.SparkOptions
opts = opts + ("appName" -> programName)
note("\nSpark config options:")
- opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
- options + ("master" -> x)
+ opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can " +
+ "specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+ options + ("master" -> x)
}
- opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
- options + ("sparkExecutorMem" -> x)
+ opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each " +
+ "node (optional). Default: as Spark config specifies") action { (x, options) =>
+ options + ("sparkExecutorMem" -> x)
}
opt[(String, String)]("define") abbr ("D") unbounded() foreach { case (k, v) =>
sparkConf.set(k, v)
} validate { x =>
if (x._2 != "") success else failure("Value <sparkConfValue> must be non-blank")
- } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before creating this job's Spark context (optional)")
+ } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before " +
+ "creating this job's Spark context (optional)")
}
}
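
Because parseSparkOptions() now takes the SparkConf in a separate implicit parameter list, any caller only needs an implicit SparkConf in scope. A REPL-style sketch, with the program name and argument values illustrative:

{{{
import org.apache.spark.SparkConf
import org.apache.mahout.drivers.MahoutSparkOptionParser

// REPL-style sketch; only --master and --sparkExecutorMem are exercised here.
implicit val conf = new SparkConf()

val parser = new MahoutSparkOptionParser(programName = "somedriver") {
  parseSparkOptions()   // adds --master, --sparkExecutorMem and the -D define option
}

parser.parse(Seq("--master", "local[4]", "--sparkExecutorMem", "4g"), parser.opts) map { opts =>
  println(opts("master"))            // local[4]
  println(opts("sparkExecutorMem"))  // 4g
}
}}}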
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 8c1bce4..3b47452 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -26,12 +26,11 @@ import scala.collection.immutable.HashMap
/**
* Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]].
* Reads a text delimited file containing rows of a [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * with domain specific IDS of the form
- * (row id, column id: strength, ...). The IDs will be preserved in the
+ * with domain-specific IDs of the form (row id, column id: strength, ...). The IDs will be preserved in the
* output. The rows define a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]]
- * will be used to calculate row-wise similarity using log-likelihood
- * The options allow control of the input schema, file discovery, output schema, and control of
- * algorithm parameters.
+ * will be used to calculate row-wise similarity using the log-likelihood ratio. The options allow control of the
+ * input schema, file discovery, the output schema, and the algorithm parameters.
+ *
* To get help run {{{mahout spark-rowsimilarity}}} for a full explanation of options. The default
* values for formatting will read (rowID<tab>columnID1:strength1<space>columnID2:strength2....)
* and write (rowID<tab>rowID1:strength1<space>rowID2:strength2....)
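
Concretely, with that default formatting an input file and its output could look like the lines below. The IDs and strengths are illustrative placeholders, not computed values; in real output the strengths would be LLR scores.

{{{
# illustrative input rows (rowID<tab>columnID:strength pairs)
doc1<tab>term1:1<space>term4:5<space>term9:2
doc2<tab>term4:2<space>term9:1
doc3<tab>term1:3<space>term2:1

# illustrative output row: the most similar rows for doc1, strengths made up
doc1<tab>doc3:4.2<space>doc2:1.1
}}}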
@@ -49,6 +48,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
private var readWriteSchema: Schema = _
/**
+ * Entry point, not using Scala App trait
* @param args Command line args, if empty a help message is printed.
*/
override def main(args: Array[String]): Unit = {
@@ -67,35 +67,34 @@ object RowSimilarityDriver extends MahoutSparkDriver {
options + ("maxObservations" -> x)
} text ("Max number of observations to consider per row (optional). Default: " +
RowSimilarityOptions("maxObservations")) validate { x =>
- if (x > 0) success else failure("Option --maxObservations must be > 0")
+ if (x > 0) success else failure("Option --maxObservations must be > 0")
}
opt[Int]('m', "maxSimilaritiesPerRow") action { (x, options) =>
options + ("maxSimilaritiesPerRow" -> x)
} text ("Limit the number of similarities per item to this number (optional). Default: " +
RowSimilarityOptions("maxSimilaritiesPerRow")) validate { x =>
- if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0")
+ if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0")
}
- /** --threshold not implemented in SimilarityAnalysis.rowSimilarity
- * todo: replacing the threshold with some % of the best values and/or a
- * confidence measure expressed in standard deviations would be nice.
- */
+ // --threshold not implemented in SimilarityAnalysis.rowSimilarity
+ // todo: replacing the threshold with some % of the best values and/or a
+ // confidence measure expressed in standard deviations would be nice.
//Driver notes--driver specific
note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
//Drm output schema--not driver specific, drm specific
- parseDrmFormatOptions
+ parseIndexedDatasetFormatOptions()
//How to search for input
- parseFileDiscoveryOptions
+ parseFileDiscoveryOptions()
//Spark config options--not driver specific
- parseSparkOptions
+ parseSparkOptions()
//Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
- parseGenericOptions
+ parseGenericOptions()
help("help") abbr ("h") text ("prints this usage text\n")
@@ -106,9 +105,9 @@ object RowSimilarityDriver extends MahoutSparkDriver {
}
}
- override protected def start() : Unit = {
+ override protected def start(): Unit = {
- super.start
+ super.start()
readWriteSchema = new Schema(
"rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String],
@@ -120,8 +119,8 @@ object RowSimilarityDriver extends MahoutSparkDriver {
private def readIndexedDataset: IndexedDataset = {
- val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
- parser.opts("recursive").asInstanceOf[Boolean]).uris
+ val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String],
+ parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris
if (inFiles.isEmpty) {
null.asInstanceOf[IndexedDataset]
@@ -132,8 +131,8 @@ object RowSimilarityDriver extends MahoutSparkDriver {
}
}
- override def process: Unit = {
- start
+ override def process(): Unit = {
+ start()
val indexedDataset = readIndexedDataset
@@ -144,7 +143,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], readWriteSchema)
- stop
+ stop()
}
}
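
For test suites, the object can be handed an existing context before main() is invoked, which is what useContext() above is for. A sketch, assuming the generic --input/--output flag names and a context supplied by the suite's setup (for example DistributedSparkSuite); the import path for DistributedContext and the paths used are assumptions:

{{{
import org.apache.mahout.drivers.RowSimilarityDriver
import org.apache.mahout.math.drm.DistributedContext   // import path assumed

// Sketch: reuse one context across drivers in a test suite. "context" would be
// supplied by the suite's setup; the flag names and paths are illustrative.
def runRowSimilarity(context: DistributedContext): Unit = {
  RowSimilarityDriver.useContext(context)   // start() will skip context creation
  RowSimilarityDriver.main(Array(
    "--input", "/tmp/row-similarity-in",
    "--output", "/tmp/row-similarity-out"))
}
}}}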
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
index 368ee89..8531a0a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
@@ -58,56 +58,56 @@ object TestNBDriver extends MahoutSparkDriver {
//How to search for input
- parseFileDiscoveryOptions
+ parseFileDiscoveryOptions()
- //Drm output schema--not driver specific, drm specific
- parseDrmFormatOptions
+ //IndexedDataset output schema--not driver specific, IndexedDataset specific
+ parseIndexedDatasetFormatOptions()
//Spark config options--not driver specific
- parseSparkOptions
+ parseSparkOptions()
//Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
- parseGenericOptions
+ parseGenericOptions()
help("help") abbr ("h") text ("prints this usage text\n")
}
parser.parse(args, parser.opts) map { opts =>
parser.opts = opts
- process
+ process()
}
}
-/** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
-private def readTestSet: DrmLike[_] = {
- val inputPath = parser.opts("input").asInstanceOf[String]
- val trainingSet= drm.drmDfsRead(inputPath)
- trainingSet
-}
+ /** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
+ private def readTestSet: DrmLike[_] = {
+ val inputPath = parser.opts("input").asInstanceOf[String]
+ val trainingSet = drm.drmDfsRead(inputPath)
+ trainingSet
+ }
-/** read the model from pathToModel using NBModel.DfsRead(...) */
-private def readModel: NBModel = {
- val inputPath = parser.opts("pathToModel").asInstanceOf[String]
- val model= NBModel.dfsRead(inputPath)
- model
-}
+ /** Read the model from pathToModel using NBModel.dfsRead(...) */
+ private def readModel: NBModel = {
+ val inputPath = parser.opts("pathToModel").asInstanceOf[String]
+ val model = NBModel.dfsRead(inputPath)
+ model
+ }
-override def process: Unit = {
- start()
+ override def process(): Unit = {
+ start()
- val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
- val outputPath = parser.opts("output").asInstanceOf[String]
+ val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
+ val outputPath = parser.opts("output").asInstanceOf[String]
- // todo: get the -ow option in to check for a model in the path and overwrite if flagged.
+ // todo: add the -ow option to check for an existing model in the output path and overwrite it if flagged.
- val testSet = readTestSet
- val model = readModel
- val analyzer= NaiveBayes.test(model, testSet, testComplementary)
+ val testSet = readTestSet
+ val model = readModel
+ val analyzer = NaiveBayes.test(model, testSet, testComplementary)
- println(analyzer)
+ println(analyzer)
- stop
-}
+ stop()
+ }
}
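
The same evaluation flow can be reproduced from code rather than the CLI. A sketch of what process() above does, with the import paths and the implicit-context plumbing assumed rather than taken from the diff:

{{{
import org.apache.mahout.math.drm                                       // for drm.drmDfsRead
import org.apache.mahout.math.drm.DistributedContext                    // import path assumed
import org.apache.mahout.classifier.naivebayes.{NBModel, NaiveBayes}    // import path assumed

// Sketch of the same flow as process() above; assumes an implicit
// DistributedContext is available (e.g. from mahoutSparkContext).
def testNB(testSetPath: String, modelPath: String, complementary: Boolean)
          (implicit ctx: DistributedContext): Unit = {
  val testSet  = drm.drmDfsRead(testSetPath)   // <Text,VectorWritable> sequence file
  val model    = NBModel.dfsRead(modelPath)    // model written by the training driver
  val analyzer = NaiveBayes.test(model, testSet, complementary)
  println(analyzer)                            // summary of the test results
}
}}}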
[7/7] mahout git commit: NOJIRA removed o.a.m.Pair, cleaned up style,
incorporated most of PR #74 but left Spark version at 1.1.1 due to
bug in 1.2.1
Posted by pa...@apache.org.
NOJIRA removed o.a.m.Pair, cleaned up style, incorporated most of PR #74 but left Spark version at 1.1.1 due to bug in 1.2.1
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b0ee8e26
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b0ee8e26
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b0ee8e26
Branch: refs/heads/master
Commit: b0ee8e2657a900315140d21205416338cc955864
Parents: 15ee195
Author: pferrel <pa...@occamsmachete.com>
Authored: Wed Mar 4 13:52:04 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Wed Mar 4 13:52:04 2015 -0800
----------------------------------------------------------------------
.../src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/b0ee8e26/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index d8d04e2..380f4eb 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -134,7 +134,9 @@ object RLikeDrmOps {
implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
- implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp)
+ // Removed in move to 1.2.1 PR #74 https://github.com/apache/mahout/pull/74/files
+ // Not sure why.
+ // implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp)
/**
* This is probably dangerous since it triggers implicit checkpointing with default storage level
[3/7] mahout git commit: scopt moved from spark to math-scala where
base engine agnostic driver lives
Posted by pa...@apache.org.
scopt moved from spark to math-scala where base engine agnostic driver lives
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/46398927
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/46398927
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/46398927
Branch: refs/heads/master
Commit: 463989272b4219716ca692cb4f9b757076d5c690
Parents: e005b28 901ef03
Author: pferrel <pa...@occamsmachete.com>
Authored: Sat Feb 21 08:45:39 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Sat Feb 21 08:45:39 2015 -0800
----------------------------------------------------------------------
h2o/pom.xml | 8 ++++----
math-scala/pom.xml | 4 ++--
.../main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala | 2 --
pom.xml | 2 +-
spark-shell/pom.xml | 4 ++--
spark/pom.xml | 2 +-
.../mahout/sparkbindings/test/DistributedSparkSuite.scala | 5 +++++
7 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/46398927/spark/pom.xml
----------------------------------------------------------------------
[4/7] mahout git commit: removed scopt from spark/pom.xml
Posted by pa...@apache.org.
removed scopt from spark/pom.xml
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/43bea68f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/43bea68f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/43bea68f
Branch: refs/heads/master
Commit: 43bea68f363143d87f567eedff147460db5bd146
Parents: 4639892 fde08a9
Author: pferrel <pa...@occamsmachete.com>
Authored: Mon Mar 2 13:29:10 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Mon Mar 2 13:29:10 2015 -0800
----------------------------------------------------------------------
h2o/pom.xml | 8 ++++----
math-scala/pom.xml | 4 ++--
.../main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala | 2 ++
pom.xml | 2 +-
spark-shell/pom.xml | 4 ++--
spark/pom.xml | 2 +-
.../mahout/sparkbindings/test/DistributedSparkSuite.scala | 5 -----
7 files changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/43bea68f/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index c5a1141,ca66522..4f5fe88
--- a/pom.xml
+++ b/pom.xml
@@@ -111,7 -111,7 +111,7 @@@
<slf4j.version>1.7.5</slf4j.version>
<scala.major>2.10</scala.major>
<scala.version>2.10.4</scala.version>
- <spark.version>1.2.1</spark.version>
- <spark.version>1.1.0</spark.version>
++ <spark.version>1.1.1</spark.version>
<h2o.version>0.1.16</h2o.version>
</properties>
<issueManagement>
http://git-wip-us.apache.org/repos/asf/mahout/blob/43bea68f/spark/pom.xml
----------------------------------------------------------------------