Posted to commits@mahout.apache.org by pa...@apache.org on 2015/03/04 22:52:32 UTC

[6/7] mahout git commit: removed o.a.m.Pair, cleaned up comments and style issues, simplified the driver API, merged most of the 1.2.1 changes but stayed on Spark v1.1.1 because of a bug in v1.2.1

removed o.a.m.Pair, cleaned up comments and style issues, simplified the driver API, merged most of the 1.2.1 changes but stayed on Spark v1.1.1 because of a bug in v1.2.1


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/15ee1951
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/15ee1951
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/15ee1951

Branch: refs/heads/master
Commit: 15ee1951d1ba8d8ee7d24e2510af187fd984c8cf
Parents: 43bea68
Author: pferrel <pa...@occamsmachete.com>
Authored: Mon Mar 2 13:38:05 2015 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Mon Mar 2 13:38:05 2015 -0800

----------------------------------------------------------------------
 examples/bin/get-all-examples.sh                |   4 +-
 math-scala/pom.xml                              |  25 +++-
 .../apache/mahout/drivers/MahoutDriver.scala    |  11 +-
 .../mahout/drivers/MahoutOptionParser.scala     |  78 ++++++-----
 .../mahout/math/cf/SimilarityAnalysis.scala     | 104 +++++++--------
 .../org/apache/mahout/math/drm/package.scala    |   4 +-
 .../math/indexeddataset/IndexedDataset.scala    |  23 ++--
 .../math/indexeddataset/ReaderWriter.scala      |  71 ++++++++--
 .../mahout/math/indexeddataset/Schema.scala     |  82 ++++++------
 spark/pom.xml                                   |   6 +-
 spark/src/main/assembly/dependency-reduced.xml  |  44 +++++++
 spark/src/main/assembly/job.xml                 |  61 ---------
 .../apache/mahout/common/HDFSPathSearch.scala   |  17 ++-
 .../mahout/drivers/ItemSimilarityDriver.scala   |  78 ++++++-----
 .../mahout/drivers/MahoutSparkDriver.scala      | 106 ++++++++-------
 .../drivers/MahoutSparkOptionParser.scala       |  16 ++-
 .../mahout/drivers/RowSimilarityDriver.scala    |  43 +++---
 .../apache/mahout/drivers/TestNBDriver.scala    |  58 ++++----
 .../drivers/TextDelimitedReaderWriter.scala     | 132 ++++++++++---------
 .../apache/mahout/drivers/TrainNBDriver.scala   |  16 +--
 .../mahout/sparkbindings/SparkEngine.scala      |  10 +-
 .../indexeddataset/IndexedDatasetSpark.scala    |  17 ++-
 .../apache/mahout/sparkbindings/package.scala   |   6 +-
 .../drivers/ItemSimilarityDriverSuite.scala     | 117 ++++++++--------
 .../test/DistributedSparkSuite.scala            |   4 +
 25 files changed, 619 insertions(+), 514 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/examples/bin/get-all-examples.sh
----------------------------------------------------------------------
diff --git a/examples/bin/get-all-examples.sh b/examples/bin/get-all-examples.sh
index a24c7fd..4128e47 100755
--- a/examples/bin/get-all-examples.sh
+++ b/examples/bin/get-all-examples.sh
@@ -26,8 +26,8 @@ echo " Solr-recommender example: "
 echo " 1) imports text 'log files' of some delimited form for user preferences"
 echo " 2) creates the correct Mahout files and stores distionaries to translate external Id to and from Mahout Ids"
 echo " 3) it implements a prototype two actions 'cross-recommender', which takes two actions made by the same user and creates recommendations"
-echo " 4) it creates output for user->preference history CSV and and item->similar items 'indicator' matrix for use in a Solr-recommender."
-echo "    To use Solr you would index the indicator matrix CSV, and use user preference history from the history CSV as a query, the result"
+echo " 4) it creates output for user->preference history CSV and and item->similar items 'similarity' matrix for use in a Solr-recommender."
+echo "    To use Solr you would index the similarity matrix CSV, and use user preference history from the history CSV as a query, the result"
 echo "    from Solr will be an ordered list of recommendations returning the same item Ids as were input."
 echo " For further description see the README.md here https://github.com/pferrel/solr-recommender"
 echo " To build run 'cd solr-recommender; mvn install'"

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 66309d6..50cea7a 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -175,17 +175,36 @@
       
     <dependency>
       <groupId>com.github.scopt</groupId>
-      <artifactId>scopt_2.10</artifactId>
-      <version>3.2.0</version>
+      <artifactId>scopt_${scala.major}</artifactId>
+      <version>3.3.0</version>
     </dependency>
 
     <!-- scala stuff -->
     <dependency>
       <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>${scala.version}</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-actors</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scalap</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.major}</artifactId>

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 3d9d4e1..32515f1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -19,23 +19,24 @@ package org.apache.mahout.drivers
 
 import org.apache.mahout.math.drm.DistributedContext
 
-/** Extended by a platform specific version of this class to create a Mahout CLI driver.
- */
+/** Extended by a platform specific version of this class to create a Mahout CLI driver. */
 abstract class MahoutDriver {
 
-
   implicit protected var mc: DistributedContext = _
   implicit protected var parser: MahoutOptionParser = _
 
   var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
 
+  /** Must be overridden to set up the DistributedContext mc */
+  protected def start() : Unit
+
   /** Override (optionally) for special cleanup */
-  protected def stop: Unit = {
+  protected def stop(): Unit = {
     if (!_useExistingContext) mc.close
   }
 
   /** This is where you do the work, call start first, then before exiting call stop */
-  protected def process: Unit
+  protected def process(): Unit
 
   /** Parse command line and call process */
   def main(args: Array[String]): Unit

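For orientation, a minimal sketch of the lifecycle this base class implies. The object name and the
context-creation helper are illustrative only, not part of this commit:

  object MyDriver extends MahoutDriver {

    // must assign the implicit DistributedContext; how it is built is engine specific
    override protected def start(): Unit = {
      mc = createEngineContext() // hypothetical helper standing in for an engine specific factory
    }

    override protected def process(): Unit = {
      start()
      // ... read input, run the job, write output ...
      stop()
    }

    override def main(args: Array[String]): Unit = process()
  }
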
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 479a8d0..3b5affd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -20,16 +20,17 @@ import scopt.OptionParser
 
 import scala.collection.immutable
 
-/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
-  * keep both standarized.
-  * @param programName Name displayed in help message, the name by which the driver is invoked.
-  * @note options are engine neutral by convention. See the engine specific extending class for
-  *       to add Spark or other engine options.
-  * */
+/**
+ * Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+ * keep both standardized.
+ * @param programName Name displayed in help message, the name by which the driver is invoked.
+ * @note options are engine neutral by convention. See the engine specific extending class to add Spark or
+ *       other engine options.
+ */
 class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
 
   // build options from some stardard CLI param groups
-  // Note: always put the driver specific options at the last so they can override and previous options!
+  // Note: always put the driver specific options last so they can override any previous options!
   var opts = Map.empty[String, Any]
 
   override def showUsageOnError = true
@@ -39,12 +40,14 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
     note("Input, output options")
     opt[String]('i', "input") required() action { (x, options) =>
       options + ("input" -> x)
-    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs" +
+      " (required)")
 
     if (numInputs == 2) {
       opt[String]("input2") abbr ("i2") action { (x, options) =>
         options + ("input2" -> x)
-      } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+      } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" " +
+        "(optional). Default: empty.")
     }
 
     opt[String]('o', "output") required() action { (x, options) =>
@@ -53,11 +56,11 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
       } else {
         options + ("output" -> (x + "/"))
       }
-    } text ("Path for output, any local or HDFS supported URI (required)")
+    } text ("Path for output directory, any HDFS supported URI (required)")
 
   }
 
-  def parseGenericOptions = {
+  def parseGenericOptions() = {
     opts = opts ++ MahoutOptionParser.GenericOptions
     opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
       options + ("randomSeed" -> x)
@@ -65,48 +68,55 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
       if (x > 0) success else failure("Option --randomSeed must be > 0")
     }
 
-    //output both input DRMs
+    //output both input IndexedDatasets
     opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
       options + ("writeAllDatasets" -> true)
     }//Hidden option, though a user might want this.
   }
 
-  def parseElementInputSchemaOptions{
+  def parseElementInputSchemaOptions() = {
     //Input text file schema--not driver specific but input data specific, elements input,
-    // not drms
+    // not rows of IndexedDatasets
     opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
     note("\nInput text file schema options:")
-    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action { (x, options) =>
-      options + ("inDelim" -> x)
+    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action {
+      (x, options) =>
+        options + ("inDelim" -> x)
     }
 
     opt[String]("filter1") abbr ("f1") action { (x, options) =>
       options + ("filter1" -> x)
-    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). " +
+      "Default: no filter, all data is used")
 
     opt[String]("filter2") abbr ("f2") action { (x, options) =>
       options + ("filter2" -> x)
-    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). " +
+      "If not present no secondary dataset is collected")
 
     opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
       options + ("rowIDColumn" -> x)
-    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
-      if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate {
+      x =>
+        if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
     }
 
     opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
       options + ("itemIDColumn" -> x)
-    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
-      if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate {
+      x =>
+        if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
     }
 
     opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
       options + ("filterColumn" -> x)
-    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no " +
+      "filter") validate { x =>
       if (x >= -1) success else failure("Option --filterColNum must be >= -1")
     }
 
-    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or" +
+      " \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
 
     //check for column consistency
     checkConfig { options: Map[String, Any] =>
@@ -126,7 +136,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
 
   }
 
-  def parseFileDiscoveryOptions = {
+  def parseFileDiscoveryOptions() = {
     //File finding strategy--not driver specific
     opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
     note("\nFile discovery options:")
@@ -136,12 +146,13 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
 
     opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
       options + ("filenamePattern" -> x)
-    } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+    } text ("Regex to match in determining input files (optional). Default: filename in the --input option " +
+      "or \"^part-.*\" if --input is a directory")
 
   }
 
-  def parseDrmFormatOptions = {
-    opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
+  def parseIndexedDatasetFormatOptions() = {
+    opts = opts ++ MahoutOptionParser.TextDelimitedIndexedDatasetOptions
     note("\nOutput text file schema options:")
     opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
       options + ("rowKeyDelim" -> x)
@@ -160,13 +171,16 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
     } text ("Do not write the strength to the output files (optional), Default: false.")
     note("This option is used to output indexable data for creating a search engine recommender.")
 
-    note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+    note("\nDefault delimiters will produce output of the form: " +
+      "\"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
   }
 
 }
 
-/** Companion object defines default option groups for reference in any driver that needs them.
-  * @note not all options are platform neutral so other platforms can add default options here if desired */
+/**
+ * Companion object defines default option groups for reference in any driver that needs them.
+ * @note not all options are platform neutral so other platforms can add default options here if desired
+ */
 object MahoutOptionParser {
 
   // set up the various default option groups
@@ -196,7 +210,7 @@ object MahoutOptionParser {
     "filter2" -> null.asInstanceOf[String],
     "inDelim" -> "[,\t ]")
 
-  final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+  final val TextDelimitedIndexedDatasetOptions = immutable.HashMap[String, Any](
     "rowKeyDelim" -> "\t",
     "columnIdStrengthDelim" -> ":",
     "elementDelim" -> " ",

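To show how these groups are meant to compose, here is a hedged sketch of a parser assembled the way the
drivers below do it, assuming args holds the CLI arguments. The program name and the run block are
illustrative; the group methods and the parse idiom are the ones visible in this patch and in
ItemSimilarityDriver further down:

  val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
    // each call merges that group's defaults into opts and registers its CLI options;
    // the required --input/--output group is added the same way (its method is not shown in these hunks)
    parseElementInputSchemaOptions()   // --inDelim, --filter1/--filter2, row/item/filter column numbers
    parseFileDiscoveryOptions()        // --filenamePattern and recursive search
    parseIndexedDatasetFormatOptions() // --rowKeyDelim, --columnIdStrengthDelim, --elementDelim
    parseGenericOptions()              // --randomSeed and the hidden --writeAllDatasets
  }

  parser.parse(args, parser.opts) map { opts =>
    parser.opts = opts // parsed values layered over the group defaults
    // ... hand the merged opts to the driver's process() ...
  }
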
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
index e1766e8..6557ab0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -32,7 +32,7 @@ import scala.util.Random
 
 
 /**
- * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * Based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
  * available at http://www.mapr.com/practical-machine-learning
  *
  * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
@@ -44,14 +44,16 @@ object SimilarityAnalysis extends Serializable {
   /** Compares (Int,Double) pairs by the second value */
   private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
 
-  /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
-    * and returns a list of indicator and cross-indicator matrices
-    * @param drmARaw Primary interaction matrix
-    * @param randomSeed when kept to a constant will make repeatable downsampling
-    * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
-    * @param maxNumInteractions max number of interactions after downsampling, default: 500
-    * @return
-    */
+  /**
+   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+   * and returns a list of similarity and cross-similarity matrices
+   * @param drmARaw Primary interaction matrix
+   * @param randomSeed when kept to a constant will make repeatable downsampling
+   * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
+   * @param maxNumInteractions max number of interactions after downsampling, default: 500
+   * @return a list of [[org.apache.mahout.math.drm.DrmLike]] containing downsampled DRMs for cooccurrence and
+   *         cross-cooccurrence
+   */
   def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
                     maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
 
@@ -69,11 +71,11 @@ object SimilarityAnalysis extends Serializable {
     // Compute co-occurrence matrix A'A
     val drmAtA = drmA.t %*% drmA
 
-    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
-    val drmIndicatorsAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
-      bcastInteractionsPerItemA, crossCooccurrence = false)
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
+    val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing,
+      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
 
-    var indicatorMatrices = List(drmIndicatorsAtA)
+    var similarityMatrices = List(drmSimilarityAtA)
 
     // Now look at cross-co-occurrences
     for (drmBRaw <- drmBs) {
@@ -86,10 +88,10 @@ object SimilarityAnalysis extends Serializable {
       // Compute cross-co-occurrence matrix A'B
       val drmAtB = drmA.t %*% drmB
 
-      val drmIndicatorsAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
+      val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
         bcastInteractionsPerItemA, bcastInteractionsPerThingB)
 
-      indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
+      similarityMatrices = similarityMatrices :+ drmSimilarityAtB
 
       drmB.uncache()
     }
@@ -97,19 +99,21 @@ object SimilarityAnalysis extends Serializable {
     // Unpin downsampled interaction matrix
     drmA.uncache()
 
-    // Return list of indicator matrices
-    indicatorMatrices
+    // Return list of similarity matrices
+    similarityMatrices
   }
 
-  /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
-    * and returns a list of indicator and cross-indicator matrices
-    * Somewhat easier to use method, which handles the ID dictionaries correctly
-    * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
-    * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
-    * @param maxInterestingItemsPerThing max similarities per items
-    * @param maxNumInteractions max number of input items per item
-    * @return
-    */
+  /**
+   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns
+   * a list of similarity and cross-similarity matrices. This method is somewhat easier to use because it handles
+   * the ID dictionaries correctly.
+   * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
+   * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+   * @param maxInterestingItemsPerThing max similarities per items
+   * @param maxNumInteractions max number of input items per item
+   * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled
+   *         IndexedDatasets for cooccurrence and cross-cooccurrence
+   */
   def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset],
       randomSeed: Int = 0xdeadbeef,
       maxInterestingItemsPerThing: Int = 50,
@@ -127,13 +131,13 @@ object SimilarityAnalysis extends Serializable {
     retIDSs.toList
   }
 
-  /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows
-    * @param drmARaw Primary interaction matrix
-    * @param randomSeed when kept to a constant will make repeatable downsampling
-    * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
-    * @param maxNumInteractions max number of interactions after downsampling, default: 500
-    * @return
-    */
+  /**
+   * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows
+   * @param drmARaw Primary interaction matrix
+   * @param randomSeed when kept to a constant will make repeatable downsampling
+   * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
+   * @param maxNumInteractions max number of interactions after downsampling, default: 500
+   */
   def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
                     maxNumInteractions: Int = 500): DrmLike[Int] = {
 
@@ -152,20 +156,20 @@ object SimilarityAnalysis extends Serializable {
     val drmAAt = drmA %*% drmA.t
 
     // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
-    val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow, bcastInteractionsPerItemA,
-      bcastInteractionsPerItemA, crossCooccurrence = false)
+    val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow,
+      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
 
     drmSimilaritiesAAt
   }
 
-  /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
-    * Uses IndexedDatasets, which handle external ID dictionaries properly
-    * @param indexedDataset compare each row to every other
-    * @param randomSeed  use default to make repeatable, otherwise pass in system time or some randomizing seed
-    * @param maxInterestingSimilaritiesPerRow max elements returned in each row
-    * @param maxObservationsPerRow max number of input elements to use
-    * @return
-    */
+  /**
+   * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
+   * Uses IndexedDatasets, which handle external ID dictionaries properly
+   * @param indexedDataset compare each row to every other
+   * @param randomSeed  use default to make repeatable, otherwise pass in system time or some randomizing seed
+   * @param maxInterestingSimilaritiesPerRow max elements returned in each row
+   * @param maxObservationsPerRow max number of input elements to use
+   */
   def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
       maxInterestingSimilaritiesPerRow: Int = 50,
       maxObservationsPerRow: Int = 500):
@@ -175,10 +179,7 @@ object SimilarityAnalysis extends Serializable {
     indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs)
   }
 
-   /**
-     * Compute loglikelihood ratio
-     * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
-     */
+   /** Compute the loglikelihood ratio; see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details */
   def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
     numInteractionsWithAandB: Long, numInteractions: Long) = {
 
@@ -220,7 +221,7 @@ object SimilarityAnalysis extends Serializable {
 
               val candidate = thingA -> llr
 
-              // matches legacy hadoop code and maps values to range (0..1)
+              // legacy hadoop code maps values to range (0..1) via
               // val normailizedLLR = 1.0 - (1.0 / (1.0 + llr))
               // val candidate = thingA -> normailizedLLR
 
@@ -253,7 +254,7 @@ object SimilarityAnalysis extends Serializable {
    * @param drmM matrix to downsample
    * @param seed random number generator seed, keep to a constant if repeatability is neccessary
    * @param maxNumInteractions number of elements in a row of the returned matrix
-   * @return
+   * @return the downsampled DRM
    */
   def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
 
@@ -269,9 +270,8 @@ object SimilarityAnalysis extends Serializable {
       case (keys, block) =>
         val numInteractions: Vector = bcastNumInteractions
 
-        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
-        // don't use commons since scala's is included anyway
-        // val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of
+        // failures
         val random = new Random(MurmurHash.hash(keys(0), seed))
 
         val downsampledBlock = block.like()

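A hedged usage sketch of the renamed API. The two IndexedDatasets, the output path, and the implicit
DistributedContext needed by dfsWrite are assumed to already exist; they are not part of this commit:

  // datasetA holds the primary user->item actions, datasetB a secondary action type
  val similarityIDSs: List[IndexedDataset] = SimilarityAnalysis.cooccurrencesIDSs(
    Array(datasetA, datasetB),
    maxInterestingItemsPerThing = 50,
    maxNumInteractions = 500)

  // head of the list is the A'A similarity matrix, the rest are cross-similarities (A'B, ...)
  similarityIDSs.head.dfsWrite(outputPath, DefaultIndexedDatasetWriteSchema)
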
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 3afbecb..81f6ab1 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -123,13 +123,13 @@ package object indexeddataset {
   def indexedDatasetDFSRead(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
       existingRowIDs: BiMap[String, Int] = HashBiMap.create())
-      (implicit ctx: DistributedContext):
+    (implicit ctx: DistributedContext):
     IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs)
 
   def indexedDatasetDFSReadElements(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
       existingRowIDs: BiMap[String, Int] = HashBiMap.create())
-      (implicit ctx: DistributedContext):
+    (implicit ctx: DistributedContext):
     IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs)
 
 }

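These package level helpers just delegate to the implicit DistributedContext; a small hedged sketch of
the typical call pattern (the wrapper function and its paths are illustrative):

  def readPrimaryAndSecondary(path1: String, path2: String)
      (implicit mc: DistributedContext): (IndexedDataset, IndexedDataset) = {
    val primary = indexedDatasetDFSReadElements(path1)
    // pass the primary row dictionary so user/row IDs stay aligned across both datasets
    val secondary = indexedDatasetDFSReadElements(path2, existingRowIDs = primary.rowIDs)
    (primary, secondary)
  }
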
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
index c7eb2cb..f6811e2 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
@@ -22,15 +22,12 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
 import org.apache.mahout.math.indexeddataset
 
 /**
-  * Wraps a [[org.apache.mahout.math.drm.CheckpointedDrm]] object with two [[com.google.common.collect.BiMap]]s to store
-  * ID/label translation dictionaries.
-  * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so
-  * a user specified label or ID can be stored and mapped to and from the Mahout Int ID
-  * used internal to Mahout core code.
-  *
-  * @todo Often no need for both or perhaps either dictionary, so save resources by allowing
-  *       to be not created when not needed.
-  */
+ * Wraps a [[org.apache.mahout.math.drm.DrmLike]] with bidirectional ID mappings [[com.google.common.collect.BiMap]]
+ * so user specified labels/IDs can be stored and mapped to and from the Mahout Int IDs used internally by Mahout
+ * core code.
+ * @todo Often there is no need for both or perhaps either dictionary, so save resources by allowing them not to be
+ *       created when not needed.
+ */
 
 trait IndexedDataset {
   val matrix: CheckpointedDrm[Int]
@@ -39,12 +36,13 @@ trait IndexedDataset {
 
   /**
    * Write a text delimited file(s) with the row and column IDs from dictionaries.
-   * @param dest
-   * @param schema
+   * @param dest write location, usually a directory
+   * @param schema params to control writing
+   * @param sc the [[org.apache.mahout.math.drm.DistributedContext]] used to do a distributed write
    */
   def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit
 
-  /** Factory method, creates the extending class */
+  /** Factory method, creates the extending class and returns a new instance */
   def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
     IndexedDataset
 
@@ -52,6 +50,7 @@ trait IndexedDataset {
    * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
    * No changes are made to the underlying drm.
    * @param n number to use for new row cardinality, should be larger than current
+   * @return a new IndexedDataset or extending class with new cardinality
    * @note should be done before any optimizer actions are performed on the matrix or you'll get unpredictable
    *       results.
    */

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
index cf429f5..f7653ae 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
@@ -20,49 +20,98 @@ package org.apache.mahout.math.indexeddataset
 import com.google.common.collect.{BiMap, HashBiMap}
 import org.apache.mahout.math.drm.DistributedContext
 
-/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait,
-  * which also defines the type to be read.
-  * @tparam T type of object to read.
-  */
+/**
+ * Reader trait is abstract in the sense that the elementReader and rowReader functions must be supplied by an
+ * extending trait, which also defines the type to be read.
+ * @tparam T type of object to read.
+ */
 trait Reader[T]{
 
   val mc: DistributedContext
   val readSchema: Schema
 
+  /**
+   * Override in extending trait to supply T and perform a parallel read of collection elements
+   * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from
+   * @param readSchema map of parameters controlling formatting and how the read is executed
+   * @param source list of comma delimited files to read from
+   * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already
+   *                       been applied to this collection--used to synchronize row IDs between several
+   *                       collections
+   * @return a new collection of type T
+   */
   protected def elementReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
       existingRowIDs: BiMap[String, Int]): T
 
-  protected def drmReader(
+  /**
+   * Override in extending trait to supply T and perform a parallel read of collection rows
+   * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from
+   * @param readSchema map of parameters controlling formatting and how the read is executed
+   * @param source list of comma delimited files to read from
+   * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already
+   *                       been applied to this collection--used to synchronize row IDs between several
+   *                       collections
+   * @return a new collection of type T
+   */
+  protected def rowReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
       existingRowIDs: BiMap[String, Int]): T
 
+  /**
+   * Public method called to perform the element-wise read. Usually no need to override
+   * @param source comma delimited URIs to read from
+   * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used
+   *                       to synchronize all row ids in several collections
+   * @return a new collection of type T
+   */
   def readElementsFrom(
       source: String,
       existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
     elementReader(mc, readSchema, source, existingRowIDs)
 
-  def readDRMFrom(
+  /**
+   * Public method called to perform the row-wise read. Usually no need to override.
+   * @param source comma delimited URIs to read from
+   * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used
+   *                       to synchronize all row ids in several collections
+   * @return a new collection of type T
+   */
+  def readRowsFrom(
       source: String,
       existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
-    drmReader(mc, readSchema, source, existingRowIDs)
+    rowReader(mc, readSchema, source, existingRowIDs)
 }
 
-/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
-  * which also defines the type to be written.
-  * @tparam T type of object to write.
-  */
+/**
+ * Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
+ * which also defines the type to be written.
+ * @tparam T type of object to write, usually a matrix-like collection.
+ */
 trait Writer[T]{
 
   val mc: DistributedContext
   val sort: Boolean
   val writeSchema: Schema
 
+  /**
+   * Override to provide writer method
+   * @param mc context used to do distributed write
+   * @param writeSchema map with params to control format and execution of the write
+   * @param dest root directory to write to
+   * @param collection usually a matrix like collection to write
+   * @param sort flags whether to sort the rows by value descending
+   */
   protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
 
+  /**
+   * Call this method to perform the write, usually no need to override.
+   * @param collection what to write
+   * @param dest root directory to write to
+   */
   def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
 }

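The split between the protected element/row readers and the public read methods means an engine binding
only supplies the two protected methods. A skeletal, hedged example (this is not the actual
TextDelimitedReaderWriter that appears later in this commit; the parsing bodies are left unimplemented):

  trait MyIndexedDatasetReader extends Reader[IndexedDataset] {

    // parse individual "rowID<delim>columnID<delim>..." elements into an IndexedDataset
    protected def elementReader(mc: DistributedContext, readSchema: Schema, source: String,
        existingRowIDs: BiMap[String, Int]): IndexedDataset = ???

    // parse one whole row per line: "rowID<tab>colID1:score1<space>colID2:score2..."
    protected def rowReader(mc: DistributedContext, readSchema: Schema, source: String,
        existingRowIDs: BiMap[String, Int]): IndexedDataset = ???
  }

  // a concrete class mixes this in, supplies mc and readSchema, and callers then use
  // readElementsFrom(source) or readRowsFrom(source)
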
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
index 557b419..3b4a2e9 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
@@ -19,34 +19,33 @@ package org.apache.mahout.math.indexeddataset
 
 import scala.collection.mutable.HashMap
 
-/** Syntactic sugar for mutable.HashMap[String, Any]
-  *
-  * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
-  */
+/**
+ * Syntactic sugar for mutable.HashMap[String, Any]
+ * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+ */
 class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
   // note: this require a mutable HashMap, do we care?
   this ++= params
 
-  /** Constructor for copying an existing Schema
-    *
-    * @param schemaToClone return a copy of this Schema
-    */
+  /**
+   * Constructor for copying an existing Schema
+   * @param schemaToClone return a copy of this Schema
+   */
   def this(schemaToClone: Schema){
     this()
     this ++= schemaToClone
   }
 }
 
-/** These can be used to keep the text in and out fairly standard to Mahout, where an application specific
-  * format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
-  * , which can be used to create a Mahout DRM for DSL ops.
-  */
-
+// These can be used to keep text input and output fairly standard for Mahout, where an application specific
+// format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+// which can be used to create a Mahout DRM for DSL ops.
 
-/** Simple default Schema for typical text delimited element file input
-  * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
-  * <comma, tab, or space>here may be other ignored text...)
-  */
+/**
+ * Simple default Schema for typical text delimited element file input.
+ * This tells the reader to input elements of the default form: (rowID<comma, tab, or space>columnID
+ * <comma, tab, or space>there may be other ignored text...)
+ */
 final object DefaultIndexedDatasetElementReadSchema extends Schema(
   "delim" -> "[,\t ]", //comma, tab or space
   "filter" -> "",
@@ -54,46 +53,49 @@ final object DefaultIndexedDatasetElementReadSchema extends Schema(
   "columnIDPosition" -> 1,
   "filterColumn" -> -1)
 
-/** Default Schema for text delimited drm file output
-  * This tells the writer to write a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] of the default form:
-  * (rowID<tab>columnID1:score1<space>columnID2:score2...)
-  */
+/**
+ * Default Schema for text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file output with
+ * one row per line.
+ * The default form:
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...)
+ */
 final object DefaultIndexedDatasetWriteSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
   "elementDelim" -> " ",
   "omitScore" -> false)
 
-/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file input
-  * This tells the reader to input text lines of the form:
-  * (rowID<tab>columnID1:score1,columnID2:score2,...)
-  */
+/**
+ * Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file
+ * row-wise input. This tells the reader to input text lines of the form:
+ * (rowID<tab>columnID1:score1,columnID2:score2,...)
+ */
 final object DefaultIndexedDatasetReadSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
   "elementDelim" -> " ")
 
-/** Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file  where
-  * the score of any element is ignored,
-  * all non-zeros are replaced with 1.
-  * This tells the reader to input DRM lines of the form
-  * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
-  * Alternatively the format can be
-  * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
-  * output format for [[IndexedDatasetWriteBooleanSchema]]
-  */
+/**
+ * Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where
+ * the score of any element is ignored.
+ * This tells the reader to input DRM lines of the form
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...) where the score is ignored.
+ * Alternatively the format can be
+ * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+ * output format for [[IndexedDatasetWriteBooleanSchema]]
+ */
 final object IndexedDatasetReadBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
   "elementDelim" -> " ",
   "omitScore" -> true)
 
-/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file write where
-  * the score of a element is omitted.
-  * The presence of a element means the score = 1, the absence means a score of 0.
-  * This tells the writer to output [[org.apache.mahout.math.indexeddataset.IndexedDataset]] lines of the form
-  * (rowID<tab>columnID1<space>columnID2...)
-  */
+/**
+ * Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file output
+ * where the score of an element is omitted. This tells the writer to output
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] rows of the form
+ * (rowID<tab>columnID1<space>columnID2...)
+ */
 final object IndexedDatasetWriteBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",

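Because a Schema is only a parameter map, a custom format is a matter of overriding the keys a reader or
writer looks up. A hedged example built on the element read defaults above (the "purchase" tag and its
column position are illustrative):

  // start from the stock element read schema and override a few keys
  val purchaseReadSchema = new Schema(DefaultIndexedDatasetElementReadSchema)
  purchaseReadSchema += ("delim" -> ",")          // plain CSV rather than the "[,\t ]" default
  purchaseReadSchema += ("filter" -> "purchase")  // keep only lines carrying this tag
  purchaseReadSchema += ("filterColumn" -> 2)     // the tag is expected in the third column
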
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index bcf9e30..c7069b6 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -164,14 +164,14 @@
         <artifactId>maven-assembly-plugin</artifactId>
         <executions>
           <execution>
-            <id>job</id>
+            <id>dependency-reduced</id>
             <phase>package</phase>
             <goals>
               <goal>single</goal>
             </goals>
             <configuration>
               <descriptors>
-                <descriptor>../spark/src/main/assembly/job.xml</descriptor>
+                <descriptor>src/main/assembly/dependency-reduced.xml</descriptor>
               </descriptors>
             </configuration>
           </execution>
@@ -317,11 +317,9 @@
       <scope>test</scope>
     </dependency>
 
-
     <!--  3rd-party -->
 
     <!-- scala stuff -->
-
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.major}</artifactId>

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/assembly/dependency-reduced.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/dependency-reduced.xml b/spark/src/main/assembly/dependency-reduced.xml
new file mode 100644
index 0000000..5dcc945
--- /dev/null
+++ b/spark/src/main/assembly/dependency-reduced.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+     http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly
+  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
+  http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <id>dependency-reduced</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <unpack>true</unpack>
+      <unpackOptions>
+      <!-- MAHOUT-1126 -->
+      <excludes>
+         <exclude>META-INF/LICENSE</exclude>
+      </excludes>
+      </unpackOptions>
+      <scope>runtime</scope>
+      <outputDirectory>/</outputDirectory>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <includes>
+        <include>com.google.guava:guava</include>
+        <include>com.github.scopt</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml
deleted file mode 100644
index 2bdb3ce..0000000
--- a/spark/src/main/assembly/job.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<assembly
-  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
-    http://maven.apache.org/xsd/assembly-1.1.0.xsd">
-  <id>job</id>
-  <formats>
-   <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <dependencySets>
-    <dependencySet>
-      <unpack>true</unpack>
-      <unpackOptions>
-        <!-- MAHOUT-1126 -->
-        <excludes>
-          <exclude>META-INF/LICENSE</exclude>
-        </excludes>
-      </unpackOptions>
-      <scope>runtime</scope>
-      <outputDirectory>/</outputDirectory>
-      <useTransitiveFiltering>true</useTransitiveFiltering>
-      <excludes>
-        <exclude>org.apache.hadoop:hadoop-core</exclude>
-      </excludes>
-    </dependencySet>
-  </dependencySets>
-  <fileSets>
-    <fileSet>
-      <directory>${basedir}/target/classes</directory>
-      <outputDirectory>/</outputDirectory>
-      <excludes>
-        <exclude>*.jar</exclude>
-      </excludes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/target/classes</directory>
-      <outputDirectory>/</outputDirectory>
-      <includes>
-        <include>driver.classes.default.props</include>
-      </includes>
-    </fileSet>
-  </fileSets>
-</assembly>

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
index 42bf697..0b4130d 100644
--- a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
+++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
@@ -23,8 +23,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 /**
  * Returns a [[java.lang.String]], which is comma delimited list of URIs discovered based on parameters
  * in the constructor.
- * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
- *
+ * The String is formatted to be input into [[org.apache.spark.SparkContext#textFile()]]
  * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
  * @param filePattern regex that must match the entire filename to have the file returned
  * @param recursive true traverses the filesystem recursively, default = false
@@ -35,8 +34,10 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
   val conf = new Configuration()
   val fs = FileSystem.get(conf)
 
-  /** Returns a string of comma delimited URIs matching the filePattern
-    * When pattern matching dirs are never returned, only traversed. */
+  /**
+   * Returns a string of comma delimited URIs matching the filePattern.
+   * When pattern matching, dirs are never returned, only traversed.
+   */
   def uris: String = {
     if (!filePattern.isEmpty){ // have file pattern so
     val pathURIs = pathURI.split(",")
@@ -51,8 +52,10 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
     }
   }
 
-  /** Find matching files in the dir, recursively call self when another directory is found
-    * Only files are matched, directories are traversed but never return a match */
+  /**
+   * Find matching files in the dir, recursively calling self when another directory is found.
+   * Only files are matched; directories are traversed but never returned as a match.
+   */
   private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
     val seed = fs.getFileStatus(new Path(dir))
     var f: String = files
@@ -71,7 +74,7 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive:
           f = findFiles(fileStatus.getPath.toString, filePattern, f)
         }
       }
-    }else{ f = dir }// was a filename not dir
+    } else { f = dir }// was a filename not dir
     f
   }
 

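A brief, hedged usage sketch of the search described above (the namenode URI and the pattern are
placeholders):

  // gather every part file under the input tree into one comma delimited string
  val inFiles: String = HDFSPathSearch(
    pathURI = "hdfs://namenode:8020/data/actions",
    filePattern = "^part-.*",
    recursive = true).uris

  // inFiles can be handed directly to SparkContext.textFile, which accepts comma delimited paths
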
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 36ba6ef..63da80f 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -24,23 +24,19 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scala.collection.immutable.HashMap
 
 /**
- * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]].
- * Reads text lines
- * that contain (row id, column id, ...). The IDs are user specified strings which will be
- * preserved in the
- * output. The individual elements will be accumulated into a matrix like [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]
- * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
- * matrices and calculate both the self similarity of the primary matrix and the row-wise
- * similarity of the primary
- * to the secondary. Returns one or two directories of text files formatted as specified in
- * the options.
- * The options allow flexible control of the input schema, file discovery, output schema, and control of
- * algorithm parameters.
- * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple
- * elements of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space,
- * you can specify only the input and output file and directory--all else will default to the correct values.
- * Each output line will contain the Item ID and similar items sorted by LLR strength descending.
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]. Reads text lines
+ * that contain (row id, column id, ...). The IDs are user specified strings which will be preserved in the output.
+ * The individual elements will be accumulated into a matrix like
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] and
+ * [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]] will be used to calculate row-wise
+ * self-similarity, or when using filters or two inputs, will generate two matrices and calculate both the
+ * self-similarity of the primary matrix and the row-wise similarity of the primary to the secondary. Returns one
+ * or two directories of text files formatted as specified in the options. The options allow flexible control of the
+ * input schema, file discovery, output schema, and control of algorithm parameters. To get help run
+ * {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple elements of text delimited
+ * values (userID,itemID) with or without strengths and with a separator of tab, comma, or space, you can specify
+ * only the input and output file and directory--all else will default to the correct values. Each output line will
+ * contain the Item ID and similar items sorted by LLR strength descending.
  * @note To use with a Spark cluster see the --master option, if you run out of heap space check
  *       the --sparkExecutorMemory option. Other [[org.apache.spark.SparkConf]] key value pairs can be with the -D:k=v
  *       option.
@@ -57,6 +53,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
   private var readSchema2: Schema = _
 
   /**
+   * Entry point, not using Scala App trait
    * @param args  Command line args, if empty a help message is printed.
    */
   override def main(args: Array[String]): Unit = {
@@ -74,52 +71,51 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
         options + ("maxPrefs" -> x)
       } text ("Max number of preferences to consider per user (optional). Default: " +
         ItemSimilarityOptions("maxPrefs")) validate { x =>
-        if (x > 0) success else failure("Option --maxPrefs must be > 0")
+          if (x > 0) success else failure("Option --maxPrefs must be > 0")
       }
 
-      /** not implemented in SimilarityAnalysis.cooccurrence
-        * threshold, and minPrefs
-        * todo: replacing the threshold with some % of the best values and/or a
-        * confidence measure expressed in standard deviations would be nice.
-        */
+      // threshold and minPrefs are not implemented
+      // in SimilarityAnalysis.cooccurrence
+      // todo: replacing the threshold with some % of the best values and/or a
+      // confidence measure expressed in standard deviations would be nice.
 
       opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
         options + ("maxSimilaritiesPerItem" -> x)
       } text ("Limit the number of similarities per item to this number (optional). Default: " +
         ItemSimilarityOptions("maxSimilaritiesPerItem")) validate { x =>
-        if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
+          if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
       }
 
       //Driver notes--driver specific
       note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
 
       //Input text format
-      parseElementInputSchemaOptions
+      parseElementInputSchemaOptions()
 
       //How to search for input
-      parseFileDiscoveryOptions
+      parseFileDiscoveryOptions()
 
       //Drm output schema--not driver specific, drm specific
-      parseDrmFormatOptions
+      parseIndexedDatasetFormatOptions()
 
       //Spark config options--not driver specific
-      parseSparkOptions
+      parseSparkOptions()
 
       //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
-      parseGenericOptions
+      parseGenericOptions()
 
       help("help") abbr ("h") text ("prints this usage text\n")
 
     }
     parser.parse(args, parser.opts) map { opts =>
       parser.opts = opts
-      process
+      process()
     }
   }
 
   override protected def start() : Unit = {
 
-    super.start
+    super.start()
 
     readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
       "filter" -> parser.opts("filter1").asInstanceOf[String],
@@ -139,12 +135,12 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
       "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
       "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
       "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
-    }
+  }
 
   private def readIndexedDatasets: Array[IndexedDataset] = {
 
-    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
-      parser.opts("recursive").asInstanceOf[Boolean]).uris
+    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String],
+      parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris
     val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) ""
     else HDFSPathSearch(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
       parser.opts("recursive").asInstanceOf[Boolean]).uris
@@ -160,8 +156,8 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
 
       // The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B
       // Here we assume there is one row ID space for all interactions. To do this we calculate the
-      // row cardinality only after reading in A and B (or potentially C...) We then adjust the
-      // cardinality so all match, which is required for the math to work.
+      // row cardinality only after reading in A and B (or potentially C...). We then adjust the cardinality
+      // so all match, which is required for the math to work.
       // Note: this may leave blank rows with no representation in any DRM. Blank rows need to
       // be supported (and are at least on Spark) or the row cardinality adjustment will not work.
       val datasetB = if (!inFiles2.isEmpty) {
@@ -201,19 +197,19 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
     }
   }
 
-  override def process: Unit = {
-    start
+  override def process(): Unit = {
+    start()
 
     val indexedDatasets = readIndexedDatasets
     val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int],
       parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
 
     // todo: allow more than one cross-similarity matrix?
-    idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "indicator-matrix", schema = writeSchema)
+    idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "similarity-matrix", schema = writeSchema)
     if(idss.length > 1)
-      idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix", schema = writeSchema)
+      idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-similarity-matrix", schema = writeSchema)
 
-    stop
+    stop()
   }
 
 }
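To make the simplified driver API above concrete, here is a minimal sketch of running the driver programmatically, roughly the way ItemSimilarityDriverSuite drives it. The paths and the explicit comma delimiter are hypothetical, the --input, --output, --input2 and --inDelim flag names are assumed to match the option keys referenced in the code above, and every option not given keeps its default.

  import org.apache.mahout.drivers.ItemSimilarityDriver
  import org.apache.mahout.math.drm.DistributedContext

  // assumes an already created DistributedContext, e.g. one set up by a test suite
  def runItemSimilarity(context: DistributedContext): Unit = {
    ItemSimilarityDriver.useContext(context)   // reuse the existing context instead of creating one in start()
    ItemSimilarityDriver.main(Array(
      "--input", "interactions/",              // hypothetical dir of (userID,itemID[,strength]) text lines
      "--output", "similarities/",             // gets similarity-matrix/ and, with --input2, cross-similarity-matrix/
      "--inDelim", ","))                       // explicit comma delimiter; omit this to keep the default
  }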

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index ab40c3a..668d70c 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -21,69 +21,81 @@ import org.apache.mahout.math.drm.DistributedContext
 import org.apache.spark.SparkConf
 import org.apache.mahout.sparkbindings._
 
-/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
-  * Also define a Map of options for the command line parser. The following template may help:
-  * {{{
-  * object SomeDriver extends MahoutDriver {
-  *
-  *   // define only the options specific to this driver, inherit the generic ones
-  *   private final val SomeOptions = HashMap[String, Any](
-  *       "maxThings" -> 500,
-  *       "minThings" -> 100,
-  *       "appName" -> "SomeDriver")
-  *
-  *   override def main(args: Array[String]): Unit = {
-  *
-  *     val parser = new MahoutOptionParser(programName = "shortname") {
-  *       head("somedriver", "Mahout 1.0-SNAPSHOT")
-  *
-  *       // Input output options, non-driver specific
-  *       parseIOOptions
-  *
-  *       // Algorithm specific options
-  *       // Add in the new options
-  *       opts = opts ++ SomeOptions
-  *       note("\nAlgorithm control options:")
-  *       opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
-  *         options + ("maxThings" -> x) ...
-  *     }
-  *     parser.parse(args, parser.opts) map { opts =>
-  *       parser.opts = opts
-  *       process
-  *     }
-  *   }
-  *
-  *   override def process: Unit = {
-  *     start // override to change the default Kryo or SparkConf before the distributed context is created
-  *     // do the work here
-  *     stop
-  *   }
-  *
-  * }}}
-  */
+/**
+ * Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
+ * Also define a Map of options for the command line parser. The following template may help:
+ * {{{
+ * object SomeDriver extends MahoutDriver {
+ *
+ *   // define only the options specific to this driver, inherit the generic ones
+ *   private final val SomeOptions = HashMap[String, Any](
+ *       "maxThings" -> 500,
+ *       "minThings" -> 100,
+ *       "appName" -> "SomeDriver")
+ *
+ *   override def main(args: Array[String]): Unit = {
+ *
+ *     val parser = new MahoutOptionParser(programName = "shortname") {
+ *       head("somedriver", "Mahout 1.0-SNAPSHOT")
+ *
+ *       // Input output options, non-driver specific
+ *       parseIOOptions()
+ *
+ *       // Algorithm specific options
+ *       // Add in the new options
+ *       opts = opts ++ SomeOptions
+ *       note("\nAlgorithm control options:")
+ *       opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
+ *         options + ("maxThings" -> x) ...
+ *     }
+ *     parser.parse(args, parser.opts) map { opts =>
+ *       parser.opts = opts
+ *       process()
+ *     }
+ *   }
+ *
+ *   override def process(): Unit = {
+ *     start() // override to change the default Kryo or SparkConf before the distributed context is created
+ *     // do the work here
+ *     stop()
+ *   }
+ *
+ * }}}
+ */
 abstract class MahoutSparkDriver extends MahoutDriver {
 
 
-  implicit protected var sparkConf = new SparkConf()
+  implicit var sparkConf = new SparkConf()
 
-  /** Creates a Spark context to run the job inside.
-    * Override to set the SparkConf values specific to the job,
-    * these must be set before the context is created.
-    * */
-  protected def start() : Unit = {
+  /**
+   * Creates a Spark context to run the job inside.
+   * Override to set the SparkConf values specific to the job,
+   * these must be set before the context is created.
+   */
+  override protected def start() : Unit = {
     if (!_useExistingContext) {
+      /* hack around SPARK-6069: Spark 1.2.1 deserialization of HashBiMap throws ClassNotFound--doesn't seem to work
+      sparkConf.set("spark.files.userClassPathFirst", "true")
+      sparkConf.set("spark.executor.userClassPathFirst", "true")
+      */
+
       sparkConf.set("spark.kryo.referenceTracking", "false")
         .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
 
       if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
         sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
       //else leave as set in Spark config
-      mc = mahoutSparkContext(masterUrl = parser.opts("master").asInstanceOf[String],
+      mc = mahoutSparkContext(
+        masterUrl = parser.opts("master").asInstanceOf[String],
         appName = parser.opts("appName").asInstanceOf[String],
         sparkConf = sparkConf)
     }
   }
 
+  /**
+   * Call this before start() to use an existing context, such as when running multiple drivers from a scalatest suite.
+   * @param context An already set up context to run against
+   */
   def useContext(context: DistributedContext): Unit = {
     _useExistingContext = true
     mc = context
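Since start() applies its SparkConf settings only before the distributed context is created, and sparkConf is now a public var, a caller can pre-set values on it before running a driver. A small sketch under that assumption follows; the property is only illustrative (start() does not touch it), the paths are hypothetical, and the --input/--output flags are assumed to match the option keys used by the drivers.

  import org.apache.mahout.drivers.ItemSimilarityDriver

  // adjust the conf before main() reaches start(); the property below is only an example
  ItemSimilarityDriver.sparkConf.set("spark.eventLog.enabled", "true")
  ItemSimilarityDriver.main(Array("--input", "in-dir/", "--output", "out-dir/"))

Overriding start() in a driver subclass, as the scaladoc above suggests, achieves the same thing when the change should live with the driver rather than with the caller.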

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
index a46d2ee..b3a1ec2 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
@@ -18,26 +18,30 @@ package org.apache.mahout.drivers
 
 import org.apache.spark.SparkConf
 
+/** Adds parsing of Spark-specific options to the option parser */
 class MahoutSparkOptionParser(programName: String) extends MahoutOptionParser(programName: String){
 
-  def parseSparkOptions(implicit sparkConf: SparkConf) = {
+  def parseSparkOptions()(implicit sparkConf: SparkConf) = {
     opts = opts ++ MahoutOptionParser.SparkOptions
     opts = opts + ("appName" -> programName)
     note("\nSpark config options:")
 
-    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
-      options + ("master" -> x)
+    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can " +
+      "specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+        options + ("master" -> x)
     }
 
-    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
-      options + ("sparkExecutorMem" -> x)
+    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each " +
+      "node (optional). Default: as Spark config specifies") action { (x, options) =>
+        options + ("sparkExecutorMem" -> x)
     }
 
     opt[(String, String)]("define") abbr ("D") unbounded() foreach { case (k, v) =>
       sparkConf.set(k, v)
     } validate { x =>
       if (x._2 != "") success else failure("Value <sparkConfValue> must be non-blank")
-    } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before creating this job's Spark context (optional)")
+    } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before " +
+      "creating this job's Spark context (optional)")
 
   }
 }
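For orientation, a sketch of how a driver wires these Spark options into its parser, following the template in the MahoutSparkDriver scaladoc; the object and program names are made up and only methods visible in this diff are called:

  import org.apache.mahout.drivers.MahoutSparkOptionParser
  import org.apache.spark.SparkConf

  object SomeDriverParserSketch {
    implicit val conf = new SparkConf()                // -D:k=v pairs are written into this conf while parsing
    val parser = new MahoutSparkOptionParser(programName = "somedriver") {
      head("somedriver", "Mahout 1.0-SNAPSHOT")
      parseSparkOptions()                              // adds --master, --sparkExecutorMem and the -D define option
      parseGenericOptions()
    }
  }

A command line that includes the define option in the -D:k=v form noted in the driver scaladocs then lands directly in conf via the foreach above.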

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 8c1bce4..3b47452 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -26,12 +26,11 @@ import scala.collection.immutable.HashMap
 /**
  * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]].
  * Reads a text delimited file containing rows of a [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * with domain specific IDS of the form
- * (row id, column id: strength, ...). The IDs will be preserved in the
+ * with domain-specific IDs of the form (row id, column id: strength, ...). The IDs will be preserved in the
  * output. The rows define a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]]
- * will be used to calculate row-wise similarity using log-likelihood
- * The options allow control of the input schema, file discovery, output schema, and control of
- * algorithm parameters.
+ * will be used to calculate row-wise similarity using log-likelihood. The options allow control of the input
+ * schema, file discovery, output schema, and control of algorithm parameters.
+ *
  * To get help run {{{mahout spark-rowsimilarity}}} for a full explanation of options. The default
  * values for formatting will read (rowID<tab>columnID1:strength1<space>columnID2:strength2....)
  * and write (rowID<tab>rowID1:strength1<space>rowID2:strength2....)
@@ -49,6 +48,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
   private var readWriteSchema: Schema = _
 
   /**
+   * Entry point, not using Scala App trait
    * @param args  Command line args, if empty a help message is printed.
    */
   override def main(args: Array[String]): Unit = {
@@ -67,35 +67,34 @@ object RowSimilarityDriver extends MahoutSparkDriver {
         options + ("maxObservations" -> x)
       } text ("Max number of observations to consider per row (optional). Default: " +
         RowSimilarityOptions("maxObservations")) validate { x =>
-        if (x > 0) success else failure("Option --maxObservations must be > 0")
+          if (x > 0) success else failure("Option --maxObservations must be > 0")
       }
 
       opt[Int]('m', "maxSimilaritiesPerRow") action { (x, options) =>
         options + ("maxSimilaritiesPerRow" -> x)
       } text ("Limit the number of similarities per item to this number (optional). Default: " +
         RowSimilarityOptions("maxSimilaritiesPerRow")) validate { x =>
-        if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0")
+          if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0")
       }
 
-      /** --threshold not implemented in SimilarityAnalysis.rowSimilarity
-        * todo: replacing the threshold with some % of the best values and/or a
-        * confidence measure expressed in standard deviations would be nice.
-        */
+      // --threshold not implemented in SimilarityAnalysis.rowSimilarity
+      // todo: replacing the threshold with some % of the best values and/or a
+      // confidence measure expressed in standard deviations would be nice.
 
       //Driver notes--driver specific
       note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
 
       //Drm output schema--not driver specific, drm specific
-      parseDrmFormatOptions
+      parseIndexedDatasetFormatOptions()
 
       //How to search for input
-      parseFileDiscoveryOptions
+      parseFileDiscoveryOptions()
 
       //Spark config options--not driver specific
-      parseSparkOptions
+      parseSparkOptions()
 
       //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
-      parseGenericOptions
+      parseGenericOptions()
 
       help("help") abbr ("h") text ("prints this usage text\n")
 
@@ -106,9 +105,9 @@ object RowSimilarityDriver extends MahoutSparkDriver {
     }
   }
 
-  override protected def start() : Unit = {
+  override protected def start(): Unit = {
 
-    super.start
+    super.start()
 
     readWriteSchema = new Schema(
       "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String],
@@ -120,8 +119,8 @@ object RowSimilarityDriver extends MahoutSparkDriver {
 
   private def readIndexedDataset: IndexedDataset = {
 
-    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
-      parser.opts("recursive").asInstanceOf[Boolean]).uris
+    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String],
+      parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris
 
     if (inFiles.isEmpty) {
       null.asInstanceOf[IndexedDataset]
@@ -132,8 +131,8 @@ object RowSimilarityDriver extends MahoutSparkDriver {
     }
   }
 
-  override def process: Unit = {
-    start
+  override def process(): Unit = {
+    start()
 
     val indexedDataset = readIndexedDataset
 
@@ -144,7 +143,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
 
     rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], readWriteSchema)
 
-    stop
+    stop()
   }
 
 }
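A minimal sketch of a run against the defaults described in the scaladoc; the paths are hypothetical, --input and --output are assumed to match the option keys referenced above, and --maxSimilaritiesPerRow is the option defined in this file:

  import org.apache.mahout.drivers.RowSimilarityDriver

  RowSimilarityDriver.main(Array(
    "--input", "docs/",                  // hypothetical dir of rowID<tab>columnID1:strength1<space>... lines
    "--output", "row-similarities/",     // written back in the same delimited layout as rowID:LLR pairs
    "--maxSimilaritiesPerRow", "50"))    // keep at most 50 similar rows per row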

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
index 368ee89..8531a0a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala
@@ -58,56 +58,56 @@ object TestNBDriver extends MahoutSparkDriver {
 
 
       //How to search for input
-      parseFileDiscoveryOptions
+      parseFileDiscoveryOptions()
 
-      //Drm output schema--not driver specific, drm specific
-      parseDrmFormatOptions
+      //IndexedDataset output schema--not driver specific, IndexedDataset specific
+      parseIndexedDatasetFormatOptions()
 
       //Spark config options--not driver specific
-      parseSparkOptions
+      parseSparkOptions()
 
       //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
-      parseGenericOptions
+      parseGenericOptions()
 
       help("help") abbr ("h") text ("prints this usage text\n")
 
     }
     parser.parse(args, parser.opts) map { opts =>
       parser.opts = opts
-      process
+      process()
     }
   }
 
-/** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */
-private def readTestSet: DrmLike[_] = {
-  val inputPath = parser.opts("input").asInstanceOf[String]
-  val trainingSet= drm.drmDfsRead(inputPath)
-  trainingSet
-}
+  /** Read the test set from the inputPath/part-x-00000 sequence file of the form <Text,VectorWritable> */
+  private def readTestSet: DrmLike[_] = {
+    val inputPath = parser.opts("input").asInstanceOf[String]
+    val testSet = drm.drmDfsRead(inputPath)
+    testSet
+  }
 
-/** read the model from pathToModel using NBModel.DfsRead(...) */
-private def readModel: NBModel = {
-  val inputPath = parser.opts("pathToModel").asInstanceOf[String]
-  val model= NBModel.dfsRead(inputPath)
-  model
-}
+  /** Read the model from pathToModel using NBModel.dfsRead(...) */
+  private def readModel: NBModel = {
+    val inputPath = parser.opts("pathToModel").asInstanceOf[String]
+    val model = NBModel.dfsRead(inputPath)
+    model
+  }
 
-override def process: Unit = {
-  start()
+  override def process(): Unit = {
+    start()
 
-  val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
-  val outputPath = parser.opts("output").asInstanceOf[String]
+    val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean]
+    val outputPath = parser.opts("output").asInstanceOf[String]
 
-  // todo:  get the -ow option in to check for a model in the path and overwrite if flagged.
+    // todo: wire in the -ow option to check for an existing model in the path and overwrite it if flagged.
 
-  val testSet = readTestSet
-  val model = readModel
-  val analyzer= NaiveBayes.test(model, testSet, testComplementary)
+    val testSet = readTestSet
+    val model = readModel
+    val analyzer = NaiveBayes.test(model, testSet, testComplementary)
 
-  println(analyzer)
+    println(analyzer)
 
-  stop
-}
+    stop()
+  }
 
 }
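To show what process() above boils down to when driven from code rather than the CLI, a short sketch follows; the paths are hypothetical, an implicit DistributedContext is assumed to be in scope, and the import package names are assumptions based on the naive bayes module rather than something visible in this diff:

  import org.apache.mahout.classifier.naivebayes.{NBModel, NaiveBayes}
  import org.apache.mahout.math.drm
  import org.apache.mahout.math.drm.DistributedContext

  def testNBSketch(implicit ctx: DistributedContext): Unit = {
    val testSet = drm.drmDfsRead("nb-test-vectors/")      // <Text,VectorWritable> sequence files, as readTestSet does
    val model = NBModel.dfsRead("nb-model/")              // model written earlier by TrainNBDriver
    val analyzer = NaiveBayes.test(model, testSet, false) // false = standard rather than complementary testing
    println(analyzer)                                     // the same summary the driver prints
  }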