Posted to commits@mahout.apache.org by pa...@apache.org on 2014/10/14 23:46:00 UTC

[2/2] git commit: NOJIRA refactor IndexedDataset and CLI drivers into core and engine specific parts, closes apache/mahout#59

NOJIRA refactor IndexedDataset and CLI drivers into core and engine specific parts, closes apache/mahout#59


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/666d314f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/666d314f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/666d314f

Branch: refs/heads/master
Commit: 666d314fb8d9f466a5496a143d73d55036aea619
Parents: 4e6577d
Author: pferrel <pa...@occamsmachete.com>
Authored: Tue Oct 14 14:45:32 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Tue Oct 14 14:45:32 2014 -0700

----------------------------------------------------------------------
 .../apache/mahout/h2obindings/H2OEngine.scala   |  44 +++
 .../h2obindings/drm/CheckpointedDrmH2O.scala    |  13 +
 math-scala/pom.xml                              |   6 +
 .../apache/mahout/drivers/MahoutDriver.scala    |  43 +++
 .../mahout/drivers/MahoutOptionParser.scala     | 220 +++++++++++++++
 .../mahout/math/cf/SimilarityAnalysis.scala     |  56 +++-
 .../mahout/math/drm/CheckpointedDrm.scala       |   3 +
 .../mahout/math/drm/DistributedEngine.scala     |  25 ++
 .../org/apache/mahout/math/drm/package.scala    |  26 +-
 .../math/indexeddataset/IndexedDataset.scala    |  64 +++++
 .../math/indexeddataset/ReaderWriter.scala      |  68 +++++
 .../mahout/math/indexeddataset/Schema.scala     | 102 +++++++
 .../apache/mahout/common/HDFSPathSearch.scala   |  78 ++++++
 .../apache/mahout/drivers/FileSysUtils.scala    |  76 ------
 .../apache/mahout/drivers/IndexedDataset.scala  |  80 ------
 .../mahout/drivers/ItemSimilarityDriver.scala   |  90 +++----
 .../apache/mahout/drivers/MahoutDriver.scala    | 104 -------
 .../mahout/drivers/MahoutOptionParser.scala     | 214 ---------------
 .../mahout/drivers/MahoutSparkDriver.scala      |  87 ++++++
 .../apache/mahout/drivers/ReaderWriter.scala    |  66 -----
 .../mahout/drivers/RowSimilarityDriver.scala    |  41 +--
 .../org/apache/mahout/drivers/Schema.scala      |  98 -------
 .../drivers/TextDelimitedReaderWriter.scala     |  68 ++---
 .../mahout/sparkbindings/SparkEngine.scala      |  38 ++-
 .../drm/CheckpointedDrmSpark.scala              |  12 +
 .../indexeddataset/IndexedDatasetSpark.scala    |  53 ++++
 .../apache/mahout/sparkbindings/package.scala   |   4 +-
 .../mahout/cf/CooccurrenceAnalysisSuite.scala   | 267 ------------------
 .../mahout/cf/SimilarityAnalysisSuite.scala     | 269 +++++++++++++++++++
 29 files changed, 1273 insertions(+), 1042 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 28214c6..99bc3ba 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -17,6 +17,9 @@
 
 package org.apache.mahout.h2obindings
 
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.indexeddataset.{IndexedDataset, Schema, DefaultIndexedDatasetReadSchema}
+
 import scala.reflect._
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
@@ -112,4 +115,45 @@ object H2OEngine extends DistributedEngine {
   }
 
   implicit def cp2cph2o[K:ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmH2O[K] = drm.asInstanceOf[CheckpointedDrmH2O[K]]
+
+  /** stub class not implemented in H2O */
+  abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int])
+    extends IndexedDataset {}
+
+  /**
+   * Reads an IndexedDatasetH2O from default text delimited files.
+   * @todo unimplemented
+   * @param src a comma separated list of URIs to read from
+   * @param schema how the text file is formatted
+   * @return
+   */
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDatasetH2O = {
+    // should log a warning when this is built, but there is no logger here; can an H2O contributor help with this?
+    println("Warning: unimplemented indexedDatasetDFSRead.")
+    throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read.")
+    null.asInstanceOf[IndexedDatasetH2O]
+  }
+
+  /**
+   * reads an IndexedDatasetH2O from default text delimited files, one element per line
+   * @todo unimplemented
+   * @param src a comma separated list of URIs to read from
+   * @param schema how the text file is formatted
+   * @return
+   */
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDatasetH2O = {
+    // should log a warning when this is built, but there is no logger here; can an H2O contributor help with this?
+    println("Warning: unimplemented indexedDatasetDFSReadElements.")
+    throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read by elements.")
+    null.asInstanceOf[IndexedDatasetH2O]
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
index c06455a..6d7628e 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
@@ -53,4 +53,17 @@ class CheckpointedDrmH2O[K: ClassTag](
   def canHaveMissingRows: Boolean = false
 
   protected[mahout] def partitioningTag: Long = h2odrm.frame.anyVec.group.hashCode
+
+  /** Stub needed to make IndexedDataset part of core, but since drmWrap is not in H2O this is left for someone else */
+  override def newRowCardinality(n: Int): CheckpointedDrm[K] = {
+    throw new UnsupportedOperationException("CheckpointedDrmH2O#newRowCardinality is not implemented.")
+    /* this is the Spark impl
+    assert(n > -1)
+    assert( n >= nrow)
+    val newCheckpointedDrm = drmWrap[K](rdd, n, ncol)
+    newCheckpointedDrm
+    */
+    this
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index b2fea6b..66309d6 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -172,6 +172,12 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
+      
+    <dependency>
+      <groupId>com.github.scopt</groupId>
+      <artifactId>scopt_2.10</artifactId>
+      <version>3.2.0</version>
+    </dependency>
 
     <!-- scala stuff -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
new file mode 100644
index 0000000..8c1f8cf
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Extended by a platform specific version of this class to create a Mahout CLI driver.
+ */
+abstract class MahoutDriver {
+
+
+  implicit protected var mc: DistributedContext = _
+  protected var parser: MahoutOptionParser = _
+
+  var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
+
+  /** Override (optionally) for special cleanup */
+  protected def stop: Unit = {
+    if (!_useExistingContext) mc.close
+  }
+
+  /** This is where you do the work: call start (supplied by the engine-specific subclass) first, then call stop before exiting */
+  protected def process: Unit
+
+  /** Parse command line and call process */
+  def main(args: Array[String]): Unit
+
+}
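
The intended extension pattern, as a minimal hypothetical sketch (not part of this commit): an engine-specific driver supplies main and process, and stop releases the context when done.

    // hypothetical driver showing the extension pattern
    object ExampleDriver extends MahoutDriver {

      override def main(args: Array[String]): Unit = {
        // an engine-specific start method would create mc (the DistributedContext) before this
        process
      }

      override def process: Unit = {
        // ... read inputs using the implicit mc, compute, write outputs ...
        stop
      }
    }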

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
new file mode 100644
index 0000000..c6968b8
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import scopt.OptionParser
+
+import scala.collection.immutable
+
+/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+  * keep both standardized.
+  * @param programName Name displayed in help message, the name by which the driver is invoked.
+  * @note not all options are platform neutral so other platforms can add option parsing here if desired.
+  * */
+class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
+
+  // build options from some standard CLI param groups
+  // Note: always put the driver-specific options last so they can override any previous options!
+  var opts = Map.empty[String, Any]
+
+  override def showUsageOnError = true
+
+  def parseIOOptions(numInputs: Int = 1) = {
+    opts = opts ++ MahoutOptionParser.FileIOOptions
+    note("Input, output options")
+    opt[String]('i', "input") required() action { (x, options) =>
+      options + ("input" -> x)
+    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+
+    if (numInputs == 2) {
+      opt[String]("input2") abbr ("i2") action { (x, options) =>
+        options + ("input2" -> x)
+      } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+    }
+
+    opt[String]('o', "output") required() action { (x, options) =>
+      // todo: check to see if HDFS allows MS-Windows backslashes locally?
+      if (x.endsWith("/")) {
+        options + ("output" -> x)
+      } else {
+        options + ("output" -> (x + "/"))
+      }
+    } text ("Path for output, any local or HDFS supported URI (required)")
+
+  }
+
+  def parseSparkOptions = {
+    opts = opts ++ MahoutOptionParser.SparkOptions
+    note("\nSpark config options:")
+
+    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+      options + ("master" -> x)
+    }
+
+    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
+      options + ("sparkExecutorMem" -> x)
+    }
+
+  }
+
+  def parseGenericOptions = {
+    opts = opts ++ MahoutOptionParser.GenericOptions
+    opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
+      options + ("randomSeed" -> x)
+    } validate { x =>
+      if (x > 0) success else failure("Option --randomSeed must be > 0")
+    }
+
+    //output both input DRMs
+    opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
+      options + ("writeAllDatasets" -> true)
+    }//Hidden option, though a user might want this.
+  }
+
+  def parseElementInputSchemaOptions{
+    //Input text file schema--not driver specific but input data specific, elements input,
+    // not drms
+    opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
+    note("\nInput text file schema options:")
+    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
+      options + ("inDelim" -> x)
+    }
+
+    opt[String]("filter1") abbr ("f1") action { (x, options) =>
+      options + ("filter1" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+
+    opt[String]("filter2") abbr ("f2") action { (x, options) =>
+      options + ("filter2" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+
+    opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
+      options + ("rowIDColumn" -> x)
+    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
+      if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+    }
+
+    opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
+      options + ("itemIDColumn" -> x)
+    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
+      if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+    }
+
+    opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
+      options + ("filterColumn" -> x)
+    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+      if (x >= -1) success else failure("Option --filterColNum must be >= -1")
+    }
+
+    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+    //check for column consistency
+    checkConfig { options: Map[String, Any] =>
+      if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]
+        || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int]
+        || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int])
+        failure("The row, item, and filter positions must be unique.") else success
+    }
+
+    //check for filter consistency
+    checkConfig { options: Map[String, Any] =>
+      if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
+        && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
+        && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
+        failure ("If using filters they must be unique.") else success
+    }
+
+  }
+
+  def parseFileDiscoveryOptions = {
+    //File finding strategy--not driver specific
+    opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
+    note("\nFile discovery options:")
+    opt[Unit]('r', "recursive") action { (_, options) =>
+      options + ("recursive" -> true)
+    } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+
+    opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
+      options + ("filenamePattern" -> x)
+    } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+
+  }
+
+  def parseDrmFormatOptions = {
+    opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
+    note("\nOutput text file schema options:")
+    opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
+      options + ("rowKeyDelim" -> x)
+    } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+    opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
+      options + ("columnIdStrengthDelim" -> x)
+    } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+    opt[String]("elementDelim") abbr ("td") action { (x, options) =>
+      options + ("elementDelim" -> x)
+    } text ("Separates vector element values in the values list (optional). Default: \" \"")
+
+    opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
+      options + ("omitStrength" -> true)
+    } text ("Do not write the strength to the output files (optional), Default: false.")
+    note("This option is used to output indexable data for creating a search engine recommender.")
+
+    note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+  }
+
+}
+
+/** Companion object defines default option groups for reference in any driver that needs them.
+  * @note not all options are platform neutral so other platforms can add default options here if desired */
+object MahoutOptionParser {
+
+  // set up the various default option groups
+  final val GenericOptions = immutable.HashMap[String, Any](
+    "randomSeed" -> System.currentTimeMillis().toInt,
+    "writeAllDatasets" -> false)
+
+  final val SparkOptions = immutable.HashMap[String, Any](
+    "master" -> "local",
+    "sparkExecutorMem" -> "",
+    "appName" -> "Generic Spark App, Change this.")
+
+  final val FileIOOptions = immutable.HashMap[String, Any](
+    "input" -> null.asInstanceOf[String],
+    "input2" -> null.asInstanceOf[String],
+    "output" -> null.asInstanceOf[String])
+
+  final val FileDiscoveryOptions = immutable.HashMap[String, Any](
+    "recursive" -> false,
+    "filenamePattern" -> "^part-.*")
+
+  final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
+    "rowIDColumn" -> 0,
+    "itemIDColumn" -> 1,
+    "filterColumn" -> -1,
+    "filter1" -> null.asInstanceOf[String],
+    "filter2" -> null.asInstanceOf[String],
+    "inDelim" -> "[,\t ]")
+
+  final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+    "rowKeyDelim" -> "\t",
+    "columnIdStrengthDelim" -> ":",
+    "elementDelim" -> " ",
+    "omitStrength" -> false)
+}
+
+
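
A rough usage sketch (option values here are illustrative, not from this commit): the parse* helpers merge the default maps above into opts, and the scopt actions then override individual keys.

    // hypothetical: compose shared option groups, then read back typed values
    val parser = new MahoutOptionParser(programName = "example") {
      parseIOOptions()
      parseFileDiscoveryOptions
      parseDrmFormatOptions
    }

    parser.parse(Array("-i", "in-dir", "-o", "out-dir", "-r"), parser.opts) foreach { opts =>
      val input     = opts("input").asInstanceOf[String]            // "in-dir"
      val recursive = opts("recursive").asInstanceOf[Boolean]       // true
      val pattern   = opts("filenamePattern").asInstanceOf[String]  // default "^part-.*"
    }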

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
index 90d7559..5718304 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -18,6 +18,7 @@
 package org.apache.mahout.math.cf
 
 import org.apache.mahout.math._
+import org.apache.mahout.math.indexeddataset.IndexedDataset
 import scalabindings._
 import RLikeOps._
 import drm._
@@ -49,7 +50,7 @@ object SimilarityAnalysis extends Serializable {
     * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
     * @param maxNumInteractions max number of interactions after downsampling, default: 500
     * @return
-    * */
+    */
   def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
                     maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
 
@@ -99,13 +100,39 @@ object SimilarityAnalysis extends Serializable {
     indicatorMatrices
   }
 
+  /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+    * and returns a list of indicator and cross-indicator matrices.
+    * A somewhat easier to use method, which handles the ID dictionaries correctly.
+    * @param indexedDatasets the first in the array is the primary/A matrix; all others are treated as secondary
+    * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+    * @param maxInterestingItemsPerThing max similarities per item
+    * @param maxNumInteractions max number of input items per item
+    * @return
+    */
+  def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset],
+      randomSeed: Int = 0xdeadbeef,
+      maxInterestingItemsPerThing: Int = 50,
+      maxNumInteractions: Int = 500):
+    List[IndexedDataset] = {
+    val drms = indexedDatasets.map(_.matrix.asInstanceOf[DrmLike[Int]])
+    val primaryDrm = drms(0)
+    val secondaryDrms = drms.drop(1)
+    val coocMatrices = cooccurrences(primaryDrm, randomSeed, maxInterestingItemsPerThing,
+      maxNumInteractions, secondaryDrms)
+    val retIDSs = coocMatrices.iterator.zipWithIndex.map {
+      case( drm, i ) =>
+        indexedDatasets(0).create(drm, indexedDatasets(0).columnIDs, indexedDatasets(i).columnIDs)
+    }
+    retIDSs.toList
+  }
+
   /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows
     * @param drmARaw Primary interaction matrix
     * @param randomSeed when kept to a constant will make repeatable downsampling
     * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
     * @param maxNumInteractions max number of interactions after downsampling, default: 500
     * @return
-    * */
+    */
   def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
                     maxNumInteractions: Int = 500): DrmLike[Int] = {
 
@@ -130,10 +157,27 @@ object SimilarityAnalysis extends Serializable {
     drmSimilaritiesAAt
   }
 
-  /**
-   * Compute loglikelihood ratio
-   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
-   **/
+  /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
+    * Uses IndexedDatasets, which handle external ID dictionaries properly
+    * @param indexedDataset compare each row to every other
+    * @param randomSeed  use default to make repeatable, otherwise pass in system time or some randomizing seed
+    * @param maxInterestingSimilaritiesPerRow max elements returned in each row
+    * @param maxObservationsPerRow max number of input elements to use
+    * @return
+    */
+  def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
+      maxInterestingSimilaritiesPerRow: Int = 50,
+      maxObservationsPerRow: Int = 500):
+    IndexedDataset = {
+    val coocMatrix = rowSimilarity(indexedDataset.matrix, randomSeed, maxInterestingSimilaritiesPerRow,
+      maxObservationsPerRow)
+    indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs)
+  }
+
+  /**
+    * Compute loglikelihood ratio
+    * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+    */
   def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
     numInteractionsWithAandB: Long, numInteractions: Long) = {
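
As a usage sketch for the new IDS entry points (paths are placeholders and an implicit DistributedContext from an engine binding is assumed to be in scope), the external IDs ride along with the results:

    import org.apache.mahout.math.cf.SimilarityAnalysis
    import org.apache.mahout.math.indexeddataset._

    // read primary (A) and secondary (B) element files, sharing A's row-ID dictionary
    val a = indexedDatasetDFSReadElements("hdfs://.../a-elements", DefaultIndexedDatasetElementReadSchema)
    val b = indexedDatasetDFSReadElements("hdfs://.../b-elements", DefaultIndexedDatasetElementReadSchema,
      existingRowIDs = a.rowIDs)

    // first result is the A'A indicator matrix, the second the A'B cross-indicator matrix,
    // both returned as IndexedDatasets with their column dictionaries attached
    val List(indicators, crossIndicators) = SimilarityAnalysis.cooccurrencesIDSs(Array(a, b))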
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 082e5b9..7f97481 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -41,4 +41,7 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
   def keyClassTag: ClassTag[K]
 
 
+  /** changes the number of rows without touching the underlying data */
+  def newRowCardinality(n: Int): CheckpointedDrm[K]
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index eaf5aeb..dd5b101 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -17,6 +17,9 @@
 
 package org.apache.mahout.math.drm
 
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetReadSchema, Schema, IndexedDataset}
+
 import scala.reflect.ClassTag
 import logical._
 import org.apache.mahout.math._
@@ -85,6 +88,28 @@ trait DistributedEngine {
   /** Creates empty DRM with non-trivial height */
   def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
       (implicit sc: DistributedContext): CheckpointedDrm[Long]
+  /**
+   * Load IndexedDataset from text delimited format.
+   * @param src comma delimited URIs to read from
+   * @param schema defines format of file(s)
+   */
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDataset
+
+  /**
+   * Load IndexedDataset from text delimited format, one element per line
+   * @param src comma delimited URIs to read from
+   * @param schema defines format of file(s)
+   */
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDataset
+
 }
 
 object DistributedEngine {

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index b787ec0..3afbecb 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -17,10 +17,13 @@
 
 package org.apache.mahout.math
 
-import scala.reflect.ClassTag
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.indexeddataset.{IndexedDataset, DefaultIndexedDatasetReadSchema, DefaultIndexedDatasetElementReadSchema, Schema}
+import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.decompositions.{DSSVD, DSPCA, DQR}
+
+import scala.reflect.ClassTag
 
 package object drm {
 
@@ -114,3 +117,20 @@ package object drm {
   }
 
 }
+
+package object indexeddataset {
+  /** Load IndexedDataset from text delimited files */
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit ctx: DistributedContext):
+    IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs)
+
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit ctx: DistributedContext):
+    IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs)
+
+}
+
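
A short sketch of the engine-neutral helpers above (an implicit DistributedContext from Spark or another binding is assumed to be in scope; paths are illustrative):

    import org.apache.mahout.math.indexeddataset._

    // read a text delimited DRM with its ID dictionaries, then write it back out
    val ids: IndexedDataset = indexedDatasetDFSRead("hdfs://.../matrix-in")
    ids.dfsWrite("hdfs://.../matrix-out", DefaultIndexedDatasetWriteSchema)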

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
new file mode 100644
index 0000000..c7eb2cb
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import com.google.common.collect.BiMap
+import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
+import org.apache.mahout.math.indexeddataset
+
+/**
+  * Wraps a [[org.apache.mahout.math.drm.CheckpointedDrm]] object with two [[com.google.common.collect.BiMap]]s to store
+  * ID/label translation dictionaries.
+  * The purpose of this trait is to wrap a DrmLike[Int] with bidirectional ID mappings so
+  * a user specified label or ID can be stored and mapped to and from the Mahout Int ID
+  * used internally in Mahout core code.
+  *
+  * @todo Often there is no need for both, or perhaps either, dictionary, so save resources by allowing
+  *       them to be skipped when not needed.
+  */
+
+trait IndexedDataset {
+  val matrix: CheckpointedDrm[Int]
+  val rowIDs: BiMap[String,Int]
+  val columnIDs: BiMap[String,Int]
+
+  /**
+   * Write text delimited file(s) with the row and column IDs from the dictionaries.
+   * @param dest URI to write the output to
+   * @param schema defines the format of the output text
+   */
+  def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit
+
+  /** Factory method, creates the extending class */
+  def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
+    IndexedDataset
+
+  /**
+   * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
+   * No changes are made to the underlying drm.
+   * @param n number to use for new row cardinality, should be larger than current
+   * @note should be done before any optimizer actions are performed on the matrix or you'll get unpredictable
+   *       results.
+   */
+  def newRowCardinality(n: Int): IndexedDataset = {
+    // n is validated in matrix
+    this.create(matrix.newRowCardinality(n), rowIDs, columnIDs)
+  }
+
+}
+
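
As a brief sketch of why newRowCardinality matters (datasetA and datasetB are hypothetical IndexedDatasets sharing one row-ID dictionary): after reading both, the smaller matrix is padded so the row cardinalities agree before any math is done.

    // only the nrow value changes; no rows are actually added to the data
    val totalRows = math.max(datasetA.matrix.nrow, datasetB.matrix.nrow).toInt
    val alignedB  = datasetB.newRowCardinality(totalRows)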

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
new file mode 100644
index 0000000..cf429f5
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import com.google.common.collect.{BiMap, HashBiMap}
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait,
+  * which also defines the type to be read.
+  * @tparam T type of object to read.
+  */
+trait Reader[T]{
+
+  val mc: DistributedContext
+  val readSchema: Schema
+
+  protected def elementReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int]): T
+
+  protected def drmReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int]): T
+
+  def readElementsFrom(
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+    elementReader(mc, readSchema, source, existingRowIDs)
+
+  def readDRMFrom(
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+    drmReader(mc, readSchema, source, existingRowIDs)
+}
+
+/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
+  * which also defines the type to be written.
+  * @tparam T type of object to write.
+  */
+trait Writer[T]{
+
+  val mc: DistributedContext
+  val sort: Boolean
+  val writeSchema: Schema
+
+  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
+
+  def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
+}
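
To illustrate the split (a toy sketch, none of these classes are part of this commit): a concrete Writer only supplies the writer method plus its schema and context, and callers then use writeTo.

    import org.apache.mahout.math.drm.DistributedContext

    // toy Writer that prints each (rowID, columnID) pair using the schema's row key delimiter
    class ConsolePairWriter(val mc: DistributedContext, val writeSchema: Schema, val sort: Boolean = false)
      extends Writer[Seq[(String, String)]] {

      protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String,
          collection: Seq[(String, String)], sort: Boolean): Unit = {
        val delim = writeSchema("rowKeyDelim").asInstanceOf[String]
        collection.foreach { case (row, col) => println(s"$row$delim$col") }
      }
    }

    // usage: new ConsolePairWriter(mc, DefaultIndexedDatasetWriteSchema).writeTo(Seq(("u1", "i1")), dest = "unused")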

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
new file mode 100644
index 0000000..557b419
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import scala.collection.mutable.HashMap
+
+/** Syntactic sugar for mutable.HashMap[String, Any]
+  *
+  * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+  */
+class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
+  // note: this requires a mutable HashMap, do we care?
+  this ++= params
+
+  /** Constructor for copying an existing Schema
+    *
+    * @param schemaToClone the Schema to copy
+    */
+  def this(schemaToClone: Schema){
+    this()
+    this ++= schemaToClone
+  }
+}
+
+/** These can be used to keep text input and output fairly standard for Mahout, where an application specific
+  * format is not required. They apply to the formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]],
+  * which can be used to create a Mahout DRM for DSL ops.
+  */
+
+
+/** Simple default Schema for typical text delimited element file input.
+  * This tells the reader to input elements of the default form:
+  * (rowID<comma, tab, or space>columnID<comma, tab, or space>here may be other ignored text...)
+  */
+final object DefaultIndexedDatasetElementReadSchema extends Schema(
+  "delim" -> "[,\t ]", //comma, tab or space
+  "filter" -> "",
+  "rowIDColumn" -> 0,
+  "columnIDPosition" -> 1,
+  "filterColumn" -> -1)
+
+/** Default Schema for text delimited drm file output
+  * This tells the writer to write a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] of the default form:
+  * (rowID<tab>columnID1:score1<space>columnID2:score2...)
+  */
+final object DefaultIndexedDatasetWriteSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "elementDelim" -> " ",
+  "omitScore" -> false)
+
+/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file input
+  * This tells the reader to input text lines of the form:
+  * (rowID<tab>columnID1:score1,columnID2:score2,...)
+  */
+final object DefaultIndexedDatasetReadSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "elementDelim" -> " ")
+
+/** Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where
+  * the score of any element is ignored and all non-zeros are replaced with 1.
+  * This tells the reader to input DRM lines of the form:
+  * (rowID<tab>columnID1:score1<space>columnID2:score2...) remembering that the score is ignored.
+  * Alternatively the format can be
+  * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+  * output format for [[IndexedDatasetWriteBooleanSchema]]
+  */
+final object IndexedDatasetReadBooleanSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "elementDelim" -> " ",
+  "omitScore" -> true)
+
+/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file write where
+  * the score of an element is omitted.
+  * The presence of an element means the score = 1, the absence means a score of 0.
+  * This tells the writer to output [[org.apache.mahout.math.indexeddataset.IndexedDataset]] lines of the form
+  * (rowID<tab>columnID1<space>columnID2...)
+  */
+final object IndexedDatasetWriteBooleanSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "elementDelim" -> " ",
+  "omitScore" -> true)
+
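
A small sketch of how drivers typically build these (values are illustrative): copy a default and override single keys, or construct one explicitly.

    // narrow the default element-read delimiter to tabs only
    val tabOnlyElements = new Schema(DefaultIndexedDatasetElementReadSchema) += ("delim" -> "\t")

    // or spell one out, mirroring the defaults above
    val booleanWrite = new Schema(
      "rowKeyDelim" -> "\t",
      "columnIdStrengthDelim" -> ":",
      "elementDelim" -> " ",
      "omitScore" -> true)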

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
new file mode 100644
index 0000000..42bf697
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
+/**
+ * Returns a [[java.lang.String]], which is a comma delimited list of URIs discovered based on parameters
+ * in the constructor.
+ * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
+ *
+ * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
+ * @param filePattern regex that must match the entire filename to have the file returned
+ * @param recursive true traverses the filesystem recursively, default = false
+ */
+
+case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
+
+  val conf = new Configuration()
+  val fs = FileSystem.get(conf)
+
+  /** Returns a string of comma delimited URIs matching the filePattern.
+    * When pattern matching, dirs are never returned, only traversed. */
+  def uris: String = {
+    if (!filePattern.isEmpty) { // have a file pattern, so search for matching files
+      val pathURIs = pathURI.split(",")
+      var files = ""
+      for ( uri <- pathURIs ){
+        files = findFiles(uri, filePattern, files)
+      }
+      if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
+      files
+    }else{
+      pathURI
+    }
+  }
+
+  /** Find matching files in the dir, recursively call self when another directory is found
+    * Only files are matched, directories are traversed but never return a match */
+  private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
+    val seed = fs.getFileStatus(new Path(dir))
+    var f: String = files
+
+    if (seed.isDir) {
+      val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
+      for (fileStatus <- fileStatuses) {
+        if (fileStatus.getPath().getName().matches(filePattern)
+          && !fileStatus.isDir) {
+          // found a file
+          if (fileStatus.getLen() != 0) {
+            // file is not empty
+            f = f + fileStatus.getPath.toUri.toString + ","
+          }
+        } else if (fileStatus.isDir && recursive) {
+          f = findFiles(fileStatus.getPath.toString, filePattern, f)
+        }
+      }
+    }else{ f = dir }// was a filename not dir
+    f
+  }
+
+}
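
For example (the path is illustrative), this turns a directory and a filename pattern into the comma delimited URI string that SparkContext.textFile accepts:

    // collect every non-empty file matching ^part-.* under the input dir, recursing into subdirs
    val inFiles: String =
      HDFSPathSearch("hdfs://namenode/data/interactions", "^part-.*", recursive = true).uris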

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
deleted file mode 100644
index f48e9ed..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
-
-/**
- * Returns a [[java.lang.String]]comma delimited list of URIs discovered based on parameters in the constructor.
- * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
- *
- * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
- * @param filePattern regex that must match the entire filename to have the file returned
- * @param recursive true traverses the filesystem recursively, default = false
- */
-
-case class FileSysUtils(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
-
-  val conf = new Configuration()
-  val fs = FileSystem.get(conf)
-
-/** Returns a string of comma delimited URIs matching the filePattern
-  * When pattern matching dirs are never returned, only traversed. */
-  def uris :String = {
-    if (!filePattern.isEmpty){ // have file pattern so
-    val pathURIs = pathURI.split(",")
-      var files = ""
-      for ( uri <- pathURIs ){
-        files = findFiles(uri, filePattern, files)
-      }
-      if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
-      files
-    }else{
-      pathURI
-    }
-  }
-
-/** Find matching files in the dir, recursively call self when another directory is found
-  * Only files are matched, directories are traversed but never return a match */
-  private def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = {
-    val seed = fs.getFileStatus(new Path(dir))
-    var f :String = files
-
-    if (seed.isDir) {
-      val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
-      for (fileStatus <- fileStatuses) {
-        if (fileStatus.getPath().getName().matches(filePattern)
-          && !fileStatus.isDir) {
-          // found a file
-          if (fileStatus.getLen() != 0) {
-            // file is not empty
-            f = f + fileStatus.getPath.toUri.toString + ","
-          }
-        } else if (fileStatus.isDir && recursive) {
-          f = findFiles(fileStatus.getPath.toString, filePattern, f)
-        }
-      }
-    }else{ f = dir }// was a filename not dir
-    f
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
deleted file mode 100644
index 99f98f5..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import com.google.common.collect.BiMap
-import org.apache.mahout.math.drm.{DrmLike, CheckpointedDrm}
-import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
-import org.apache.mahout.sparkbindings._
-
-/**
-  * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries.
-  * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so
-  * a user specified label or ID can be stored and mapped to and from the [[scala.Int]] ordinal ID
-  * used internal to Mahout Core code.
-  *
-  * Example: For a transpose job the [[org.apache.mahout.drivers.IndexedDataset#matrix]]: [[org.apache.mahout.sparkbindings.drm.DrmLike]] is passed into the DSL code
-  * that transposes the values, then a resulting [[org.apache.mahout.drivers.IndexedDataset]] is created from the transposed DrmLike object with swapped dictionaries (since the rows and columns are transposed). The new
-  * [[org.apache.mahout.drivers.IndexedDataset]] is returned.
-  *
-  * @param matrix  DrmLike[Int], representing the distributed matrix storing the actual data.
-  * @param rowIDs BiMap[String, Int] storing a bidirectional mapping of external String ID to
-  *                  and from the ordinal Mahout Int ID. This one holds row labels
-  * @param columnIDs BiMap[String, Int] storing a bidirectional mapping of external String
-  *                  ID to and from the ordinal Mahout Int ID. This one holds column labels
-  * @todo Often no need for both or perhaps either dictionary, so save resources by allowing
-  *       to be not created when not needed.
-  */
-
-case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
-
-  // we must allow the row dimension to be adjusted in the case where the data read in is incomplete and we
-  // learn this afterwards
-
-  /**
-   * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
-   * No physical changes are made to the underlying drm.
-   * @param n number to use for row carnindality, should be larger than current
-   * @note should be done before any BLAS optimizer actions are performed on the matrix or you'll get unpredictable
-   *       results.
-   */
-  def newRowCardinality(n: Int): IndexedDataset = {
-    assert(n > -1)
-    assert( n >= matrix.nrow)
-    val drmRdd = matrix.asInstanceOf[CheckpointedDrmSpark[Int]].rdd
-    val ncol = matrix.ncol
-    val newMatrix = drmWrap[Int](drmRdd, n, ncol)
-    new IndexedDataset(newMatrix, rowIDs, columnIDs)
-  }
-
-}
-
-/**
-  * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary
-  * constructor for
-  * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like
-  * [[org.apache.mahout.drivers.Reader]]
-  * {{{
-  *   val indexedDataset = IndexedDataset(indexedDatasetReader.readElementsFrom(source))
-  * }}}
-  */
-
-object IndexedDataset {
-  /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
-  def apply(id2: IndexedDataset) = new IndexedDataset(id2.matrix,  id2.rowIDs, id2.columnIDs)
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 0b8ded6..42e9d81 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -17,15 +17,19 @@
 
 package org.apache.mahout.drivers
 
+import org.apache.mahout.common.HDFSPathSearch
 import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSReadElements}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scala.collection.immutable.HashMap
 
 /**
- * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]].
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]].
  * Reads text lines
  * that contain (row id, column id, ...). The IDs are user specified strings which will be
  * preserved in the
- * output. The individual elements will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]
+ * output. The individual elements will be accumulated into a matrix like [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+ * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]
  * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
  * matrices and calculate both the self similarity of the primary matrix and the row-wise
  * similarity of the primary
@@ -40,17 +44,16 @@ import scala.collection.immutable.HashMap
  * @note To use with a Spark cluster see the --master option, if you run out of heap space check
  *       the --sparkExecutorMemory option.
  */
-object ItemSimilarityDriver extends MahoutDriver {
+object ItemSimilarityDriver extends MahoutSparkDriver {
   // define only the options specific to ItemSimilarity
   private final val ItemSimilarityOptions = HashMap[String, Any](
     "maxPrefs" -> 500,
     "maxSimilaritiesPerItem" -> 100,
     "appName" -> "ItemSimilarityDriver")
 
-  private var reader1: TextDelimitedIndexedDatasetReader = _
-  private var reader2: TextDelimitedIndexedDatasetReader = _
-  private var writer: TextDelimitedIndexedDatasetWriter = _
   private var writeSchema: Schema = _
+  private var readSchema1: Schema = _
+  private var readSchema2: Schema = _
 
   /**
    * @param args  Command line args, if empty a help message is printed.
@@ -119,27 +122,27 @@ object ItemSimilarityDriver extends MahoutDriver {
 
     // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
     // MahoutKryoRegistrator, it should be added to the register list here so it
-    // will be only spcific to this job.
+    // will be only specific to this job.
     sparkConf.set("spark.kryo.referenceTracking", "false")
-      .set("spark.kryoserializer.buffer.mb", "200")
-      .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+      .set("spark.kryoserializer.buffer.mb", "200")// todo: should this be left to config or an option?
+
+    if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
+      sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+    //else leave as set in Spark config
 
     super.start(masterUrl, appName)
 
-    val readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
+    readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
       "filter" -> parser.opts("filter1").asInstanceOf[String],
       "rowIDColumn" -> parser.opts("rowIDColumn").asInstanceOf[Int],
       "columnIDPosition" -> parser.opts("itemIDColumn").asInstanceOf[Int],
       "filterColumn" -> parser.opts("filterColumn").asInstanceOf[Int])
 
-    reader1 = new TextDelimitedIndexedDatasetReader(readSchema1)
-
     if ((parser.opts("filterColumn").asInstanceOf[Int] != -1 && parser.opts("filter2").asInstanceOf[String] != null)
       || (parser.opts("input2").asInstanceOf[String] != null && !parser.opts("input2").asInstanceOf[String].isEmpty )){
       // only need to change the filter used compared to readSchema1
-      val readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String])
+      readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String])
 
-      reader2 = new TextDelimitedIndexedDatasetReader(readSchema2)
     }
 
     writeSchema = new Schema(
@@ -147,36 +150,34 @@ object ItemSimilarityDriver extends MahoutDriver {
       "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
       "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
       "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
-
-    writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
-
-  }
+  }
 
   private def readIndexedDatasets: Array[IndexedDataset] = {
 
-    val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
       parser.opts("recursive").asInstanceOf[Boolean]).uris
     val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) ""
-    else FileSysUtils(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+    else HDFSPathSearch(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
       parser.opts("recursive").asInstanceOf[Boolean]).uris
 
     if (inFiles.isEmpty) {
       Array()
     } else {
 
-      val datasetA = IndexedDataset(reader1.readElementsFrom(inFiles))
-      if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetA,
-        parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
+      val datasetA = indexedDatasetDFSReadElements(inFiles, readSchema1)
+      if (parser.opts("writeAllDatasets").asInstanceOf[Boolean])
+        datasetA.dfsWrite(parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions",
+          schema = writeSchema)
 
-      // The case of readng B can be a bit tricky when the exact same row IDs don't exist for A and B
+      // The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B
       // Here we assume there is one row ID space for all interactions. To do this we calculate the
       // row cardinality only after reading in A and B (or potentially C...) We then adjust the
       // cardinality so all match, which is required for the math to work.
       // Note: this may leave blank rows with no representation in any DRM. Blank rows need to
-      // be supported (and are at least on Spark) or the row cardinality fix will not work.
+      // be supported (and are at least on Spark) or the row cardinality adjustment will not work.
       val datasetB = if (!inFiles2.isEmpty) {
         // get cross-cooccurrence interactions from separate files
-        val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
+        val datasetB = indexedDatasetDFSReadElements(inFiles2, readSchema2, existingRowIDs = datasetA.rowIDs)
 
         datasetB
 
@@ -184,12 +185,12 @@ object ItemSimilarityDriver extends MahoutDriver {
         && parser.opts("filter2").asInstanceOf[String] != null) {
 
         // get cross-cooccurrence interactions by using two filters on a single set of files
-        val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles, existingRowIDs = datasetA.rowIDs))
+        val datasetB = indexedDatasetDFSReadElements(inFiles, readSchema2, existingRowIDs = datasetA.rowIDs)
 
         datasetB
 
       } else {
-        null.asInstanceOf[IndexedDataset]
+        null.asInstanceOf[IndexedDatasetSpark]
       }
       if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc
       // true row cardinality is the size of the row id index, which was calculated from all rows of A and B
@@ -203,8 +204,9 @@ object ItemSimilarityDriver extends MahoutDriver {
         val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality)
         else datasetB // this guarantees matching cardinality
 
-        if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetB, parser.opts("output") + "../input-datasets/secondary-interactions")
-
+        if (parser.opts("writeAllDatasets").asInstanceOf[Boolean])
+          datasetB.dfsWrite(parser.opts("output").asInstanceOf[String] + "../input-datasets/secondary-interactions",
+            schema = writeSchema)
         Array(returnedA, returnedB)
       } else Array(datasetA)
     }
@@ -214,31 +216,13 @@ object ItemSimilarityDriver extends MahoutDriver {
     start()
 
     val indexedDatasets = readIndexedDatasets
+    val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int],
+      parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
 
     // todo: allow more than one cross-similarity matrix?
-    val indicatorMatrices = {
-      if (indexedDatasets.length > 1) {
-        SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
-          parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int],
-          Array(indexedDatasets(1).matrix))
-      } else {
-        SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
-          parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
-      }
-    }
-
-    // an alternative is to create a version of IndexedDataset that knows how to write itself
-    val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
-      indexedDatasets(0).columnIDs, writeSchema)
-    selfIndicatorDataset.writeTo(dest = parser.opts("output").asInstanceOf[String] + "indicator-matrix")
-
-    // todo: would be nice to support more than one cross-similarity indicator
-    if (indexedDatasets.length > 1) {
-
-      val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
-      writer.writeTo(crossIndicatorDataset, parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix")
-
-    }
+    idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "indicator-matrix", schema = writeSchema)
+    if (idss.length > 1)
+      idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix", schema = writeSchema)
 
     stop
   }

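To make the refactored flow above concrete: the driver now reads tuple text into an IndexedDataset with indexedDatasetDFSReadElements, runs SimilarityAnalysis.cooccurrencesIDSs on the resulting array, and writes each returned dataset with dfsWrite. Below is a minimal Scala sketch of that flow, assuming an implicit Spark-backed DistributedContext is already in scope; the paths, schema values, and numeric arguments are illustrative, not taken from this commit.

import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSReadElements}

// input schema for (rowID, itemID, ...) text elements; keys mirror readSchema1 above, values are illustrative
val readSchema = new Schema(
  "delim" -> "[,\t ]",
  "filter" -> "",
  "rowIDColumn" -> 0,
  "columnIDPosition" -> 1,
  "filterColumn" -> -1)

// output schema for the text-delimited indicator matrix; keys mirror writeSchema above
val writeSchema = new Schema(
  "rowKeyDelim" -> "\t",
  "columnIdStrengthDelim" -> ":",
  "omitScore" -> false,
  "elementDelim" -> " ")

// read the elements, preserving the string row/column IDs in the dataset's dictionaries
val datasetA: IndexedDataset = indexedDatasetDFSReadElements("hdfs://some/input/", readSchema)

// row-wise self-similarity; the Int args are randomSeed, maxSimilaritiesPerItem, maxPrefs, as in the driver
val idss = SimilarityAnalysis.cooccurrencesIDSs(Array(datasetA), 1234, 100, 500)

// the returned IndexedDatasets know how to write themselves
idss(0).dfsWrite("hdfs://some/output/indicator-matrix", schema = writeSchema)

Letting the IndexedDataset write itself replaces the separate TextDelimitedIndexedDatasetWriter the driver used before this refactor.
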
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
deleted file mode 100644
index 6ea7c8b..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.spark.SparkConf
-import org.apache.mahout.sparkbindings._
-
-import scala.collection.immutable
-
-/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
-  * Also define a Map of options for the command line parser. The following template may help:
-  * {{{
-  * object SomeDriver extends MahoutDriver {
-  *
-  *   // define only the options specific to this driver, inherit the generic ones
-  *   private final val SomeOptions = HashMap[String, Any](
-  *       "maxThings" -> 500,
-  *       "minThings" -> 100,
-  *       "appName" -> "SomeDriver")
-  *
-  *   override def main(args: Array[String]): Unit = {
-  *
-  *
-  *     val parser = new MahoutOptionParser(programName = "shortname") {
-  *       head("somedriver", "Mahout 1.0-SNAPSHOT")
-  *
-  *       // Input output options, non-driver specific
-  *       parseIOOptions
-  *
-  *       // Algorithm specific options
-  *       // Add in the new options
-  *       opts = opts ++ SomeOptions
-  *       note("\nAlgorithm control options:")
-  *       opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
-  *         options + ("maxThings" -> x) ...
-  *     }
-  *     parser.parse(args, parser.opts) map { opts =>
-  *       parser.opts = opts
-  *       process
-  *     }
-  *   }
-  *
-  *   override def process: Unit = {
-  *     start()
-  *     // do the work here
-  *     stop
-  *   }
-  *
-  * }}}
-  */
-abstract class MahoutDriver {
-
-
-  implicit protected var mc: DistributedContext = _
-  implicit protected var sparkConf = new SparkConf()
-  protected var parser: MahoutOptionParser = _
-
-  var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
-
-  /** Creates a Spark context to run the job inside.
-    * Override to set the SparkConf values specific to the job,
-    * these must be set before the context is created.
-    * @param masterUrl Spark master URL
-    * @param appName  Name to display in Spark UI
-    * */
-  protected def start(masterUrl: String, appName: String) : Unit = {
-    if (!_useExistingContext) {
-      mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
-    }
-  }
-
-  /** Override (optionally) for special cleanup */
-  protected def stop: Unit = {
-    if (!_useExistingContext) mc.close
-  }
-
-  /** This is where you do the work, call start first, then before exiting call stop */
-  protected def process: Unit
-
-  /** Parse command line and call process */
-  def main(args: Array[String]): Unit
-
-  def useContext(context: DistributedContext): Unit = {
-    _useExistingContext = true
-    mc = context
-    sparkConf = mc.getConf
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
deleted file mode 100644
index ad7a76b..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mahout.drivers
-
-import scopt.OptionParser
-import scala.collection.immutable
-
-/** Companion object defines default option groups for reference in any driver that needs them */
-object MahoutOptionParser {
-  // set up the various default option groups
-  final val GenericOptions = immutable.HashMap[String, Any](
-    "randomSeed" -> System.currentTimeMillis().toInt,
-    "writeAllDatasets" -> false)
-
-  final val SparkOptions = immutable.HashMap[String, Any](
-    "master" -> "local",
-    "sparkExecutorMem" -> "2g",
-    "appName" -> "Generic Spark App, Change this.")
-
-  final val FileIOOptions = immutable.HashMap[String, Any](
-    "input" -> null.asInstanceOf[String],
-    "input2" -> null.asInstanceOf[String],
-    "output" -> null.asInstanceOf[String])
-
-  final val FileDiscoveryOptions = immutable.HashMap[String, Any](
-    "recursive" -> false,
-    "filenamePattern" -> "^part-.*")
-
-  final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
-    "rowIDColumn" -> 0,
-    "itemIDColumn" -> 1,
-    "filterColumn" -> -1,
-    "filter1" -> null.asInstanceOf[String],
-    "filter2" -> null.asInstanceOf[String],
-    "inDelim" -> "[,\t ]")
-
-  final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
-    "rowKeyDelim" -> "\t",
-    "columnIdStrengthDelim" -> ":",
-    "elementDelim" -> " ",
-    "omitStrength" -> false)
-}
-/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
-  * keep both standarized.
-  * @param programName Name displayed in help message, the name by which the driver is invoked.
-  * */
-class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
-
-  // build options from some stardard CLI param groups
-  // Note: always put the driver specific options at the last so the can override and previous options!
-  var opts = Map.empty[String, Any]
-
-  override def showUsageOnError = true
-
-  def parseIOOptions(numInputs: Int = 1) = {
-    opts = opts ++ MahoutOptionParser.FileIOOptions
-    note("Input, output options")
-    opt[String]('i', "input") required() action { (x, options) =>
-      options + ("input" -> x)
-    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
-
-    if (numInputs == 2) {
-      opt[String]("input2") abbr ("i2") action { (x, options) =>
-        options + ("input2" -> x)
-      } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
-    }
-
-    opt[String]('o', "output") required() action { (x, options) =>
-      // todo: check to see if HDFS allows MS-Windows backslashes locally?
-      if (x.endsWith("/")) {
-        options + ("output" -> x)
-      } else {
-        options + ("output" -> (x + "/"))
-      }
-    } text ("Path for output, any local or HDFS supported URI (required)")
-
-  }
-
-  def parseSparkOptions = {
-    opts = opts ++ MahoutOptionParser.SparkOptions
-    note("\nSpark config options:")
-
-    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
-      options + ("master" -> x)
-    }
-
-    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g") action { (x, options) =>
-      options + ("sparkExecutorMem" -> x)
-    }
-
-  }
-
-  def parseGenericOptions = {
-    opts = opts ++ MahoutOptionParser.GenericOptions
-    opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
-      options + ("randomSeed" -> x)
-    } validate { x =>
-      if (x > 0) success else failure("Option --randomSeed must be > 0")
-    }
-
-    //output both input DRMs
-    opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
-      options + ("writeAllDatasets" -> true)
-    }//Hidden option, though a user might want this.
-  }
-
-  def parseElementInputSchemaOptions{
-    //Input text file schema--not driver specific but input data specific, elements input,
-    // not drms
-    opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
-    note("\nInput text file schema options:")
-    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
-      options + ("inDelim" -> x)
-    }
-
-    opt[String]("filter1") abbr ("f1") action { (x, options) =>
-      options + ("filter1" -> x)
-    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
-
-    opt[String]("filter2") abbr ("f2") action { (x, options) =>
-      options + ("filter2" -> x)
-    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
-
-    opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
-      options + ("rowIDColumn" -> x)
-    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
-      if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
-    }
-
-    opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
-      options + ("itemIDColumn" -> x)
-    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
-      if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
-    }
-
-    opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
-      options + ("filterColumn" -> x)
-    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
-      if (x >= -1) success else failure("Option --filterColNum must be >= -1")
-    }
-
-    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
-
-    checkConfig { options: Map[String, Any] =>
-      if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]
-        || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int]
-        || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int])
-        failure("The row, item, and filter positions must be unique.") else success
-    }
-
-    //check for option consistency, probably driver specific
-    checkConfig { options: Map[String, Any] =>
-      if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
-        && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
-        && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
-        failure ("If using filters they must be unique.") else success
-    }
-
-  }
-
-  def parseFileDiscoveryOptions = {
-    //File finding strategy--not driver specific
-    opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
-    note("\nFile discovery options:")
-    opt[Unit]('r', "recursive") action { (_, options) =>
-      options + ("recursive" -> true)
-    } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
-
-    opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
-      options + ("filenamePattern" -> x)
-    } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
-
-  }
-
-  def parseDrmFormatOptions = {
-    opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
-    note("\nOutput text file schema options:")
-    opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
-      options + ("rowKeyDelim" -> x)
-    } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
-
-    opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
-      options + ("columnIdStrengthDelim" -> x)
-    } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
-
-    opt[String]("elementDelim") abbr ("td") action { (x, options) =>
-      options + ("elementDelim" -> x)
-    } text ("Separates vector element values in the values list (optional). Default: \" \"")
-
-    opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
-      options + ("omitStrength" -> true)
-    } text ("Do not write the strength to the output files (optional), Default: false.")
-    note("This option is used to output indexable data for creating a search engine recommender.")
-
-    note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
-  }
-
-}
-
-

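The parse helpers deleted here are designed to be composed inside a driver's main, with driver-specific options appended last so they can override the defaults. A minimal sketch of that composition, assuming it sits inside a driver object (extending the Spark driver base) where args and process are defined; the program name is illustrative.

override def main(args: Array[String]): Unit = {

  val parser = new MahoutOptionParser(programName = "spark-somedriver") {
    head("spark-somedriver", "Mahout 1.0-SNAPSHOT")

    // non-driver-specific option groups; driver-specific opts would be appended after these
    parseIOOptions(numInputs = 2)   // --input, --input2, --output
    parseSparkOptions               // --master, --sparkExecutorMem
    parseFileDiscoveryOptions       // --recursive, --filenamePattern
    parseElementInputSchemaOptions  // --inDelim, --filter1/--filter2, column positions
    parseDrmFormatOptions           // output text-DRM delimiters, --omitStrength
    parseGenericOptions             // --randomSeed, --writeAllDatasets
  }

  parser.parse(args, parser.opts) map { opts =>
    parser.opts = opts
    process
  }
}
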
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
new file mode 100644
index 0000000..5a4254b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.spark.SparkConf
+import org.apache.mahout.sparkbindings._
+
+/** Extend this class to create a Spark-backed Mahout CLI driver. Minimally you must override process and main.
+  * Also define a Map of options for the command line parser. The following template may help:
+  * {{{
+  * object SomeDriver extends MahoutSparkDriver {
+  *
+  *   // define only the options specific to this driver, inherit the generic ones
+  *   private final val SomeOptions = HashMap[String, Any](
+  *       "maxThings" -> 500,
+  *       "minThings" -> 100,
+  *       "appName" -> "SomeDriver")
+  *
+  *   override def main(args: Array[String]): Unit = {
+  *
+  *
+  *     val parser = new MahoutOptionParser(programName = "shortname") {
+  *       head("somedriver", "Mahout 1.0-SNAPSHOT")
+  *
+  *       // Input output options, non-driver specific
+  *       parseIOOptions
+  *
+  *       // Algorithm specific options
+  *       // Add in the new options
+  *       opts = opts ++ SomeOptions
+  *       note("\nAlgorithm control options:")
+  *       opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
+  *         options + ("maxThings" -> x) ...
+  *     }
+  *     parser.parse(args, parser.opts) map { opts =>
+  *       parser.opts = opts
+  *       process
+  *     }
+  *   }
+  *
+  *   override def process: Unit = {
+  *     start()
+  *     // do the work here
+  *     stop
+  *   }
+  *
+  * }}}
+  */
+abstract class MahoutSparkDriver extends MahoutDriver {
+
+
+  implicit protected var sparkConf = new SparkConf()
+
+  /** Creates a Spark context to run the job inside.
+    * Override to set the SparkConf values specific to the job;
+    * these must be set before the context is created.
+    * @param masterUrl Spark master URL
+    * @param appName  Name to display in Spark UI
+    * */
+  protected def start(masterUrl: String, appName: String): Unit = {
+    if (!_useExistingContext) {
+      mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
+    }
+  }
+
+  def useContext(context: DistributedContext): Unit = {
+    _useExistingContext = true
+    mc = context
+    sparkConf = mc.getConf
+  }
+}

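The useContext hook above exists so a test suite can run several drivers against one shared context, since start then skips creating a new one. A brief sketch, assuming SomeDriver extends MahoutSparkDriver and existingDistributedContext was created elsewhere by the test harness (both names are illustrative):

// reuse the suite's context; this sets _useExistingContext and adopts its SparkConf
SomeDriver.useContext(existingDistributedContext)

// later driver runs execute inside the shared context instead of creating their own
SomeDriver.main(Array(
  "--input",  "tmp/input/",
  "--output", "tmp/output/",
  "--master", "local[2]"))
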
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
deleted file mode 100644
index 6351e45..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import com.google.common.collect.{HashBiMap, BiMap}
-import org.apache.mahout.math.drm.DistributedContext
-
-/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait, which also defines the type to be read.
-  * @tparam T type of object read.
-  */
-trait Reader[T]{
-
-  val mc: DistributedContext
-  val readSchema: Schema
-
-  protected def elementReader(
-      mc: DistributedContext,
-      readSchema: Schema,
-      source: String,
-      existingRowIDs: BiMap[String, Int]): T
-
-  protected def drmReader(
-      mc: DistributedContext,
-      readSchema: Schema,
-      source: String,
-      existingRowIDs: BiMap[String, Int]): T
-
-  def readElementsFrom(
-      source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
-    elementReader(mc, readSchema, source, existingRowIDs)
-
-  def readDRMFrom(
-      source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
-    drmReader(mc, readSchema, source, existingRowIDs)
-}
-
-/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
-  * @tparam T type of object to write.
-  */
-trait Writer[T]{
-
-  val mc: DistributedContext
-  val sort: Boolean
-  val writeSchema: Schema
-
-  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
-
-  def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
-}