Posted to commits@mahout.apache.org by pa...@apache.org on 2015/03/04 22:52:27 UTC

[1/7] mahout git commit: simplified driver and made required changes to all, note: left job assembly untouched

Repository: mahout
Updated Branches:
  refs/heads/master fde08a9a5 -> b0ee8e265


simplified driver and made required changes to all, note: left job assembly untouched


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/d0f64205
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/d0f64205
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/d0f64205

Branch: refs/heads/master
Commit: d0f64205a116853aa471dd1361a635167da15fcc
Parents: 0f037cb
Author: pferrel <pa...@occamsmachete.com>
Authored: Sat Dec 27 15:43:41 2014 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Sat Dec 27 15:43:41 2014 -0800

----------------------------------------------------------------------
 .../apache/mahout/drivers/MahoutDriver.scala    |  2 +-
 spark/pom.xml                                   | 13 ++--
 spark/src/main/assembly/job.xml                 | 17 +++++-
 .../mahout/drivers/ItemSimilarityDriver.scala   | 12 +---
 .../mahout/drivers/MahoutSparkDriver.scala      | 20 +++---
 .../mahout/drivers/RowSimilarityDriver.scala    |  8 +--
 .../apache/mahout/drivers/TestNBDriver.scala    | 64 +++++++-------------
 .../apache/mahout/drivers/TrainNBDriver.scala   | 18 ------
 8 files changed, 61 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
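The commit collapses the per-driver start(masterUrl, appName) overrides into a no-argument start() that reads everything it needs from the option parser. A minimal, self-contained sketch of that lifecycle in plain Scala (stand-in classes, not the Mahout types; the option keys mirror the ones used in the diffs below):

    abstract class Driver {
      protected var opts: Map[String, Any] = Map.empty   // stand-in for parser.opts
      protected def start(): Unit                        // sets up the distributed context
      protected def process(): Unit                      // start, do the work, stop
      protected def stop(): Unit = println("context closed")
    }

    object ExampleDriver extends Driver {
      private var master = ""
      override protected def start(): Unit = {           // no arguments: config comes from opts
        master = opts("master").asInstanceOf[String]
      }
      override protected def process(): Unit = {
        start()
        println(s"running on $master")                   // the real drivers read/compute/write here
        stop()
      }
      def main(args: Array[String]): Unit = {
        opts = Map("master" -> "local[2]", "appName" -> "example")
        process()
      }
    }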


http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 8c1f8cf..3d9d4e1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -25,7 +25,7 @@ abstract class MahoutDriver {
 
 
   implicit protected var mc: DistributedContext = _
-  protected var parser: MahoutOptionParser = _
+  implicit protected var parser: MahoutOptionParser = _
 
   var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
 

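Making parser an implicit var lets downstream helpers that declare an implicit MahoutOptionParser parameter pick it up from the enclosing scope instead of having it passed explicitly. A small, runnable sketch of the mechanism with a stand-in parser type (not the Mahout class):

    case class OptionParser(opts: Map[String, Any])   // stand-in for MahoutOptionParser

    object ImplicitParserSketch extends App {
      // any helper declaring an implicit parser parameter finds the one in scope
      def masterUrl()(implicit parser: OptionParser): String =
        parser.opts("master").asInstanceOf[String]

      implicit val parser: OptionParser = OptionParser(Map("master" -> "local[2]"))
      println(masterUrl())   // prints local[2]
    }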
http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index f61f988..bcf9e30 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -157,8 +157,8 @@
         </executions>
       </plugin>
 
-      <!-- create job jar to include CLI driver deps-->
-      <!-- leave this in even though there are no hadoop mapreduce jobs in this module -->
+      <!-- create an all dependencies job.jar -->
+      <!-- todo: before release we need a better way to do this MAHOUT-1636 -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
@@ -171,13 +171,14 @@
             </goals>
             <configuration>
               <descriptors>
-                <descriptor>src/main/assembly/job.xml</descriptor>
+                <descriptor>../spark/src/main/assembly/job.xml</descriptor>
               </descriptors>
             </configuration>
           </execution>
         </executions>
       </plugin>
 
+
     </plugins>
   </build>
 <!--
@@ -319,12 +320,6 @@
 
     <!--  3rd-party -->
 
-    <dependency>
-      <groupId>com.github.scopt</groupId>
-      <artifactId>scopt_2.10</artifactId>
-      <version>3.2.0</version>
-    </dependency>
-
     <!-- scala stuff -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml
index 0c41f3d..2bdb3ce 100644
--- a/spark/src/main/assembly/job.xml
+++ b/spark/src/main/assembly/job.xml
@@ -42,5 +42,20 @@
       </excludes>
     </dependencySet>
   </dependencySets>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/classes</directory>
+      <outputDirectory>/</outputDirectory>
+      <excludes>
+        <exclude>*.jar</exclude>
+      </excludes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/classes</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>driver.classes.default.props</include>
+      </includes>
+    </fileSet>
+  </fileSets>
 </assembly>
-  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 01a18c9..36ba6ef 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -117,15 +117,9 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
     }
   }
 
-  override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
-      appName: String = parser.opts("appName").asInstanceOf[String]):
-    Unit = {
+  override protected def start() : Unit = {
 
-    if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
-      sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
-    //else leave as set in Spark config
-
-    super.start(masterUrl, appName)
+    super.start
 
     readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
       "filter" -> parser.opts("filter1").asInstanceOf[String],
@@ -208,7 +202,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
   }
 
   override def process: Unit = {
-    start()
+    start
 
     val indexedDatasets = readIndexedDatasets
     val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int],
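With the Spark-specific setup moved into the base class, the subclass start() above is left building its read/write Schemas from the parsed options. A toy, self-contained stand-in for that idiom (the real Schema lives in org.apache.mahout.math.indexeddataset and likewise takes ("key" -> value) pairs; the class below is illustrative only, the option keys are copied from the hunk):

    object SchemaSketch extends App {
      // illustrative stand-in: a Schema built from ("key" -> value) pairs
      class Schema(pairs: (String, Any)*) {
        private val m = pairs.toMap
        def apply(key: String): Any = m(key)
      }

      val opts: Map[String, Any] = Map("inDelim" -> "\t", "filter1" -> "purchase")
      val readSchema1 = new Schema(
        "delim"  -> opts("inDelim").asInstanceOf[String],
        "filter" -> opts("filter1").asInstanceOf[String])

      println(readSchema1("filter"))   // purchase
    }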

http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index e6299fd..ab40c3a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -34,7 +34,6 @@ import org.apache.mahout.sparkbindings._
   *
   *   override def main(args: Array[String]): Unit = {
   *
-  *
   *     val parser = new MahoutOptionParser(programName = "shortname") {
   *       head("somedriver", "Mahout 1.0-SNAPSHOT")
   *
@@ -55,7 +54,7 @@ import org.apache.mahout.sparkbindings._
   *   }
   *
   *   override def process: Unit = {
-  *     start()
+  *     start // override to change the default Kryo or SparkConf before the distributed context is created
   *     // do the work here
   *     stop
   *   }
@@ -70,15 +69,18 @@ abstract class MahoutSparkDriver extends MahoutDriver {
   /** Creates a Spark context to run the job inside.
     * Override to set the SparkConf values specific to the job,
     * these must be set before the context is created.
-    * @param masterUrl Spark master URL
-    * @param appName  Name to display in Spark UI
     * */
-  protected def start(masterUrl: String, appName: String) : Unit = {
-    sparkConf.set("spark.kryo.referenceTracking", "false")
-      .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
-
+  protected def start() : Unit = {
     if (!_useExistingContext) {
-      mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
+      sparkConf.set("spark.kryo.referenceTracking", "false")
+        .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
+
+      if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
+        sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+      //else leave as set in Spark config
+      mc = mahoutSparkContext(masterUrl = parser.opts("master").asInstanceOf[String],
+        appName = parser.opts("appName").asInstanceOf[String],
+        sparkConf = sparkConf)
     }
   }
 

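The base class now owns all of the SparkConf tuning before the context is created. A hedged sketch of just that configuration step using plain spark-core (outside Mahout; the executor-memory value is a placeholder for what parser.opts("sparkExecutorMem") would supply):

    import org.apache.spark.SparkConf

    object SparkConfSketch extends App {
      val sparkConf = new SparkConf()
        .set("spark.kryo.referenceTracking", "false")
        .set("spark.kryoserializer.buffer.mb", "200")   // Mahout optimizer default, tunable with -D

      val sparkExecutorMem = "4g"                       // placeholder for parser.opts("sparkExecutorMem")
      if (sparkExecutorMem != "")
        sparkConf.set("spark.executor.memory", sparkExecutorMem)
      // else leave whatever the Spark config already sets

      println(sparkConf.toDebugString)
    }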
http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 9b44b95..8c1bce4 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -106,11 +106,9 @@ object RowSimilarityDriver extends MahoutSparkDriver {
     }
   }
 
-  override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
-      appName: String = parser.opts("appName").asInstanceOf[String]):
-    Unit = {
+  override protected def start() : Unit = {
 
-    super.start(masterUrl, appName)
+    super.start
 
     readWriteSchema = new Schema(
       "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String],
@@ -135,7 +133,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
   }
 
   override def process: Unit = {
-    start()
+    start
 
     val indexedDataset = readIndexedDataset
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
index 7d0738c..368ee89 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
@@ -78,54 +78,36 @@ object TestNBDriver extends MahoutSparkDriver {
     }
   }
 
-  override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
-      appName: String = parser.opts("appName").asInstanceOf[String]):
-    Unit = {
-
-    // will be only specific to this job.
-    // Note: set a large spark.kryoserializer.buffer.mb if using DSL MapBlock else leave as default
-
-    if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
-      sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
-
-    // Note: set a large akka frame size for DSL NB (20)
-    //sparkConf.set("spark.akka.frameSize","20") // don't need this for Spark optimized NaiveBayes..
-    //else leave as set in Spark config
-
-    super.start(masterUrl, appName)
-
-    }
-
-  /** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
-  private def readTestSet: DrmLike[_] = {
-    val inputPath = parser.opts("input").asInstanceOf[String]
-    val trainingSet= drm.drmDfsRead(inputPath)
-    trainingSet
-  }
+/** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
+private def readTestSet: DrmLike[_] = {
+  val inputPath = parser.opts("input").asInstanceOf[String]
+  val trainingSet= drm.drmDfsRead(inputPath)
+  trainingSet
+}
 
-  /** read the model from pathToModel using NBModel.DfsRead(...) */
-  private def readModel: NBModel = {
-    val inputPath = parser.opts("pathToModel").asInstanceOf[String]
-    val model= NBModel.dfsRead(inputPath)
-    model
-  }
+/** read the model from pathToModel using NBModel.DfsRead(...) */
+private def readModel: NBModel = {
+  val inputPath = parser.opts("pathToModel").asInstanceOf[String]
+  val model= NBModel.dfsRead(inputPath)
+  model
+}
 
-  override def process: Unit = {
-    start()
+override def process: Unit = {
+  start()
 
-    val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
-    val outputPath = parser.opts("output").asInstanceOf[String]
+  val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
+  val outputPath = parser.opts("output").asInstanceOf[String]
 
-    // todo:  get the -ow option in to check for a model in the path and overwrite if flagged.
+  // todo:  get the -ow option in to check for a model in the path and overwrite if flagged.
 
-    val testSet = readTestSet
-    val model = readModel
-    val analyzer= NaiveBayes.test(model, testSet, testComplementary)
+  val testSet = readTestSet
+  val model = readModel
+  val analyzer= NaiveBayes.test(model, testSet, testComplementary)
 
-    println(analyzer)
+  println(analyzer)
 
-    stop
-  }
+  stop
+}
 
 }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index 35ff90b..3d03c1d 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -72,24 +72,6 @@ object TrainNBDriver extends MahoutSparkDriver {
     }
   }
 
-  override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
-      appName: String = parser.opts("appName").asInstanceOf[String]):
-    Unit = {
-
-    // will be only specific to this job.
-    // Note: set a large spark.kryoserializer.buffer.mb if using DSL MapBlock else leave as default
-
-    if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
-      sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
-
-    // Note: set a large akka frame size for DSL NB (20)
-    // sparkConf.set("spark.akka.frameSize","20") // don't need this for Spark optimized NaiveBayes..
-    // else leave as set in Spark config
-
-    super.start(masterUrl, appName)
-
-    }
-
   /** Read the training set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
   private def readTrainingSet: DrmLike[_]= {
     val inputPath = parser.opts("input").asInstanceOf[String]


[5/7] mahout git commit: removed o.a.m.Pair, cleaned up comments and style issues, simplified driver API, merged most of 1.2.1 changes but left on Spark v1.1.1 because of a bug in v1.2.1

Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index 895bd01..a90e672 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -26,18 +26,21 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
 import org.apache.mahout.sparkbindings._
 import scala.collection.JavaConversions._
 
-/** Extends Reader trait to supply the [[IndexedDatasetSpark]] as the type read and a reader function for reading text delimited files as described in the [[Schema]]
-  */
+/**
+ * Extends Reader trait to supply the [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] as
+ * the type read and a element and row reader functions for reading text delimited files as described in the
+ * [[org.apache.mahout.math.indexeddataset.Schema]]
+ */
 trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
-  /** Read in text delimited elements from all URIs in the comma delimited source String and return
-    * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is
-    * no strength value in the element, assume it's presence means a strength of 1.
-    *
-    * @param mc context for the Spark job
-    * @param readSchema describes the delimiters and positions of values in the text delimited file.
-    * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]]
-    * @return
-    */
+  /**
+   * Read in text delimited elements from all URIs in the comma delimited source String and return
+   * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is
+   * no strength value in the element, assume it's presence means a strength of 1.
+   * @param mc context for the Spark job
+   * @param readSchema describes the delimiters and positions of values in the text delimited file.
+   * @param source comma delimited URIs of text files to be read from
+   * @return a new [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
+   */
   protected def elementReader(
       mc: DistributedContext,
       readSchema: Schema,
@@ -116,16 +119,16 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
     }
   }
 
-  /** Read in text delimited rows from all URIs in this comma delimited source String and return
-    * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is
-    * no strength value in the element, assume it's presence means a strength of 1.
-    *
-    * @param mc context for the Spark job
-    * @param readSchema describes the delimiters and positions of values in the text delimited file.
-    * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]]
-    * @return
-    */
-  protected def drmReader(
+  /**
+   * Read in text delimited rows from all URIs in this comma delimited source String and return
+   * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is
+   * no strength value in the element, assume it's presence means a strength of 1.
+   * @param mc context for the Spark job
+   * @param readSchema describes the delimiters and positions of values in the text delimited file.
+   * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]]
+   * @return a new [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
+   */
+  protected def rowReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
@@ -205,33 +208,36 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
     }
   }
 
-  // this creates a BiMap from an ID collection. The ID points to an ordinal int
-  // which is used internal to Mahout as the row or column ID
-  // todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a
-  // non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit
-  // in-memory, the option is to put the dictionaries in rdds and do joins to translate IDs
-  private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
+  /**
+   * Creates a BiMap from an ID collection. The ID points to an ordinal in which is used internal to Mahout
+   * as the row or column ID
+   * todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a
+   * non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit
+   * in-memory, the option is to put the dictionaries in rdds and do joins to translate IDs
+   */
+  private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(),
+      entries: Array[String]):
+    BiMap[String, Int] = {
     var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index'
     for (entry <- entries) {
       if (!dictionary.contains(entry)){
         dictionary.put(entry, index)
         index += 1
-      }// the dictionary should never contain an entry since they are supposed to be distinct but for some reason they do
+      }// the dictionary should never contain an entry since they are supposed to be distinct but for some reason
+      // they do
     }
     dictionary
   }
 }
 
+/** Extends the Writer trait to supply the type being written and supplies the writer function */
 trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
-
-  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
-
-  /** Read in text delimited elements from all URIs in this comma delimited source String.
-    *
-    * @param mc context for the Spark job
-    * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
-    * @param dest directory to write text delimited version of [[IndexedDatasetSpark]]
-    */
+  /**
+   * Read in text delimited elements from all URIs in this comma delimited source String.
+   * @param mc context for the Spark job
+   * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
+   * @param dest directory to write text delimited version of [[IndexedDatasetSpark]]
+   */
   protected def writer(
       mc: DistributedContext,
       writeSchema: Schema,
@@ -262,21 +268,20 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
       matrix.rdd.map { case (rowID, itemVector) =>
 
         // turn non-zeros into list for sorting
-        val itemList: collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]] = new collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]]
+        var itemList = List[(Int, Double)]()
         for (ve <- itemVector.nonZeroes) {
-          val item: org.apache.mahout.common.Pair[Integer, Double] = new org.apache.mahout.common.Pair[Integer, Double](ve.index, ve.get)
-          itemList += item
+          itemList = itemList :+ (ve.index, ve.get)
         }
         //sort by highest value descending(-)
-        val vector = if (sort) itemList.sortBy(-_.getSecond) else itemList
+        val vector = if (sort) itemList.sortBy { elem => -elem._2 } else itemList
 
         // first get the external rowID token
         if (!vector.isEmpty){
           var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
           // for the rest of the row, construct the vector contents of elements (external column ID, strength value)
           for (item <- vector) {
-            line += columnIDDictionary.inverse.get(item.getFirst)
-            if (!omitScore) line += columnIdStrengthDelim + item.getSecond
+            line += columnIDDictionary.inverse.get(item._1)
+            if (!omitScore) line += columnIdStrengthDelim + item._2
             line += elementDelim
           }
           // drop the last delimiter, not needed to end the line
@@ -296,26 +301,33 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
 /** A combined trait that reads and writes */
 trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexedDatasetWriter
 
-/** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
-  * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
-  * @param mc Spark context for reading files
-  * @note The source is supplied to Reader#readElementsFrom .
-  * */
+/**
+ * Reads text delimited files into an IndexedDataset. Classes can be used to supply trait params in their constructor.
+ * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
+ * @param mc Spark context for reading files
+ * @note The source is supplied to Reader#readElementsFrom .
+ */
 class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
     (implicit val mc: DistributedContext) extends TDIndexedDatasetReader
 
-/** Writes  text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
-  * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
-  * @param mc Spark context for reading files
-  * @note the destination is supplied to Writer#writeTo
-  * */
-class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
-
-/** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor.
-  * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
-  * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
-  * @param mc Spark context for reading the files, may be implicitly defined.
-  * */
+/**
+ * Writes  text delimited files into an IndexedDataset. Classes can be used to supply trait params in their
+ * constructor.
+ * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
+ * @param mc Spark context for reading files
+ * @note the destination is supplied to Writer#writeTo
+ */
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)
+    (implicit val mc: DistributedContext)
+  extends TDIndexedDatasetWriter
+
+/**
+ * Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in
+ * their constructor.
+ * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
+ * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
+ * @param mc Spark context for reading the files, may be implicitly defined.
+ */
 class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema, val sort: Boolean = true)
     (implicit val mc: DistributedContext)
   extends TDIndexedDatasetReaderWriter
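The writer hunk above drops org.apache.mahout.common.Pair in favour of plain Scala tuples for the (column, strength) entries. A small runnable sketch of that replacement, with made-up data standing in for the vector's non-zeroes:

    object TupleSortSketch extends App {
      val nonZeroes = Seq((3, 0.25), (7, 1.5), (1, 0.9))   // stand-in for itemVector.nonZeroes
      var itemList = List[(Int, Double)]()
      for ((index, value) <- nonZeroes)
        itemList = itemList :+ ((index, value))            // tuples instead of o.a.m.common.Pair
      val sorted = itemList.sortBy { elem => -elem._2 }    // highest strength first
      println(sorted)                                      // List((7,1.5), (1,0.9), (3,0.25))
    }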

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index 3d03c1d..4f88c13 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -52,23 +52,23 @@ object TrainNBDriver extends MahoutSparkDriver {
       
 
       //How to search for input
-      parseFileDiscoveryOptions
+      parseFileDiscoveryOptions()
 
-      //Drm output schema--not driver specific, drm specific
-      parseDrmFormatOptions
+      //IndexedDataset output schema--not driver specific, IndexedDataset specific
+      parseIndexedDatasetFormatOptions()
 
       //Spark config options--not driver specific
-      parseSparkOptions
+      parseSparkOptions()
 
       //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
-      parseGenericOptions
+      parseGenericOptions()
 
       help("help") abbr ("h") text ("prints this usage text\n")
 
     }
     parser.parse(args, parser.opts) map { opts =>
       parser.opts = opts
-      process
+      process()
     }
   }
 
@@ -79,7 +79,7 @@ object TrainNBDriver extends MahoutSparkDriver {
     trainingSet
   }
 
-  override def process: Unit = {
+  override def process(): Unit = {
     start()
 
     val complementary = parser.opts("trainComplementary").asInstanceOf[Boolean]
@@ -91,7 +91,7 @@ object TrainNBDriver extends MahoutSparkDriver {
 
     model.dfsWrite(outputPath)
 
-    stop
+    stop()
   }
 
 }
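The main() above follows the scopt idiom of folding options into a Map[String, Any] and only calling process() when parsing succeeds. A hedged, self-contained sketch of that flow using scopt 3.x directly rather than the MahoutOptionParser wrapper (the option names and defaults here are illustrative, not the driver's full option set):

    import scopt.OptionParser

    object ParseSketch extends App {
      val defaults: Map[String, Any] = Map("master" -> "local", "appName" -> "example")

      val parser = new OptionParser[Map[String, Any]]("example-driver") {
        head("example-driver", "1.0-SNAPSHOT")
        opt[String]("master") action { (x, opts) => opts + ("master" -> x) } text "Spark master URL"
        help("help") abbr "h" text "prints this usage text"
      }

      parser.parse(Array("--master", "local[2]"), defaults) map { opts =>
        // the real drivers assign parser.opts = opts and then call process()
        println(opts("master").asInstanceOf[String])   // local[2]
      }
    }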

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index c0d36c6..47eb40b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -252,10 +252,10 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-   * reads an IndexedDatasetSpark from default text delimited files
+   * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+   * delimited files. Reads a vector per row.
    * @param src a comma separated list of URIs to read from
    * @param schema how the text file is formatted
-   * @return
    */
   def indexedDatasetDFSRead(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
@@ -263,15 +263,15 @@ object SparkEngine extends DistributedEngine {
       (implicit sc: DistributedContext):
     IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
-    val ids = reader.readDRMFrom(src, existingRowIDs)
+    val ids = reader.readRowsFrom(src, existingRowIDs)
     ids
   }
 
   /**
-   * reads an IndexedDatasetSpark from default text delimited files
+   * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+   * delimited files. Reads an element per row.
    * @param src a comma separated list of URIs to read from
    * @param schema how the text file is formatted
-   * @return
    */
   def indexedDatasetDFSReadElements(src: String,
       schema: Schema = DefaultIndexedDatasetElementReadSchema,

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
index d3aa0a8..30b32ad 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -26,8 +26,12 @@ import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema,
 /**
  * Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific
  * dfsWrite method
+ * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
+ * @param rowIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
+ * @param columnIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
  */
-class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int])
+class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int],
+    val columnIDs: BiMap[String,Int])
   extends IndexedDataset {
 
   /** Secondary constructor enabling immutability */
@@ -35,14 +39,19 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[St
     this(id2.matrix, id2.rowIDs, id2.columnIDs)
   }
 
-  /** Factory method used to create this extending class when the interface of
-    * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is known. */
+  /**
+   * Factory method used to create this extending class when the interface of
+   * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is known.
+   */
   override def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
     IndexedDatasetSpark = {
     new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
   }
 
-  /** implements the core method [[indexeddataset.IndexedDataset#dfsWrite]]*/
+  /**
+   * Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
+   * replace the writer to change how it is written.
+   */
   override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema)
       (implicit sc: DistributedContext):
     Unit = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index c441716..8199708 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -223,7 +223,11 @@ package object sparkbindings {
           // During maven tests, the maven classpath also creeps in for some reason
           !n.matches(".*/.m2/.*")
         )
-
+    /* verify jar passed to context
+    log.info("\n\n\n")
+    mcjars.foreach(j => log.info(j))
+    log.info("\n\n\n")
+    */
     mcjars
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index 76c3553..ea6b40f 100644
--- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -63,7 +63,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
     "iphone\tipad:1.7260924347106847",
     "surface")
 
-  val CrossIndicatorLines = Iterable(
+  val CrossSimilarityLines = Iterable(
     "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
     "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
     "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
@@ -78,49 +78,45 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
     "iphone\tipad:1.7260924347106847",
     "surface"))
 
-  val CrossIndicatorTokens = tokenize(Iterable(
+  val CrossSimilarityTokens = tokenize(Iterable(
     "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
     "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
     "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
     "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
     "surface\tsurface:4.498681156950466 nexus:0.6795961471815897"))
 
-  // now in MahoutSuite
-  // final val TmpDir = "tmp/" // all IO going to whatever the default HDFS config is pointing to
-
   /*
     //Clustered Spark and HDFS, not a good everyday build test
     ItemSimilarityDriver.main(Array(
         "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
-        "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
+        "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/similarityMatrices/",
         "--master", "spark://occam4:7077",
         "--filter1", "purchase",
         "--filter2", "view",
         "--inDelim", ",",
         "--itemIDColumn", "2",
         "--rowIDColumn", "0",
-        "--filterColumn", "1"
-    ))
-*/
+        "--filterColumn", "1"))
+  */
   // local multi-threaded Spark with HDFS using large dataset
   // not a good build test.
-  /*    ItemSimilarityDriver.main(Array(
+  /*
+    ItemSimilarityDriver.main(Array(
       "--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt",
-      "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/",
+      "--output", "hdfs://occam4:54310/user/pat/xrsj/similarityMatrices/",
       "--master", "local[4]",
       "--filter1", "purchase",
       "--filter2", "view",
       "--inDelim", ",",
       "--itemIDColumn", "2",
       "--rowIDColumn", "0",
-      "--filterColumn", "1"
-    ))
+      "--filterColumn", "1"))
   */
 
   test("ItemSimilarityDriver, non-full-spec CSV") {
 
     val InFile = TmpDir + "in-file.csv/" //using part files, not single file
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1,purchase,iphone",
@@ -163,10 +159,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
 
     // todo: these comparisons rely on a sort producing the same lines, which could possibly
     // fail since the sort is on value and these can be the same for all items in a vector
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
-    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
+    val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+    tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens
   }
 
 
@@ -174,7 +170,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
   test("ItemSimilarityDriver TSV ") {
 
     val InFile = TmpDir + "in-file.tsv/"
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1\tpurchase\tiphone",
@@ -216,17 +212,17 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
 
     // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
     // some error cases
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
-    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
+    val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+    tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens
 
   }
 
   test("ItemSimilarityDriver log-ish files") {
 
     val InFile = TmpDir + "in-file.log/"
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
@@ -267,10 +263,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
       "--filterColumn", "2"))
 
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
-    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
+    val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+    tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens
 
   }
 
@@ -280,7 +276,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
     val InFilename = "in-file.tsv"
     val InPath = InDir + InFilename
 
-    val OutPath = TmpDir + "indicator-matrices"
+    val OutPath = TmpDir + "similarity-matrices"
 
     val lines = Array(
       "0,0,1",
@@ -312,8 +308,8 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
       "--output", OutPath,
       "--master", masterUrl))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs Answer
+    val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs Answer
 
   }
 
@@ -323,7 +319,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
     val InFilename = "in-file.tsv"
     val InPath = InDir + InFilename
 
-    val OutPath = TmpDir + "indicator-matrices"
+    val OutPath = TmpDir + "similarity-matrices"
 
     val lines = Array(
       "0,0,1",
@@ -356,8 +352,8 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
       "--master", masterUrl,
       "--omitStrength"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs Answer
+    val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs Answer
 
   }
 
@@ -397,7 +393,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
     val InPathM2 = InDirM2 + InFilenameM2
 
     val InPathStart = TmpDir + "data/"
-    val OutPath = TmpDir + "indicator-matrices"
+    val OutPath = TmpDir + "similarity-matrices"
 
     // this creates one part-0000 file in the directory
     mahoutCtx.parallelize(M1Lines).coalesce(1, shuffle = true).saveAsTextFile(InDirM1)
@@ -429,10 +425,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
       "--filenamePattern", "m..tsv",
       "--recursive"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
-    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
+    val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+    tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens
 
   }
 
@@ -440,7 +436,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
 
     val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
     val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1,purchase,iphone",
@@ -482,10 +478,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
       "--rowIDColumn", "0",
       "--filterColumn", "1"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
-    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
+    val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+    tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens
 
   }
 
@@ -493,7 +489,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
 
     val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
     val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1,purchase,iphone",
@@ -549,10 +545,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
       "--rowIDColumn", "0",
       "--filterColumn", "1"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs UnequalDimensionsSelfSimilarity
-    tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarity
+    val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs UnequalDimensionsSelfSimilarity
+    tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarity
 
   }
 
@@ -566,7 +562,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
     */
     val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
     val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1,purchase,iphone",
@@ -590,7 +586,8 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
       "iphone\tmobile_acc:1.7260924347106847 soap:1.7260924347106847 phones:1.7260924347106847",
       "surface\tmobile_acc:0.6795961471815897",
       "nexus\ttablets:1.7260924347106847 mobile_acc:0.6795961471815897 phones:0.6795961471815897",
-      "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 mobile_acc:1.7260924347106847",
+      "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 " +
+        "mobile_acc:1.7260924347106847",
       "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897"))
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
@@ -612,10 +609,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
       "--filterColumn", "1",
       "--writeAllDatasets"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+    val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+    tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
 
   }
 
@@ -673,7 +670,7 @@ removed ==> u3	0	      0	      1	          0
     */
     val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
     val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1,purchase,iphone",
@@ -719,10 +716,10 @@ removed ==> u3	0	      0	      1	          0
       "--filterColumn", "1",
       "--writeAllDatasets"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+    val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens
+    tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
   }
 
   // convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable'
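The renamed assertions keep the suite's pattern of tokenizing output lines and comparing them order-insensitively. A hedged, self-contained sketch of that comparison style with toy data (the tokenize below is a simplified stand-in for the suite's helper):

    import org.scalatest.FunSuite
    import org.scalatest.Matchers

    class TokenComparisonSketch extends FunSuite with Matchers {

      // simplified stand-in: split each line on tabs/spaces and flatten
      def tokenize(lines: Iterable[String]): Iterable[String] =
        lines.flatMap(_.split("[\t ]+"))

      test("same tokens regardless of line and token order") {
        val expected = tokenize(Iterable("iphone\tipad:1.0 nexus:0.5", "surface"))
        val actual   = tokenize(Iterable("surface", "iphone\tnexus:0.5 ipad:1.0"))
        actual should contain theSameElementsAs expected
      }
    }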

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
index 29c8bea..f18ec70 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
@@ -57,6 +57,10 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat
 //    initContext()
   }
 
+  override protected def afterAll(configMap: ConfigMap): Unit = {
+    super.afterAll(configMap)
+    resetContext()
+  }
 
   override protected def beforeAll(configMap: ConfigMap): Unit = {
     super.beforeAll(configMap)


[2/7] mahout git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/mahout

Posted by pa...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/mahout


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e005b28b
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e005b28b
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e005b28b

Branch: refs/heads/master
Commit: e005b28b263ce5eeab6f105ead660a3fe89cb756
Parents: d0f6420 1ca1ef3
Author: pferrel <pa...@occamsmachete.com>
Authored: Thu Feb 12 10:22:38 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Thu Feb 12 10:22:38 2015 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/mahout/h2obindings/H2OHelper.java  | 9 ++++-----
 pom.xml                                                     | 2 +-
 2 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------



[6/7] mahout git commit: removed o.a.m.Pair, cleaned up comments and style issues, simplified driver API, merged most of 1.2.1 changes but left on Spark v1.1.1 because of a bug in v1.2.1

Posted by pa...@apache.org.
removed o.a.m.Pair, cleaned up comments and style issues, simplified driver API, merged most of 1.2.1 changes but left on Spark v1.1.1 because of a bug in v1.2.1


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/15ee1951
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/15ee1951
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/15ee1951

Branch: refs/heads/master
Commit: 15ee1951d1ba8d8ee7d24e2510af187fd984c8cf
Parents: 43bea68
Author: pferrel <pa...@occamsmachete.com>
Authored: Mon Mar 2 13:38:05 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Mon Mar 2 13:38:05 2015 -0800

----------------------------------------------------------------------
 examples/bin/get-all-examples.sh                |   4 +-
 math-scala/pom.xml                              |  25 +++-
 .../apache/mahout/drivers/MahoutDriver.scala    |  11 +-
 .../mahout/drivers/MahoutOptionParser.scala     |  78 ++++++-----
 .../mahout/math/cf/SimilarityAnalysis.scala     | 104 +++++++--------
 .../org/apache/mahout/math/drm/package.scala    |   4 +-
 .../math/indexeddataset/IndexedDataset.scala    |  23 ++--
 .../math/indexeddataset/ReaderWriter.scala      |  71 ++++++++--
 .../mahout/math/indexeddataset/Schema.scala     |  82 ++++++------
 spark/pom.xml                                   |   6 +-
 spark/src/main/assembly/dependency-reduced.xml  |  44 +++++++
 spark/src/main/assembly/job.xml                 |  61 ---------
 .../apache/mahout/common/HDFSPathSearch.scala   |  17 ++-
 .../mahout/drivers/ItemSimilarityDriver.scala   |  78 ++++++-----
 .../mahout/drivers/MahoutSparkDriver.scala      | 106 ++++++++-------
 .../drivers/MahoutSparkOptionParser.scala       |  16 ++-
 .../mahout/drivers/RowSimilarityDriver.scala    |  43 +++---
 .../apache/mahout/drivers/TestNBDriver.scala    |  58 ++++----
 .../drivers/TextDelimitedReaderWriter.scala     | 132 ++++++++++---------
 .../apache/mahout/drivers/TrainNBDriver.scala   |  16 +--
 .../mahout/sparkbindings/SparkEngine.scala      |  10 +-
 .../indexeddataset/IndexedDatasetSpark.scala    |  17 ++-
 .../apache/mahout/sparkbindings/package.scala   |   6 +-
 .../drivers/ItemSimilarityDriverSuite.scala     | 117 ++++++++--------
 .../test/DistributedSparkSuite.scala            |   4 +
 25 files changed, 619 insertions(+), 514 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/examples/bin/get-all-examples.sh
----------------------------------------------------------------------
diff --git a/examples/bin/get-all-examples.sh b/examples/bin/get-all-examples.sh
index a24c7fd..4128e47 100755
--- a/examples/bin/get-all-examples.sh
+++ b/examples/bin/get-all-examples.sh
@@ -26,8 +26,8 @@ echo " Solr-recommender example: "
 echo " 1) imports text 'log files' of some delimited form for user preferences"
 echo " 2) creates the correct Mahout files and stores distionaries to translate external Id to and from Mahout Ids"
 echo " 3) it implements a prototype two actions 'cross-recommender', which takes two actions made by the same user and creates recommendations"
-echo " 4) it creates output for user->preference history CSV and and item->similar items 'indicator' matrix for use in a Solr-recommender."
-echo "    To use Solr you would index the indicator matrix CSV, and use user preference history from the history CSV as a query, the result"
+echo " 4) it creates output for user->preference history CSV and and item->similar items 'similarity' matrix for use in a Solr-recommender."
+echo "    To use Solr you would index the similarity matrix CSV, and use user preference history from the history CSV as a query, the result"
 echo "    from Solr will be an ordered list of recommendations returning the same item Ids as were input."
 echo " For further description see the README.md here https://github.com/pferrel/solr-recommender"
 echo " To build run 'cd solr-recommender; mvn install'"

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 66309d6..50cea7a 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -175,17 +175,36 @@
       
     <dependency>
       <groupId>com.github.scopt</groupId>
-      <artifactId>scopt_2.10</artifactId>
-      <version>3.2.0</version>
+      <artifactId>scopt_${scala.major}</artifactId>
+      <version>3.3.0</version>
     </dependency>
 
     <!-- scala stuff -->
     <dependency>
       <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>${scala.version}</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-actors</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scalap</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.major}</artifactId>

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 3d9d4e1..32515f1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -19,23 +19,24 @@ package org.apache.mahout.drivers
 
 import org.apache.mahout.math.drm.DistributedContext
 
-/** Extended by a platform specific version of this class to create a Mahout CLI driver.
- */
+/** Extended by a platform specific version of this class to create a Mahout CLI driver. */
 abstract class MahoutDriver {
 
-
   implicit protected var mc: DistributedContext = _
   implicit protected var parser: MahoutOptionParser = _
 
   var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
 
+  /** Must be overridden to set up the DistributedContext mc */
+  protected def start() : Unit
+
   /** Override (optionally) for special cleanup */
-  protected def stop: Unit = {
+  protected def stop(): Unit = {
     if (!_useExistingContext) mc.close
   }
 
   /** This is where you do the work, call start first, then before exiting call stop */
-  protected def process: Unit
+  protected def process(): Unit
 
   /** Parse command line and call process */
   def main(args: Array[String]): Unit

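
As a point of reference, here is a minimal sketch of how a concrete driver might satisfy this contract; the object name and the empty start() body are illustrative only, not part of this commit:

import org.apache.mahout.drivers.{MahoutDriver, MahoutOptionParser}

// Hypothetical sketch: shows the start()/process()/stop() contract of MahoutDriver.
object ExampleDriver extends MahoutDriver {

  // must assign the implicit DistributedContext mc; an engine-specific driver
  // (e.g. a Spark driver) would create it from its own configuration here
  override protected def start(): Unit = { /* mc = ... */ }

  override def process(): Unit = {
    start()
    // do the actual work against mc here
    stop()
  }

  override def main(args: Array[String]): Unit = {
    parser = new MahoutOptionParser(programName = "example") // define options here
    parser.parse(args, parser.opts) map { opts =>
      parser.opts = opts
      process()
    }
  }
}
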
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 479a8d0..3b5affd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -20,16 +20,17 @@ import scopt.OptionParser
 
 import scala.collection.immutable
 
-/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
-  * keep both standarized.
-  * @param programName Name displayed in help message, the name by which the driver is invoked.
-  * @note options are engine neutral by convention. See the engine specific extending class for
-  *       to add Spark or other engine options.
-  * */
+/**
+ * Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+ * keep both standardized.
+ * @param programName Name displayed in help message, the name by which the driver is invoked.
+ * @note options are engine neutral by convention. See the engine specific extending class for how
+ *       to add Spark or other engine options.
+ */
 class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
 
   // build options from some standard CLI param groups
-  // Note: always put the driver specific options at the last so they can override and previous options!
+  // Note: always put the driver specific options at the last so they can override any previous options!
   var opts = Map.empty[String, Any]
 
   override def showUsageOnError = true
@@ -39,12 +40,14 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
     note("Input, output options")
     opt[String]('i', "input") required() action { (x, options) =>
       options + ("input" -> x)
-    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs" +
+      " (required)")
 
     if (numInputs == 2) {
       opt[String]("input2") abbr ("i2") action { (x, options) =>
         options + ("input2" -> x)
-      } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+      } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" " +
+        "(optional). Default: empty.")
     }
 
     opt[String]('o', "output") required() action { (x, options) =>
@@ -53,11 +56,11 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
       } else {
         options + ("output" -> (x + "/"))
       }
-    } text ("Path for output, any local or HDFS supported URI (required)")
+    } text ("Path for output directory, any HDFS supported URI (required)")
 
   }
 
-  def parseGenericOptions = {
+  def parseGenericOptions() = {
     opts = opts ++ MahoutOptionParser.GenericOptions
     opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
       options + ("randomSeed" -> x)
@@ -65,48 +68,55 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
       if (x > 0) success else failure("Option --randomSeed must be > 0")
     }
 
-    //output both input DRMs
+    //output both input IndexedDatasets
     opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
       options + ("writeAllDatasets" -> true)
     }//Hidden option, though a user might want this.
   }
 
-  def parseElementInputSchemaOptions{
+  def parseElementInputSchemaOptions() = {
     //Input text file schema--not driver specific but input data specific, elements input,
-    // not drms
+    // not rows of IndexedDatasets
     opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
     note("\nInput text file schema options:")
-    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action { (x, options) =>
-      options + ("inDelim" -> x)
+    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action {
+      (x, options) =>
+        options + ("inDelim" -> x)
     }
 
     opt[String]("filter1") abbr ("f1") action { (x, options) =>
       options + ("filter1" -> x)
-    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). " +
+      "Default: no filter, all data is used")
 
     opt[String]("filter2") abbr ("f2") action { (x, options) =>
       options + ("filter2" -> x)
-    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). " +
+      "If not present no secondary dataset is collected")
 
     opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
       options + ("rowIDColumn" -> x)
-    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
-      if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate {
+      x =>
+        if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
     }
 
     opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
       options + ("itemIDColumn" -> x)
-    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
-      if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate {
+      x =>
+        if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
     }
 
     opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
       options + ("filterColumn" -> x)
-    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no " +
+      "filter") validate { x =>
       if (x >= -1) success else failure("Option --filterColNum must be >= -1")
     }
 
-    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or" +
+      " \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
 
     //check for column consistency
     checkConfig { options: Map[String, Any] =>
@@ -126,7 +136,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
 
   }
 
-  def parseFileDiscoveryOptions = {
+  def parseFileDiscoveryOptions() = {
     //File finding strategy--not driver specific
     opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
     note("\nFile discovery options:")
@@ -136,12 +146,13 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
 
     opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
       options + ("filenamePattern" -> x)
-    } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+    } text ("Regex to match in determining input files (optional). Default: filename in the --input option " +
+      "or \"^part-.*\" if --input is a directory")
 
   }
 
-  def parseDrmFormatOptions = {
-    opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
+  def parseIndexedDatasetFormatOptions() = {
+    opts = opts ++ MahoutOptionParser.TextDelimitedIndexedDatasetOptions
     note("\nOutput text file schema options:")
     opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
       options + ("rowKeyDelim" -> x)
@@ -160,13 +171,16 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
     } text ("Do not write the strength to the output files (optional), Default: false.")
     note("This option is used to output indexable data for creating a search engine recommender.")
 
-    note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+    note("\nDefault delimiters will produce output of the form: " +
+      "\"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
   }
 
 }
 
-/** Companion object defines default option groups for reference in any driver that needs them.
-  * @note not all options are platform neutral so other platforms can add default options here if desired */
+/**
+ * Companion object defines default option groups for reference in any driver that needs them.
+ * @note not all options are platform neutral so other platforms can add default options here if desired
+ */
 object MahoutOptionParser {
 
   // set up the various default option groups
@@ -196,7 +210,7 @@ object MahoutOptionParser {
     "filter2" -> null.asInstanceOf[String],
     "inDelim" -> "[,\t ]")
 
-  final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+  final val TextDelimitedIndexedDatasetOptions = immutable.HashMap[String, Any](
     "rowKeyDelim" -> "\t",
     "columnIdStrengthDelim" -> ":",
     "elementDelim" -> " ",

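
To see how these option groups compose, a hedged sketch follows; the program name is illustrative and the --input/--output group (parseIOOptions, whose signature is not shown in this hunk) is omitted:

import org.apache.mahout.drivers.MahoutOptionParser

// Illustrative sketch: builds a parser from the shared option groups defined above and
// returns the merged option map (defaults overridden by anything given on the command line).
object ParserExample {
  def parseArgs(args: Array[String]): Option[Map[String, Any]] = {
    val parser = new MahoutOptionParser(programName = "mydriver") {
      head("mydriver", "Mahout 1.0-SNAPSHOT")
      parseFileDiscoveryOptions()          // --recursive, --filenamePattern
      parseElementInputSchemaOptions()     // --inDelim, --filter1/2, row/item/filter column positions
      parseIndexedDatasetFormatOptions()   // --rowKeyDelim, --columnIdStrengthDelim, --elementDelim, --omitStrength
      parseGenericOptions()                // --randomSeed and the hidden --writeAllDatasets
      help("help") abbr ("h") text ("prints this usage text")
    }
    parser.parse(args, parser.opts)
  }
}
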
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
index e1766e8..6557ab0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -32,7 +32,7 @@ import scala.util.Random
 
 
 /**
- * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * Based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
  * available at http://www.mapr.com/practical-machine-learning
  *
  * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
@@ -44,14 +44,16 @@ object SimilarityAnalysis extends Serializable {
   /** Compares (Int,Double) pairs by the second value */
   private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
 
-  /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
-    * and returns a list of indicator and cross-indicator matrices
-    * @param drmARaw Primary interaction matrix
-    * @param randomSeed when kept to a constant will make repeatable downsampling
-    * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
-    * @param maxNumInteractions max number of interactions after downsampling, default: 500
-    * @return
-    */
+  /**
+   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+   * and returns a list of similarity and cross-similarity matrices
+   * @param drmARaw Primary interaction matrix
+   * @param randomSeed when kept to a constant will make repeatable downsampling
+   * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
+   * @param maxNumInteractions max number of interactions after downsampling, default: 500
+   * @return a list of [[org.apache.mahout.math.drm.DrmLike]] containing downsampled DRMs for cooccurrence and
+   *         cross-cooccurrence
+   */
   def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
                     maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
 
@@ -69,11 +71,11 @@ object SimilarityAnalysis extends Serializable {
     // Compute co-occurrence matrix A'A
     val drmAtA = drmA.t %*% drmA
 
-    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
-    val drmIndicatorsAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
-      bcastInteractionsPerItemA, crossCooccurrence = false)
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
+    val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing,
+      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
 
-    var indicatorMatrices = List(drmIndicatorsAtA)
+    var similarityMatrices = List(drmSimilarityAtA)
 
     // Now look at cross-co-occurrences
     for (drmBRaw <- drmBs) {
@@ -86,10 +88,10 @@ object SimilarityAnalysis extends Serializable {
       // Compute cross-co-occurrence matrix A'B
       val drmAtB = drmA.t %*% drmB
 
-      val drmIndicatorsAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
+      val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
         bcastInteractionsPerItemA, bcastInteractionsPerThingB)
 
-      indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
+      similarityMatrices = similarityMatrices :+ drmSimilarityAtB
 
       drmB.uncache()
     }
@@ -97,19 +99,21 @@ object SimilarityAnalysis extends Serializable {
     // Unpin downsampled interaction matrix
     drmA.uncache()
 
-    // Return list of indicator matrices
-    indicatorMatrices
+    // Return list of similarity matrices
+    similarityMatrices
   }
 
-  /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
-    * and returns a list of indicator and cross-indicator matrices
-    * Somewhat easier to use method, which handles the ID dictionaries correctly
-    * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
-    * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
-    * @param maxInterestingItemsPerThing max similarities per items
-    * @param maxNumInteractions max number of input items per item
-    * @return
-    */
+  /**
+   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns
+   * a list of similarity and cross-similarity matrices. Somewhat easier to use method, which handles the ID
+   * dictionaries correctly
+   * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
+   * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+   * @param maxInterestingItemsPerThing max similarities per items
+   * @param maxNumInteractions max number of input items per item
+   * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled
+   *         IndexedDatasets for cooccurrence and cross-cooccurrence
+   */
   def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset],
       randomSeed: Int = 0xdeadbeef,
       maxInterestingItemsPerThing: Int = 50,
@@ -127,13 +131,13 @@ object SimilarityAnalysis extends Serializable {
     retIDSs.toList
   }
 
-  /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows
-    * @param drmARaw Primary interaction matrix
-    * @param randomSeed when kept to a constant will make repeatable downsampling
-    * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
-    * @param maxNumInteractions max number of interactions after downsampling, default: 500
-    * @return
-    */
+  /**
+   * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows
+   * @param drmARaw Primary interaction matrix
+   * @param randomSeed when kept to a constant will make repeatable downsampling
+   * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
+   * @param maxNumInteractions max number of interactions after downsampling, default: 500
+   */
   def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
                     maxNumInteractions: Int = 500): DrmLike[Int] = {
 
@@ -152,20 +156,20 @@ object SimilarityAnalysis extends Serializable {
     val drmAAt = drmA %*% drmA.t
 
     // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
-    val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow, bcastInteractionsPerItemA,
-      bcastInteractionsPerItemA, crossCooccurrence = false)
+    val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow,
+      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
 
     drmSimilaritiesAAt
   }
 
-  /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
-    * Uses IndexedDatasets, which handle external ID dictionaries properly
-    * @param indexedDataset compare each row to every other
-    * @param randomSeed  use default to make repeatable, otherwise pass in system time or some randomizing seed
-    * @param maxInterestingSimilaritiesPerRow max elements returned in each row
-    * @param maxObservationsPerRow max number of input elements to use
-    * @return
-    */
+  /**
+   * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows.
+   * Uses IndexedDatasets, which handle external ID dictionaries properly
+   * @param indexedDataset compare each row to every other
+   * @param randomSeed  use default to make repeatable, otherwise pass in system time or some randomizing seed
+   * @param maxInterestingSimilaritiesPerRow max elements returned in each row
+   * @param maxObservationsPerRow max number of input elements to use
+   */
   def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
       maxInterestingSimilaritiesPerRow: Int = 50,
       maxObservationsPerRow: Int = 500):
@@ -175,10 +179,7 @@ object SimilarityAnalysis extends Serializable {
     indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs)
   }
 
-   /**
-     * Compute loglikelihood ratio
-     * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
-     */
+   /** Compute the loglikelihood ratio; see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details */
   def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
     numInteractionsWithAandB: Long, numInteractions: Long) = {
 
@@ -220,7 +221,7 @@ object SimilarityAnalysis extends Serializable {
 
               val candidate = thingA -> llr
 
-              // matches legacy hadoop code and maps values to range (0..1)
+              // legacy hadoop code maps values to range (0..1) via
               // val normailizedLLR = 1.0 - (1.0 / (1.0 + llr))
               // val candidate = thingA -> normailizedLLR
 
@@ -253,7 +254,7 @@ object SimilarityAnalysis extends Serializable {
    * @param drmM matrix to downsample
   * @param seed random number generator seed, keep to a constant if repeatability is necessary
    * @param maxNumInteractions number of elements in a row of the returned matrix
-   * @return
+   * @return the downsampled DRM
    */
   def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
 
@@ -269,9 +270,8 @@ object SimilarityAnalysis extends Serializable {
       case (keys, block) =>
         val numInteractions: Vector = bcastNumInteractions
 
-        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
-        // don't use commons since scala's is included anyway
-        // val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of
+        // failures
         val random = new Random(MurmurHash.hash(keys(0), seed))
 
         val downsampledBlock = block.like()

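
A hedged usage sketch of the IndexedDataset entry point and the public LLR helper follows; the dataset arguments and counts are illustrative and assume the inputs were already read with a shared row ID space:

import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.IndexedDataset

// Sketch only: the dataset arguments are assumed to be IndexedDatasets that were already read in.
object SimilarityExample {

  // Returns the similarity matrix of datasetA plus the cross-similarity matrix of A to B,
  // as IndexedDatasets with their ID dictionaries intact.
  def similarityAndCross(datasetA: IndexedDataset, datasetB: IndexedDataset) =
    SimilarityAnalysis.cooccurrencesIDSs(
      Array(datasetA, datasetB), // first element is the primary/A matrix
      0xdeadbeef,                // random seed, keep constant for repeatable downsampling
      50,                        // max similarities kept per item
      500)                       // max interactions kept per row after downsampling

  // The raw LLR score is also public: 100 users total, 20 interacting with item A,
  // 30 with item B, and 10 with both.
  def exampleLLR = SimilarityAnalysis.logLikelihoodRatio(20L, 30L, 10L, 100L)
}
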
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 3afbecb..81f6ab1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -123,13 +123,13 @@ package object indexeddataset {
   def indexedDatasetDFSRead(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
       existingRowIDs: BiMap[String, Int] = HashBiMap.create())
-      (implicit ctx: DistributedContext):
+    (implicit ctx: DistributedContext):
     IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs)
 
   def indexedDatasetDFSReadElements(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
       existingRowIDs: BiMap[String, Int] = HashBiMap.create())
-      (implicit ctx: DistributedContext):
+    (implicit ctx: DistributedContext):
     IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs)
 
 }

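
A hedged sketch of reading with these helpers follows; it assumes an implicit DistributedContext is already in scope and that the package object above lives in org.apache.mahout.math.indexeddataset:

import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.math.indexeddataset._ // assumed home of the package object shown above

// Sketch only: reads element-formatted text (rowID, columnID, optional strength) into an
// IndexedDataset, then reads a second input reusing the first one's row ID dictionary so
// both matrices end up in the same row ID space.
object ReadHelpersExample {
  def readPair(pathA: String, pathB: String)(implicit ctx: DistributedContext):
      (IndexedDataset, IndexedDataset) = {
    val a = indexedDatasetDFSReadElements(pathA, DefaultIndexedDatasetElementReadSchema)
    val b = indexedDatasetDFSReadElements(pathB, DefaultIndexedDatasetElementReadSchema, a.rowIDs)
    (a, b)
  }
}
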
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
index c7eb2cb..f6811e2 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
@@ -22,15 +22,12 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
 import org.apache.mahout.math.indexeddataset
 
 /**
-  * Wraps a [[org.apache.mahout.math.drm.CheckpointedDrm]] object with two [[com.google.common.collect.BiMap]]s to store
-  * ID/label translation dictionaries.
-  * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so
-  * a user specified label or ID can be stored and mapped to and from the Mahout Int ID
-  * used internal to Mahout core code.
-  *
-  * @todo Often no need for both or perhaps either dictionary, so save resources by allowing
-  *       to be not created when not needed.
-  */
+ * Wrap a [[org.apache.mahout.math.drm.DrmLike]] with bidirectional ID mappings [[com.google.common.collect.BiMap]]
+ * so user specified labels/IDs can be stored and mapped to and from the Mahout Int IDs used internally by Mahout
+ * core code.
+ * @todo Often no need for both or perhaps either dictionary, so save resources by allowing to be not created
+ *       when not needed.
+ */
 
 trait IndexedDataset {
   val matrix: CheckpointedDrm[Int]
@@ -39,12 +36,13 @@ trait IndexedDataset {
 
   /**
    * Write a text delimited file(s) with the row and column IDs from dictionaries.
-   * @param dest
-   * @param schema
+   * @param dest write location, usually a directory
+   * @param schema params to control writing
+   * @param sc the [[org.apache.mahout.math.drm.DistributedContext]] used to do a distributed write
    */
   def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit
 
-  /** Factory method, creates the extending class */
+  /** Factory method, creates the extending class and returns a new instance */
   def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
     IndexedDataset
 
@@ -52,6 +50,7 @@ trait IndexedDataset {
    * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
    * No changes are made to the underlying drm.
    * @param n number to use for new row cardinality, should be larger than current
+   * @return a new IndexedDataset or extending class with new cardinality
    * @note should be done before any optimizer actions are performed on the matrix or you'll get unpredictable
    *       results.
    */

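
A hedged sketch of using the trait follows; the method and parameter names outside the trait itself are illustrative:

import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, IndexedDataset}

// Sketch only: writes the wrapped DRM as text, one row per line with external IDs restored,
// then uses the row dictionary to translate an external row ID into Mahout's internal Int key.
object IndexedDatasetExample {
  def writeAndLookup(ids: IndexedDataset, dest: String, externalRowID: String)
      (implicit ctx: DistributedContext): Int = {
    ids.dfsWrite(dest, DefaultIndexedDatasetWriteSchema)
    ids.rowIDs.get(externalRowID) // BiMap[String, Int]: external label -> internal Int ID
  }
}
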
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
index cf429f5..f7653ae 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
@@ -20,49 +20,98 @@ package org.apache.mahout.math.indexeddataset
 import com.google.common.collect.{BiMap, HashBiMap}
 import org.apache.mahout.math.drm.DistributedContext
 
-/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait,
-  * which also defines the type to be read.
-  * @tparam T type of object to read.
-  */
+/**
+ * Reader trait is abstract in the sense that the elementReader and rowReader functions must be supplied by an
+ * extending trait, which also defines the type to be read.
+ * @tparam T type of object to read.
+ */
 trait Reader[T]{
 
   val mc: DistributedContext
   val readSchema: Schema
 
+  /**
+   * Override in extending trait to supply T and perform a parallel read of collection elements
+   * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from
+   * @param readSchema map of parameters controlling formatting and how the read is executed
+   * @param source list of comma delimited files to read from
+   * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already
+   *                       been applied to this collection--used to synchronize row IDs between several
+   *                       collections
+   * @return a new collection of type T
+   */
   protected def elementReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
       existingRowIDs: BiMap[String, Int]): T
 
-  protected def drmReader(
+  /**
+   * Override in extending trait to supply T and perform a parallel read of collection rows
+   * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from
+   * @param readSchema map of parameters controlling formatting and how the read is executed
+   * @param source list of comma delimited files to read from
+   * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already
+   *                       been applied to this collection--used to synchronize row IDs between several
+   *                       collections
+   * @return a new collection of type T
+   */
+  protected def rowReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
       existingRowIDs: BiMap[String, Int]): T
 
+  /**
+   * Public method called to perform the element-wise read. Usually no need to override
+   * @param source comma delimited URIs to read from
+   * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used
+   *                       to synchronize all row ids in several collections
+   * @return a new collection of type T
+   */
   def readElementsFrom(
       source: String,
       existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
     elementReader(mc, readSchema, source, existingRowIDs)
 
-  def readDRMFrom(
+  /**
+   * Public method called to perform the row-wise read. Usually no need to override.
+   * @param source comma delimited URIs to read from
+   * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used
+   *                       to synchronize all row ids in several collections
+   * @return a new collection of type T
+   */
+  def readRowsFrom(
       source: String,
       existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
-    drmReader(mc, readSchema, source, existingRowIDs)
+    rowReader(mc, readSchema, source, existingRowIDs)
 }
 
-/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
-  * which also defines the type to be written.
-  * @tparam T type of object to write.
-  */
+/**
+ * Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
+ * which also defines the type to be written.
+ * @tparam T type of object to write, usually a matrix-like type.
+ */
 trait Writer[T]{
 
   val mc: DistributedContext
   val sort: Boolean
   val writeSchema: Schema
 
+  /**
+   * Override to provide writer method
+   * @param mc context used to do distributed write
+   * @param writeSchema map with params to control format and execution of the write
+   * @param dest root directory to write to
+   * @param collection usually a matrix like collection to write
+   * @param sort flags whether to sort the rows by value descending
+   */
   protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
 
+  /**
+   * Call this method to perform the write, usually no need to override.
+   * @param collection what to write
+   * @param dest root directory to write to
+   */
   def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
 }

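
A hedged sketch of an extending reader follows; the collection type and parsing bodies are placeholders (the concrete implementation in this commit is the text delimited reader producing IndexedDatasetSpark):

import com.google.common.collect.BiMap
import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.math.indexeddataset.{Reader, Schema}

// Placeholder collection type for the sketch.
case class ToyCollection(lines: Seq[String])

// Sketch only: supplies the two abstract readers; a real reader would parse `source`
// in parallel using `mc` and honor the delimiters and column positions in `readSchema`.
trait ToyCollectionReader extends Reader[ToyCollection] {

  override protected def elementReader(mc: DistributedContext, readSchema: Schema, source: String,
      existingRowIDs: BiMap[String, Int]): ToyCollection =
    ToyCollection(Seq.empty) // parse (rowID, columnID, ...) elements here

  override protected def rowReader(mc: DistributedContext, readSchema: Schema, source: String,
      existingRowIDs: BiMap[String, Int]): ToyCollection =
    ToyCollection(Seq.empty) // parse rowID<tab>colID1:val1<space>colID2:val2... rows here
}
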
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
index 557b419..3b4a2e9 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
@@ -19,34 +19,33 @@ package org.apache.mahout.math.indexeddataset
 
 import scala.collection.mutable.HashMap
 
-/** Syntactic sugar for mutable.HashMap[String, Any]
-  *
-  * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
-  */
+/**
+ * Syntactic sugar for mutable.HashMap[String, Any]
+ * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+ */
 class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
   // note: this requires a mutable HashMap, do we care?
   this ++= params
 
-  /** Constructor for copying an existing Schema
-    *
-    * @param schemaToClone return a copy of this Schema
-    */
+  /**
+   * Constructor for copying an existing Schema
+   * @param schemaToClone return a copy of this Schema
+   */
   def this(schemaToClone: Schema){
     this()
     this ++= schemaToClone
   }
 }
 
-/** These can be used to keep the text in and out fairly standard to Mahout, where an application specific
-  * format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
-  * , which can be used to create a Mahout DRM for DSL ops.
-  */
-
+// These can be used to keep text input and output fairly standard for Mahout, where an application specific
+// format is not required. They apply to the formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]],
+// which can be used to create a Mahout DRM for DSL ops.
 
-/** Simple default Schema for typical text delimited element file input
-  * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
-  * <comma, tab, or space>here may be other ignored text...)
-  */
+/**
+ * Simple default Schema for typical text delimited element file input.
+ * This tells the reader to input elements of the default form: (rowID<comma, tab, or space>columnID
+ * <comma, tab, or space>here may be other ignored text...)
+ */
 final object DefaultIndexedDatasetElementReadSchema extends Schema(
   "delim" -> "[,\t ]", //comma, tab or space
   "filter" -> "",
@@ -54,46 +53,49 @@ final object DefaultIndexedDatasetElementReadSchema extends Schema(
   "columnIDPosition" -> 1,
   "filterColumn" -> -1)
 
-/** Default Schema for text delimited drm file output
-  * This tells the writer to write a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] of the default form:
-  * (rowID<tab>columnID1:score1<space>columnID2:score2...)
-  */
+/**
+ * Default Schema for text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file output with
+ * one row per line.
+ * The default form:
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...)
+ */
 final object DefaultIndexedDatasetWriteSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
   "elementDelim" -> " ",
   "omitScore" -> false)
 
-/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file input
-  * This tells the reader to input text lines of the form:
-  * (rowID<tab>columnID1:score1,columnID2:score2,...)
-  */
+/**
+ * Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file
+ * row-wise input. This tells the reader to input text lines of the form:
+ * (rowID<tab>columnID1:score1,columnID2:score2,...)
+ */
 final object DefaultIndexedDatasetReadSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
   "elementDelim" -> " ")
 
-/** Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file  where
-  * the score of any element is ignored,
-  * all non-zeros are replaced with 1.
-  * This tells the reader to input DRM lines of the form
-  * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
-  * Alternatively the format can be
-  * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
-  * output format for [[IndexedDatasetWriteBooleanSchema]]
-  */
+/**
+ * Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where
+ * the score of any element is ignored.
+ * This tells the reader to input DRM lines of the form
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...); remember, the score is ignored.
+ * Alternatively the format can be
+ * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+ * output format for [[IndexedDatasetWriteBooleanSchema]]
+ */
 final object IndexedDatasetReadBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
   "elementDelim" -> " ",
   "omitScore" -> true)
 
-/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file write where
-  * the score of a element is omitted.
-  * The presence of a element means the score = 1, the absence means a score of 0.
-  * This tells the writer to output [[org.apache.mahout.math.indexeddataset.IndexedDataset]] lines of the form
-  * (rowID<tab>columnID1<space>columnID2...)
-  */
+/**
+ * Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file output
+ * where the score of an element is omitted. This tells the writer to output
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] rows of the form
+ * (rowID<tab>columnID1<space>columnID2...)
+ */
 final object IndexedDatasetWriteBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",

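
A hedged sketch of an application specific Schema built the same way follows; the key names mirror DefaultIndexedDatasetElementReadSchema above and the values are illustrative:

import org.apache.mahout.math.indexeddataset.Schema

// Sketch only: an element read schema for pipe delimited input whose filter tag is in column 2.
object CustomSchemas {
  val pipeDelimitedReadSchema = new Schema(
    "delim" -> "\\|",        // the delimiter is used as a regex, so the pipe is escaped
    "filter" -> "purchase",
    "rowIDPosition" -> 0,
    "columnIDPosition" -> 1,
    "filterColumn" -> 2)

  // Schemas are mutable HashMaps, so a copy can be tweaked without touching the original.
  val unfilteredSchema = new Schema(pipeDelimitedReadSchema)
  unfilteredSchema += ("filterColumn" -> -1)
}
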
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index bcf9e30..c7069b6 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -164,14 +164,14 @@
         <artifactId>maven-assembly-plugin</artifactId>
         <executions>
           <execution>
-            <id>job</id>
+            <id>dependency-reduced</id>
             <phase>package</phase>
             <goals>
               <goal>single</goal>
             </goals>
             <configuration>
               <descriptors>
-                <descriptor>../spark/src/main/assembly/job.xml</descriptor>
+                <descriptor>src/main/assembly/dependency-reduced.xml</descriptor>
               </descriptors>
             </configuration>
           </execution>
@@ -317,11 +317,9 @@
       <scope>test</scope>
     </dependency>
 
-
     <!--  3rd-party -->
 
     <!-- scala stuff -->
-
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.major}</artifactId>

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/assembly/dependency-reduced.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/dependency-reduced.xml b/spark/src/main/assembly/dependency-reduced.xml
new file mode 100644
index 0000000..5dcc945
--- /dev/null
+++ b/spark/src/main/assembly/dependency-reduced.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+     http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly
+  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
+  http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <id>dependency-reduced</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <unpack>true</unpack>
+      <unpackOptions>
+      <!-- MAHOUT-1126 -->
+      <excludes>
+         <exclude>META-INF/LICENSE</exclude>
+      </excludes>
+      </unpackOptions>
+      <scope>runtime</scope>
+      <outputDirectory>/</outputDirectory>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <includes>
+        <include>com.google.guava:guava</include>
+        <include>com.github.scopt</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml
deleted file mode 100644
index 2bdb3ce..0000000
--- a/spark/src/main/assembly/job.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<assembly
-  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
-    http://maven.apache.org/xsd/assembly-1.1.0.xsd">
-  <id>job</id>
-  <formats>
-   <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <dependencySets>
-    <dependencySet>
-      <unpack>true</unpack>
-      <unpackOptions>
-        <!-- MAHOUT-1126 -->
-        <excludes>
-          <exclude>META-INF/LICENSE</exclude>
-        </excludes>
-      </unpackOptions>
-      <scope>runtime</scope>
-      <outputDirectory>/</outputDirectory>
-      <useTransitiveFiltering>true</useTransitiveFiltering>
-      <excludes>
-        <exclude>org.apache.hadoop:hadoop-core</exclude>
-      </excludes>
-    </dependencySet>
-  </dependencySets>
-  <fileSets>
-    <fileSet>
-      <directory>${basedir}/target/classes</directory>
-      <outputDirectory>/</outputDirectory>
-      <excludes>
-        <exclude>*.jar</exclude>
-      </excludes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/target/classes</directory>
-      <outputDirectory>/</outputDirectory>
-      <includes>
-        <include>driver.classes.default.props</include>
-      </includes>
-    </fileSet>
-  </fileSets>
-</assembly>

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
index 42bf697..0b4130d 100644
--- a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
+++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
@@ -23,8 +23,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 /**
 * Returns a [[java.lang.String]], which is a comma delimited list of URIs discovered based on parameters
  * in the constructor.
- * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
- *
+ * The String is formatted to be input into [[org.apache.spark.SparkContext#textFile()]]
  * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
  * @param filePattern regex that must match the entire filename to have the file returned
  * @param recursive true traverses the filesystem recursively, default = false
@@ -35,8 +34,10 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
   val conf = new Configuration()
   val fs = FileSystem.get(conf)
 
-  /** Returns a string of comma delimited URIs matching the filePattern
-    * When pattern matching dirs are never returned, only traversed. */
+  /**
+   * Returns a string of comma delimited URIs matching the filePattern.
+   * When pattern matching, dirs are never returned, only traversed.
+   */
   def uris: String = {
     if (!filePattern.isEmpty){ // have file pattern so
     val pathURIs = pathURI.split(",")
@@ -51,8 +52,10 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
     }
   }
 
-  /** Find matching files in the dir, recursively call self when another directory is found
-    * Only files are matched, directories are traversed but never return a match */
+  /**
+   * Find matching files in the dir, recursively calling self when another directory is found.
+   * Only files are matched; directories are traversed but never return a match.
+   */
   private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
     val seed = fs.getFileStatus(new Path(dir))
     var f: String = files
@@ -71,7 +74,7 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
           f = findFiles(fileStatus.getPath.toString, filePattern, f)
         }
       }
-    }else{ f = dir }// was a filename not dir
+    } else { f = dir }// was a filename not dir
     f
   }
 

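
A hedged usage sketch follows; the URIs are illustrative:

import org.apache.mahout.common.HDFSPathSearch

// Sketch only: collects every file matching ^part-.* under two directories, recursively,
// into one comma delimited String suitable for SparkContext.textFile.
object PathSearchExample {
  val inFiles: String = HDFSPathSearch(
    pathURI = "hdfs://namenode:8020/data/logs-a,hdfs://namenode:8020/data/logs-b",
    filePattern = "^part-.*",
    recursive = true).uris
}
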
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 36ba6ef..63da80f 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -24,23 +24,19 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scala.collection.immutable.HashMap
 
 /**
- * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]].
- * Reads text lines
- * that contain (row id, column id, ...). The IDs are user specified strings which will be
- * preserved in the
- * output. The individual elements will be accumulated into a matrix like [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]
- * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
- * matrices and calculate both the self similarity of the primary matrix and the row-wise
- * similarity of the primary
- * to the secondary. Returns one or two directories of text files formatted as specified in
- * the options.
- * The options allow flexible control of the input schema, file discovery, output schema, and control of
- * algorithm parameters.
- * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple
- * elements of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space,
- * you can specify only the input and output file and directory--all else will default to the correct values.
- * Each output line will contain the Item ID and similar items sorted by LLR strength descending.
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]. Reads text lines
+ * that contain (row id, column id, ...). The IDs are user specified strings which will be preserved in the output.
+ * The individual elements will be accumulated into a matrix like
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] and
+ * [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]] will be used to calculate row-wise
+ * self-similarity, or when using filters or two inputs, will generate two matrices and calculate both the
+ * self-similarity of the primary matrix and the row-wise similarity of the primary to the secondary. Returns one
+ * or two directories of text files formatted as specified in the options. The options allow flexible control of the
+ * input schema, file discovery, output schema, and control of algorithm parameters. To get help run
+ * {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple elements of text delimited
+ * values (userID,itemID) with or without strengths and with a separator of tab, comma, or space, you can specify
+ * only the input and output file and directory--all else will default to the correct values. Each output line will
+ * contain the Item ID and similar items sorted by LLR strength descending.
  * @note To use with a Spark cluster see the --master option, if you run out of heap space check
  *       the --sparkExecutorMemory option. Other [[org.apache.spark.SparkConf]] key value pairs can be set with the -D:k=v
  *       option.
@@ -57,6 +53,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
   private var readSchema2: Schema = _
 
   /**
+   * Entry point, not using Scala App trait
    * @param args  Command line args, if empty a help message is printed.
    */
   override def main(args: Array[String]): Unit = {
@@ -74,52 +71,51 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
         options + ("maxPrefs" -> x)
       } text ("Max number of preferences to consider per user (optional). Default: " +
         ItemSimilarityOptions("maxPrefs")) validate { x =>
-        if (x > 0) success else failure("Option --maxPrefs must be > 0")
+          if (x > 0) success else failure("Option --maxPrefs must be > 0")
       }
 
-      /** not implemented in SimilarityAnalysis.cooccurrence
-        * threshold, and minPrefs
-        * todo: replacing the threshold with some % of the best values and/or a
-        * confidence measure expressed in standard deviations would be nice.
-        */
+      // threshold and minPrefs are not implemented in SimilarityAnalysis.cooccurrence
+      // todo: replacing the threshold with some % of the best values and/or a
+      // confidence measure expressed in standard deviations would be nice.
 
       opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
         options + ("maxSimilaritiesPerItem" -> x)
       } text ("Limit the number of similarities per item to this number (optional). Default: " +
         ItemSimilarityOptions("maxSimilaritiesPerItem")) validate { x =>
-        if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
+          if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
       }
 
       //Driver notes--driver specific
       note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
 
       //Input text format
-      parseElementInputSchemaOptions
+      parseElementInputSchemaOptions()
 
       //How to search for input
-      parseFileDiscoveryOptions
+      parseFileDiscoveryOptions()
 
       //Drm output schema--not driver specific, drm specific
-      parseDrmFormatOptions
+      parseIndexedDatasetFormatOptions()
 
       //Spark config options--not driver specific
-      parseSparkOptions
+      parseSparkOptions()
 
       //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
-      parseGenericOptions
+      parseGenericOptions()
 
       help("help") abbr ("h") text ("prints this usage text\n")
 
     }
     parser.parse(args, parser.opts) map { opts =>
       parser.opts = opts
-      process
+      process()
     }
   }
 
   override protected def start() : Unit = {
 
-    super.start
+    super.start()
 
     readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
       "filter" -> parser.opts("filter1").asInstanceOf[String],
@@ -139,12 +135,12 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
       "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
       "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
       "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
-    }
+  }
 
   private def readIndexedDatasets: Array[IndexedDataset] = {
 
-    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
-      parser.opts("recursive").asInstanceOf[Boolean]).uris
+    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String],
+      parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris
     val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) ""
     else HDFSPathSearch(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
       parser.opts("recursive").asInstanceOf[Boolean]).uris
@@ -160,8 +156,8 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
 
       // The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B
       // Here we assume there is one row ID space for all interactions. To do this we calculate the
-      // row cardinality only after reading in A and B (or potentially C...) We then adjust the
-      // cardinality so all match, which is required for the math to work.
+      // row cardinality only after reading in A and B (or potentially C...) We then adjust the cardinality
+      // so all match, which is required for the math to work.
       // Note: this may leave blank rows with no representation in any DRM. Blank rows need to
       // be supported (and are at least on Spark) or the row cardinality adjustment will not work.
       val datasetB = if (!inFiles2.isEmpty) {
@@ -201,19 +197,19 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
     }
   }
 
-  override def process: Unit = {
-    start
+  override def process(): Unit = {
+    start()
 
     val indexedDatasets = readIndexedDatasets
     val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int],
       parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
 
     // todo: allow more than one cross-similarity matrix?
-    idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "indicator-matrix", schema = writeSchema)
+    idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "similarity-matrix", schema = writeSchema)
     if(idss.length > 1)
-      idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix", schema = writeSchema)
+      idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-similarity-matrix", schema = writeSchema)
 
-    stop
+    stop()
   }
 
 }
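
For anyone tracking the indicator-matrix -> similarity-matrix rename, here is the tail of the
driver's process() pulled out as a standalone sketch. SimilarityAnalysis, IndexedDataset and
DistributedContext are referenced elsewhere in this patch set; the Schema import path, the
writeSimilarities name and the implicit-context parameter on dfsWrite are assumptions.

import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.math.indexeddataset.{IndexedDataset, Schema} // Schema location assumed

object SimilarityWriteSketch {
  // hypothetical helper mirroring the end of ItemSimilarityDriver.process()
  def writeSimilarities(datasets: Array[IndexedDataset], outputDir: String, randomSeed: Int,
                        maxSimilaritiesPerItem: Int, maxPrefs: Int, writeSchema: Schema)
                       (implicit mc: DistributedContext): Unit = {
    // same positional arguments the driver passes in from its option map
    val idss = SimilarityAnalysis.cooccurrencesIDSs(datasets, randomSeed, maxSimilaritiesPerItem, maxPrefs)
    // the self-similarity matrix gets the new name introduced in this commit
    idss(0).dfsWrite(outputDir + "similarity-matrix", schema = writeSchema)
    // as does the optional cross-similarity matrix
    if (idss.length > 1)
      idss(1).dfsWrite(outputDir + "cross-similarity-matrix", schema = writeSchema)
  }
}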

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index ab40c3a..668d70c 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -21,69 +21,81 @@ import org.apache.mahout.math.drm.DistributedContext
 import org.apache.spark.SparkConf
 import org.apache.mahout.sparkbindings._
 
-/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
-  * Also define a Map of options for the command line parser. The following template may help:
-  * {{{
-  * object SomeDriver extends MahoutDriver {
-  *
-  *   // define only the options specific to this driver, inherit the generic ones
-  *   private final val SomeOptions = HashMap[String, Any](
-  *       "maxThings" -> 500,
-  *       "minThings" -> 100,
-  *       "appName" -> "SomeDriver")
-  *
-  *   override def main(args: Array[String]): Unit = {
-  *
-  *     val parser = new MahoutOptionParser(programName = "shortname") {
-  *       head("somedriver", "Mahout 1.0-SNAPSHOT")
-  *
-  *       // Input output options, non-driver specific
-  *       parseIOOptions
-  *
-  *       // Algorithm specific options
-  *       // Add in the new options
-  *       opts = opts ++ SomeOptions
-  *       note("\nAlgorithm control options:")
-  *       opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
-  *         options + ("maxThings" -> x) ...
-  *     }
-  *     parser.parse(args, parser.opts) map { opts =>
-  *       parser.opts = opts
-  *       process
-  *     }
-  *   }
-  *
-  *   override def process: Unit = {
-  *     start // override to change the default Kryo or SparkConf before the distributed context is created
-  *     // do the work here
-  *     stop
-  *   }
-  *
-  * }}}
-  */
+/**
+ * Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
+ * Also define a Map of options for the command line parser. The following template may help:
+ * {{{
+ * object SomeDriver extends MahoutDriver {
+ *
+ *   // define only the options specific to this driver, inherit the generic ones
+ *   private final val SomeOptions = HashMap[String, Any](
+ *       "maxThings" -> 500,
+ *       "minThings" -> 100,
+ *       "appName" -> "SomeDriver")
+ *
+ *   override def main(args: Array[String]): Unit = {
+ *
+ *     val parser = new MahoutOptionParser(programName = "shortname") {
+ *       head("somedriver", "Mahout 1.0-SNAPSHOT")
+ *
+ *       // Input output options, non-driver specific
+ *       parseIOOptions()
+ *
+ *       // Algorithm specific options
+ *       // Add in the new options
+ *       opts = opts ++ SomeOptions
+ *       note("\nAlgorithm control options:")
+ *       opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
+ *         options + ("maxThings" -> x) ...
+ *     }
+ *     parser.parse(args, parser.opts) map { opts =>
+ *       parser.opts = opts
+ *       process()
+ *     }
+ *   }
+ *
+ *   override def process: Unit = {
+ *     start() // override to change the default Kryo or SparkConf before the distributed context is created
+ *     // do the work here
+ *     stop()
+ *   }
+ *
+ * }}}
+ */
 abstract class MahoutSparkDriver extends MahoutDriver {
 
 
-  implicit protected var sparkConf = new SparkConf()
+  implicit var sparkConf = new SparkConf()
 
-  /** Creates a Spark context to run the job inside.
-    * Override to set the SparkConf values specific to the job,
-    * these must be set before the context is created.
-    * */
-  protected def start() : Unit = {
+  /**
+   * Creates a Spark context to run the job inside.
+   * Override to set the SparkConf values specific to the job,
+   * these must be set before the context is created.
+   */
+  override protected def start() : Unit = {
     if (!_useExistingContext) {
+      /* hack around SPARK-6069 Spark 1.2.1 deserialization of HashBiMap throwing ClassNotFound--doesn't seem to work
+      sparkConf.set("spark.files.userClassPathFirst", "true")
+      sparkConf.set("spark.executor.userClassPathFirst", "true")
+      */
+
       sparkConf.set("spark.kryo.referenceTracking", "false")
         .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
 
       if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
         sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
       //else leave as set in Spark config
-      mc = mahoutSparkContext(masterUrl = parser.opts("master").asInstanceOf[String],
+      mc = mahoutSparkContext(
+        masterUrl = parser.opts("master").asInstanceOf[String],
         appName = parser.opts("appName").asInstanceOf[String],
         sparkConf = sparkConf)
     }
   }
 
+  /**
+   * Call this before start to use an existing context as when running multiple drivers from a scalatest suite.
+   * @param context An already set up context to run against
+   */
   def useContext(context: DistributedContext): Unit = {
     _useExistingContext = true
     mc = context
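
The Scaladoc template above elides a few pieces ("..."), so here is a slightly fuller sketch of a
driver written against it. SomeDriver and maxThings are the template's own placeholders; using
MahoutSparkOptionParser, assigning it to the inherited parser field, and the option help text are
assumptions borrowed from the concrete drivers elsewhere in this patch set, not part of the template.

package org.apache.mahout.drivers // same package as the concrete drivers

import scala.collection.immutable.HashMap

object SomeDriver extends MahoutSparkDriver {

  // define only the options specific to this driver, inherit the generic ones
  private final val SomeOptions = HashMap[String, Any](
    "maxThings" -> 500,
    "appName" -> "SomeDriver")

  override def main(args: Array[String]): Unit = {

    parser = new MahoutSparkOptionParser(programName = "somedriver") {
      head("somedriver", "Mahout 1.0-SNAPSHOT")

      // input/output options, not driver specific
      parseIOOptions()

      // algorithm specific options
      opts = opts ++ SomeOptions
      note("\nAlgorithm control options:")
      opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
        options + ("maxThings" -> x)
      } text ("Max things to consider (optional). Default: " + SomeOptions("maxThings"))

      // Spark config and generic options shared by every Spark driver
      parseSparkOptions()
      parseGenericOptions()

      help("help") abbr ("h") text ("prints this usage text\n")
    }
    parser.parse(args, parser.opts) map { opts =>
      parser.opts = opts
      process()
    }
  }

  override def process(): Unit = {
    start() // override start() to tweak Kryo or other SparkConf values before the context is created
    // do the work here using parser.opts and the implicit distributed context mc
    stop()
  }
}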

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
index a46d2ee..b3a1ec2 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
@@ -18,26 +18,30 @@ package org.apache.mahout.drivers
 
 import org.apache.spark.SparkConf
 
+/** Adds parsing of Spark specific options to the option parser */
 class MahoutSparkOptionParser(programName: String) extends MahoutOptionParser(programName: String){
 
-  def parseSparkOptions(implicit sparkConf: SparkConf) = {
+  def parseSparkOptions()(implicit sparkConf: SparkConf) = {
     opts = opts ++ MahoutOptionParser.SparkOptions
     opts = opts + ("appName" -> programName)
     note("\nSpark config options:")
 
-    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
-      options + ("master" -> x)
+    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can " +
+      "specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+        options + ("master" -> x)
     }
 
-    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
-      options + ("sparkExecutorMem" -> x)
+    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each " +
+      "node (optional). Default: as Spark config specifies") action { (x, options) =>
+        options + ("sparkExecutorMem" -> x)
     }
 
     opt[(String, String)]("define") abbr ("D") unbounded() foreach { case (k, v) =>
       sparkConf.set(k, v)
     } validate { x =>
       if (x._2 != "") success else failure("Value <sparkConfValue> must be non-blank")
-    } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before creating this job's Spark context (optional)")
+    } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before " +
+      "creating this job's Spark context (optional)")
 
   }
 }
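
A usage note on the reworked signature: parseSparkOptions()(implicit sparkConf: SparkConf) mutates
the supplied conf at parse time, which is why -D/--define values are in force before the Mahout
context is created. A minimal sketch, assuming the base parser registers no required options and
that scopt's usual key=value reading applies to the long "--define" form; the config key reused
below is one the driver itself sets.

package org.apache.mahout.drivers // so MahoutSparkOptionParser resolves without an extra import

import org.apache.spark.SparkConf

object SparkOptionsSketch extends App {

  implicit val conf = new SparkConf()

  val parser = new MahoutSparkOptionParser(programName = "somedriver") {
    parseSparkOptions() // picks up the implicit conf above; adds --master, --sparkExecutorMem and -D
  }

  // "--define" is the long form of the -D <sparkConfKey>=<sparkConfValue> option defined above
  parser.parse(Seq("--define", "spark.kryo.referenceTracking=false"), parser.opts)

  // the key was set on conf during parsing, before mahoutSparkContext() would ever be called
  println(conf.get("spark.kryo.referenceTracking"))
}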

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 8c1bce4..3b47452 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -26,12 +26,11 @@ import scala.collection.immutable.HashMap
 /**
  * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]].
  * Reads a text delimited file containing rows of a [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * with domain specific IDS of the form
- * (row id, column id: strength, ...). The IDs will be preserved in the
+ * with domain specific IDS of the form (row id, column id: strength, ...). The IDs will be preserved in the
  * output. The rows define a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]]
- * will be used to calculate row-wise similarity using log-likelihood
- * The options allow control of the input schema, file discovery, output schema, and control of
- * algorithm parameters.
+ * will be used to calculate row-wise similarity using log-likelihood. The options allow control of the input
+ * schema, file discovery, output schema, and control of algorithm parameters.
+ *
  * To get help run {{{mahout spark-rowsimilarity}}} for a full explanation of options. The default
  * values for formatting will read (rowID<tab>columnID1:strength1<space>columnID2:strength2....)
  * and write (rowID<tab>rowID1:strength1<space>rowID2:strength2....)
@@ -49,6 +48,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
   private var readWriteSchema: Schema = _
 
   /**
+   * Entry point, not using Scala App trait
    * @param args  Command line args, if empty a help message is printed.
    */
   override def main(args: Array[String]): Unit = {
@@ -67,35 +67,34 @@ object RowSimilarityDriver extends MahoutSparkDriver {
         options + ("maxObservations" -> x)
       } text ("Max number of observations to consider per row (optional). Default: " +
         RowSimilarityOptions("maxObservations")) validate { x =>
-        if (x > 0) success else failure("Option --maxObservations must be > 0")
+          if (x > 0) success else failure("Option --maxObservations must be > 0")
       }
 
       opt[Int]('m', "maxSimilaritiesPerRow") action { (x, options) =>
         options + ("maxSimilaritiesPerRow" -> x)
       } text ("Limit the number of similarities per item to this number (optional). Default: " +
         RowSimilarityOptions("maxSimilaritiesPerRow")) validate { x =>
-        if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0")
+          if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0")
       }
 
-      /** --threshold not implemented in SimilarityAnalysis.rowSimilarity
-        * todo: replacing the threshold with some % of the best values and/or a
-        * confidence measure expressed in standard deviations would be nice.
-        */
+      // --threshold not implemented in SimilarityAnalysis.rowSimilarity
+      // todo: replacing the threshold with some % of the best values and/or a
+      // confidence measure expressed in standard deviations would be nice.
 
       //Driver notes--driver specific
       note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
 
       //Drm output schema--not driver specific, drm specific
-      parseDrmFormatOptions
+      parseIndexedDatasetFormatOptions()
 
       //How to search for input
-      parseFileDiscoveryOptions
+      parseFileDiscoveryOptions()
 
       //Spark config options--not driver specific
-      parseSparkOptions
+      parseSparkOptions()
 
       //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
-      parseGenericOptions
+      parseGenericOptions()
 
       help("help") abbr ("h") text ("prints this usage text\n")
 
@@ -106,9 +105,9 @@ object RowSimilarityDriver extends MahoutSparkDriver {
     }
   }
 
-  override protected def start() : Unit = {
+  override protected def start(): Unit = {
 
-    super.start
+    super.start()
 
     readWriteSchema = new Schema(
       "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String],
@@ -120,8 +119,8 @@ object RowSimilarityDriver extends MahoutSparkDriver {
 
   private def readIndexedDataset: IndexedDataset = {
 
-    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
-      parser.opts("recursive").asInstanceOf[Boolean]).uris
+    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String],
+      parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris
 
     if (inFiles.isEmpty) {
       null.asInstanceOf[IndexedDataset]
@@ -132,8 +131,8 @@ object RowSimilarityDriver extends MahoutSparkDriver {
     }
   }
 
-  override def process: Unit = {
-    start
+  override def process(): Unit = {
+    start()
 
     val indexedDataset = readIndexedDataset
 
@@ -144,7 +143,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
 
     rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], readWriteSchema)
 
-    stop
+    stop()
   }
 
 }
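
To make the default text schema in the Scaladoc above concrete, the lines below show where the
delimiters fall. The IDs and strengths are invented purely for illustration (the real output
strengths are the LLR scores the job computes); <tab> stands for a literal tab, as in the Scaladoc.

  input row:   doc1<tab>apple:1 banana:4 cherry:1
  output row:  doc1<tab>doc7:12.3 doc2:9.8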

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
index 368ee89..8531a0a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
@@ -58,56 +58,56 @@ object TestNBDriver extends MahoutSparkDriver {
 
 
       //How to search for input
-      parseFileDiscoveryOptions
+      parseFileDiscoveryOptions()
 
-      //Drm output schema--not driver specific, drm specific
-      parseDrmFormatOptions
+      //IndexedDataset output schema--not driver specific, IndexedDataset specific
+      parseIndexedDatasetFormatOptions()
 
       //Spark config options--not driver specific
-      parseSparkOptions
+      parseSparkOptions()
 
       //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
-      parseGenericOptions
+      parseGenericOptions()
 
       help("help") abbr ("h") text ("prints this usage text\n")
 
     }
     parser.parse(args, parser.opts) map { opts =>
       parser.opts = opts
-      process
+      process()
     }
   }
 
-/** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
-private def readTestSet: DrmLike[_] = {
-  val inputPath = parser.opts("input").asInstanceOf[String]
-  val trainingSet= drm.drmDfsRead(inputPath)
-  trainingSet
-}
+  /** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
+  private def readTestSet: DrmLike[_] = {
+    val inputPath = parser.opts("input").asInstanceOf[String]
+    val trainingSet = drm.drmDfsRead(inputPath)
+    trainingSet
+  }
 
-/** read the model from pathToModel using NBModel.DfsRead(...) */
-private def readModel: NBModel = {
-  val inputPath = parser.opts("pathToModel").asInstanceOf[String]
-  val model= NBModel.dfsRead(inputPath)
-  model
-}
+  /** read the model from pathToModel using NBModel.DfsRead(...) */
+  private def readModel: NBModel = {
+    val inputPath = parser.opts("pathToModel").asInstanceOf[String]
+    val model = NBModel.dfsRead(inputPath)
+    model
+  }
 
-override def process: Unit = {
-  start()
+  override def process(): Unit = {
+    start()
 
-  val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
-  val outputPath = parser.opts("output").asInstanceOf[String]
+    val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
+    val outputPath = parser.opts("output").asInstanceOf[String]
 
-  // todo:  get the -ow option in to check for a model in the path and overwrite if flagged.
+    // todo:  get the -ow option in to check for a model in the path and overwrite if flagged.
 
-  val testSet = readTestSet
-  val model = readModel
-  val analyzer= NaiveBayes.test(model, testSet, testComplementary)
+    val testSet = readTestSet
+    val model = readModel
+    val analyzer = NaiveBayes.test(model, testSet, testComplementary)
 
-  println(analyzer)
+    println(analyzer)
 
-  stop
-}
+    stop()
+  }
 
 }
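
Pulled out of the reformatted driver above, the whole test flow is three calls; the sketch below
shows them against an existing Mahout DistributedContext. The import paths are assumptions (the
driver's own imports sit above the hunk shown here); the calls themselves are the ones that
process(), readModel and readTestSet make.

import org.apache.mahout.classifier.naivebayes.{NBModel, NaiveBayes} // assumed package
import org.apache.mahout.math.drm
import org.apache.mahout.math.drm.DistributedContext

object TestNBSketch {
  def evaluate(modelPath: String, testSetPath: String, complementary: Boolean)
              (implicit mc: DistributedContext): Unit = {
    val model = NBModel.dfsRead(modelPath)      // as in readModel
    val testSet = drm.drmDfsRead(testSetPath)   // part-x-00000 sequence file of <Text,VectorWritable>
    val analyzer = NaiveBayes.test(model, testSet, complementary)
    println(analyzer)                           // print the analyzer's summary, as the driver does
  }
}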
 


[7/7] mahout git commit: NOJIRA removed o.a.m.Pair, cleaned up style, incorporated most of PR #74 but left Spark version at 1.1.1 due to bug in 1.2.1

Posted by pa...@apache.org.
NOJIRA removed o.a.m.Pair, cleaned up style, incorporated most of PR #74 but left Spark version at 1.1.1 due to bug in 1.2.1


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b0ee8e26
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b0ee8e26
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b0ee8e26

Branch: refs/heads/master
Commit: b0ee8e2657a900315140d21205416338cc955864
Parents: 15ee195
Author: pferrel <pa...@occamsmachete.com>
Authored: Wed Mar 4 13:52:04 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Wed Mar 4 13:52:04 2015 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/b0ee8e26/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index d8d04e2..380f4eb 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -134,7 +134,9 @@ object RLikeDrmOps {
 
   implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
 
-  implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp)
+  // Removed in move to 1.2.1 PR #74 https://github.com/apache/mahout/pull/74/files
+  // Not sure why.
+  // implicit def cp2cpops[K: ClassTag](cp: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps(cp)
 
   /**
    * This is probably dangerous since it triggers implicit checkpointing with default storage level
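
Since the diff leaves the "not sure why" question open: with the implicit conversion commented out,
any call site that leaned on it needs the explicit wrap the conversion performed. A minimal sketch,
assuming Int keys and that CheckpointedOps lives next to CheckpointedDrm in the drm package; the
constructor call is exactly what the removed line did.

import org.apache.mahout.math.drm.{CheckpointedDrm, CheckpointedOps} // CheckpointedOps location assumed

object CheckpointedOpsSketch {
  // what the removed implicit did, now spelled out at the call site
  def asOps(cp: CheckpointedDrm[Int]): CheckpointedOps[Int] = new CheckpointedOps(cp)
}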


[3/7] mahout git commit: scopt moved from spark to math-scala, where the base engine-agnostic driver lives

Posted by pa...@apache.org.
scopt moved from spark to math-scala, where the base engine-agnostic driver lives


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/46398927
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/46398927
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/46398927

Branch: refs/heads/master
Commit: 463989272b4219716ca692cb4f9b757076d5c690
Parents: e005b28 901ef03
Author: pferrel <pa...@occamsmachete.com>
Authored: Sat Feb 21 08:45:39 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Sat Feb 21 08:45:39 2015 -0800

----------------------------------------------------------------------
 h2o/pom.xml                                                  | 8 ++++----
 math-scala/pom.xml                                           | 4 ++--
 .../main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala  | 2 --
 pom.xml                                                      | 2 +-
 spark-shell/pom.xml                                          | 4 ++--
 spark/pom.xml                                                | 2 +-
 .../mahout/sparkbindings/test/DistributedSparkSuite.scala    | 5 +++++
 7 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/46398927/spark/pom.xml
----------------------------------------------------------------------


[4/7] mahout git commit: removed scopt from spark/pom.xml

Posted by pa...@apache.org.
removed scopt from spark/pom.xml


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/43bea68f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/43bea68f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/43bea68f

Branch: refs/heads/master
Commit: 43bea68f363143d87f567eedff147460db5bd146
Parents: 4639892 fde08a9
Author: pferrel <pa...@occamsmachete.com>
Authored: Mon Mar 2 13:29:10 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Mon Mar 2 13:29:10 2015 -0800

----------------------------------------------------------------------
 h2o/pom.xml                                                  | 8 ++++----
 math-scala/pom.xml                                           | 4 ++--
 .../main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala  | 2 ++
 pom.xml                                                      | 2 +-
 spark-shell/pom.xml                                          | 4 ++--
 spark/pom.xml                                                | 2 +-
 .../mahout/sparkbindings/test/DistributedSparkSuite.scala    | 5 -----
 7 files changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/43bea68f/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index c5a1141,ca66522..4f5fe88
--- a/pom.xml
+++ b/pom.xml
@@@ -111,7 -111,7 +111,7 @@@
      <slf4j.version>1.7.5</slf4j.version>
      <scala.major>2.10</scala.major>
      <scala.version>2.10.4</scala.version>
-     <spark.version>1.2.1</spark.version>
 -    <spark.version>1.1.0</spark.version>
++    <spark.version>1.1.1</spark.version>
      <h2o.version>0.1.16</h2o.version>
    </properties>
    <issueManagement>

http://git-wip-us.apache.org/repos/asf/mahout/blob/43bea68f/spark/pom.xml
----------------------------------------------------------------------