Posted to commits@mahout.apache.org by pa...@apache.org on 2014/10/14 23:45:59 UTC

[1/2] NOJIRA refactor IndexedDataset and CLI drivers into core and engine specific parts, closes apache/mahout#59

Repository: mahout
Updated Branches:
  refs/heads/master 4e6577d14 -> 666d314fb


http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 920c32b..f8abef3 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -17,15 +17,18 @@
 
 package org.apache.mahout.drivers
 
+import org.apache.mahout.common.HDFSPathSearch
 import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSRead}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scala.collection.immutable.HashMap
 
 /**
- * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarity( )]].
- * Reads a text delimited file containing a Mahout DRM of the form
- * (row id, column id: strength, ...). The IDs are user specified strings which will be
- * preserved in the
- * output. The rows define a matrix and [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarity( )]]
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis.rowSimilarityIDS( )]].
+ * Reads a text delimited file containing rows of a [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+ * with domain-specific IDs of the form
+ * (row id, column id: strength, ...). The IDs will be preserved in the
+ * output. The rows define a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis.rowSimilarityIDS( )]]
  * will be used to calculate row-wise similarity using log-likelihood.
  * The options allow control of the input schema, file discovery, output schema, and control of
  * algorithm parameters.
@@ -36,14 +39,13 @@ import scala.collection.immutable.HashMap
  * @note To use with a Spark cluster see the --master option, if you run out of heap space check
  *       the --sparkExecutorMemory option.
  */
-object RowSimilarityDriver extends MahoutDriver {
+object RowSimilarityDriver extends MahoutSparkDriver {
   // define only the options specific to RowSimilarity
   private final val RowSimilarityOptions = HashMap[String, Any](
     "maxObservations" -> 500,
     "maxSimilaritiesPerRow" -> 100,
     "appName" -> "RowSimilarityDriver")
 
-  private var readerWriter: TextDelimitedIndexedDatasetReaderWriter = _
   private var readWriteSchema: Schema = _
 
   /**
@@ -110,10 +112,13 @@ object RowSimilarityDriver extends MahoutDriver {
 
     // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
     // MahoutKryoRegistrator, it should be added to the register list here so it
-    // will be only spcific to this job.
+    // will be only specific to this job.
     sparkConf.set("spark.kryo.referenceTracking", "false")
-      .set("spark.kryoserializer.buffer.mb", "200")
-      .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+      .set("spark.kryoserializer.buffer.mb", "200") // todo: should we take this out?
+
+    if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
+      sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+    // else leave as set in the Spark config
 
     super.start(masterUrl, appName)
 
@@ -123,20 +128,18 @@ object RowSimilarityDriver extends MahoutDriver {
       "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
       "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
 
-    readerWriter = new TextDelimitedIndexedDatasetReaderWriter(readWriteSchema, readWriteSchema)
-
   }
 
   private def readIndexedDataset: IndexedDataset = {
 
-    val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
       parser.opts("recursive").asInstanceOf[Boolean]).uris
 
     if (inFiles.isEmpty) {
       null.asInstanceOf[IndexedDataset]
     } else {
 
-      val datasetA = IndexedDataset(readerWriter.readDRMFrom(inFiles))
+      val datasetA = indexedDatasetDFSRead(inFiles, readWriteSchema)
       datasetA
     }
   }
@@ -146,12 +149,12 @@ object RowSimilarityDriver extends MahoutDriver {
 
     val indexedDataset = readIndexedDataset
 
-    val rowSimilarityDrm = SimilarityAnalysis.rowSimilarity(indexedDataset.matrix, parser.opts("randomSeed").asInstanceOf[Int],
-      parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int], parser.opts("maxObservations").asInstanceOf[Int])
+    val rowSimilarityIDS = SimilarityAnalysis.rowSimilarityIDS(indexedDataset,
+      parser.opts("randomSeed").asInstanceOf[Int],
+      parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int],
+      parser.opts("maxObservations").asInstanceOf[Int])
 
-    val rowSimilarityDataset = new IndexedDatasetTextDelimitedWriteable(rowSimilarityDrm,
-      indexedDataset.rowIDs, indexedDataset.rowIDs, readWriteSchema)
-    rowSimilarityDataset.writeTo(dest = parser.opts("output").asInstanceOf[String])
+    rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], readWriteSchema)
 
     stop
   }

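For reference, a minimal sketch (not part of the patch) of the flow the refactored driver now follows, using the engine-neutral read helper and the IndexedDataset dfsWrite added in this commit. The paths, seed, and limits below are hypothetical, and an implicit Mahout DistributedContext is assumed to be in scope:

  import org.apache.mahout.math.cf.SimilarityAnalysis
  import org.apache.mahout.math.indexeddataset.{Schema, indexedDatasetDFSRead}

  // a text-delimited schema along the lines of the driver's readWriteSchema
  val readWriteSchema = new Schema(
    "rowKeyDelim" -> "\t",
    "columnIdStrengthDelim" -> ":",
    "omitScore" -> false,
    "elementDelim" -> " ")

  // read (row id, column id: strength, ...) text into an IndexedDataset, keeping the string IDs
  val inputIDS = indexedDatasetDFSRead("hdfs://some/input/part-*", readWriteSchema) // hypothetical path

  // row-wise LLR similarity; arguments are (randomSeed, maxSimilaritiesPerRow, maxObservations)
  val rowSimilarityIDS = SimilarityAnalysis.rowSimilarityIDS(inputIDS, 1234, 100, 500)

  // write text delimited output with the same external IDs
  rowSimilarityIDS.dfsWrite("hdfs://some/output", readWriteSchema) // hypothetical path
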
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
deleted file mode 100644
index 92163be..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import scala.collection.mutable
-import scala.collection.mutable.HashMap
-
-/** Syntactic sugar for mutable.HashMap[String, Any]
-  *
-  * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
-  */
-class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
-  // note: this require a mutable HashMap, do we care?
-  this ++= params
-
-  /** Constructor for copying an existing Schema
-    *
-    * @param schemaToClone return a copy of this Schema
-    */
-  def this(schemaToClone: Schema){
-    this()
-    this ++= schemaToClone
-  }
-}
-
-// These can be used to keep the text in and out fairly standard to Mahout, where an application specific
-// format is not required.
-
-/** Simple default Schema for typical text delimited element file input
-  * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
-  * <comma, tab, or space>here may be other ignored text...)
-  */
-class DefaultElementReadSchema extends Schema(
-  "delim" -> "[,\t ]", //comma, tab or space
-  "filter" -> "",
-  "rowIDColumn" -> 0,
-  "columnIDPosition" -> 1,
-  "filterColumn" -> -1)
-
-/** Default Schema for text delimited drm file output
-  * This tells the writer to write a DRM of the default form:
-  * (rowID<tab>columnID1:score1<space>columnID2:score2...)
-  */
-class DefaultDRMWriteSchema extends Schema(
-  "rowKeyDelim" -> "\t",
-  "columnIdStrengthDelim" -> ":",
-  "elementDelim" -> " ",
-  "omitScore" -> false)
-
-/** Default Schema for typical text delimited drm file input
-  * This tells the reader to input text lines of the form:
-  * (rowID<tab>columnID1:score1,columnID2:score2,...)
-  */
-class DefaultDRMReadSchema extends Schema(
-  "rowKeyDelim" -> "\t",
-  "columnIdStrengthDelim" -> ":",
-  "elementDelim" -> " ")
-
-/** Default Schema for reading a text delimited drm file  where the score of any element is ignored,
-  * all non-zeros are replaced with 1.
-  * This tells the reader to input DRM lines of the form
-  * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
-  * Alternatively the format can be
-  * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
-  * output format for [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
-  */
-class DRMReadBooleanSchema extends Schema(
-  "rowKeyDelim" -> "\t",
-  "columnIdStrengthDelim" -> ":",
-  "elementDelim" -> " ",
-  "omitScore" -> true)
-
-/** Default Schema for typical text delimited drm file write where the score of a element is omitted.
-  * The presence of a element means the score = 1, the absence means a score of 0.
-  * This tells the writer to output DRM lines of the form
-  * (rowID<tab>columnID1<space>columnID2...)
-  */
-class DRMWriteBooleanSchema extends Schema(
-  "rowKeyDelim" -> "\t",
-  "columnIdStrengthDelim" -> ":",
-  "elementDelim" -> " ",
-  "omitScore" -> true)
-

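The Schema class and default schemas deleted here move to math-scala as org.apache.mahout.math.indexeddataset.Schema (see the file list in the commit message below). A minimal sketch of defining a custom element-read schema at the new location, assuming the relocated class keeps the key -> value constructor shown above; the delimiter choices are hypothetical:

  import org.apache.mahout.math.indexeddataset.Schema

  // read tab separated elements of the form: rowID<tab>columnID<tab>ignored text... (hypothetical format)
  val tsvElementReadSchema = new Schema(
    "delim" -> "\t",          // field delimiter
    "filter" -> "",           // no filter string
    "rowIDColumn" -> 0,       // first column holds the row ID
    "columnIDPosition" -> 1,  // second column holds the column ID
    "filterColumn" -> -1)     // no column is used for filtering
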
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index 274ad98..ba8f7d1 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -17,6 +17,8 @@
 
 package org.apache.mahout.drivers
 
+import org.apache.mahout.math.indexeddataset.{Writer, Reader, Schema, IndexedDataset}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.RandomAccessSparseVector
 import com.google.common.collect.{BiMap, HashBiMap}
@@ -24,23 +26,23 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
 import org.apache.mahout.sparkbindings._
 import scala.collection.JavaConversions._
 
-/** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
+/** Extends the Reader trait to supply [[IndexedDatasetSpark]] as the type read, and a reader function for reading text delimited files as described by the [[Schema]].
   */
-trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
+trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
   /** Read in text delimited elements from all URIs in the comma delimited source String and return
    * the DRM of all elements, updating the row and column dictionaries. If there is
    * no strength value in the element, assume its presence means a strength of 1.
     *
     * @param mc context for the Spark job
     * @param readSchema describes the delimiters and positions of values in the text delimited file.
-    * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
+    * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]]
     * @return
     */
   protected def elementReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = {
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDatasetSpark = {
     try {
       val delimiter = readSchema("delim").asInstanceOf[String]
       val rowIDColumn = readSchema("rowIDColumn").asInstanceOf[Int]
@@ -105,7 +107,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
       val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
 
-      IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+      new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary)
 
     } catch {
       case cce: ClassCastException => {
@@ -120,14 +122,14 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
     *
     * @param mc context for the Spark job
     * @param readSchema describes the delimiters and positions of values in the text delimited file.
-    * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
+    * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]]
     * @return
     */
   protected def drmReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = {
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDatasetSpark = {
     try {
       val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String]
       val columnIdStrengthDelim = readSchema("columnIdStrengthDelim").asInstanceOf[String]
@@ -193,7 +195,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
       val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
 
-      IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+      new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary)
 
     } catch {
       case cce: ClassCastException => {
@@ -203,7 +205,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
     }
   }
 
-      // this creates a BiMap from an ID collection. The ID points to an ordinal int
+  // this creates a BiMap from an ID collection. The ID points to an ordinal int
   // which is used internal to Mahout as the row or column ID
   // todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a
   // non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit
@@ -218,7 +220,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
   }
 }
 
-trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
 
   private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
 
@@ -226,13 +228,13 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
     *
     * @param mc context for the Spark job
     * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
-    * @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]]
+    * @param dest directory to write text delimited version of [[IndexedDatasetSpark]]
     */
   protected def writer(
       mc: DistributedContext,
       writeSchema: Schema,
       dest: String,
-      indexedDataset: IndexedDataset,
+      indexedDataset: IndexedDatasetSpark,
       sort: Boolean = true): Unit = {
     try {
       val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
@@ -295,7 +297,7 @@ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexed
 /** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
   * @param mc Spark context for reading files
-  * @note The source is supplied by Reader#readElementsFrom .
+  * @note The source is supplied to Reader#readElementsFrom.
   * */
 class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
     (implicit val mc: DistributedContext) extends TDIndexedDatasetReader
@@ -303,7 +305,7 @@ class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
 /** Writes  text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
   * @param mc Spark context for reading files
-  * @note the destination is supplied by Writer#writeTo trait method
+  * @note The destination is supplied to Writer#writeTo.
   * */
 class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
 
@@ -316,41 +318,3 @@ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeS
     (implicit val mc: DistributedContext)
   extends TDIndexedDatasetReaderWriter
 
-/** A version of IndexedDataset that has it's own writeTo method from a Writer trait. This is an alternative to creating
-  * a Writer based stand-alone class for writing. Consider it experimental allowing similar semantics to drm.writeDrm().
-  * Experimental because it's not clear that it is simpler or more intuitive and since IndexedDatasetTextDelimitedWriteables
-  * are probably short lived in terms of lines of code so complexity may be moot.
-  * @param matrix the data
-  * @param rowIDs bi-directional dictionary for rows of external IDs to internal ordinal Mahout IDs.
-  * @param columnIDs bi-directional dictionary for columns of external IDs to internal ordinal Mahout IDs.
-  * @param writeSchema contains params for the schema/format or the written text delimited file.
-  * @param mc mahout distributed context (DistributedContext) may be implicitly defined.
-  * */
-class IndexedDatasetTextDelimitedWriteable(
-    matrix: CheckpointedDrm[Int],
-    rowIDs: BiMap[String,Int],
-    columnIDs: BiMap[String,Int],
-    val writeSchema: Schema,
-    val sort: Boolean = true)
-    (implicit val mc: DistributedContext)
-  extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter {
-
-  override def writeTo(collection: IndexedDataset = this, dest: String): Unit = {
-    super.writeTo(this, dest)
-  }
-}
-
-/**
-  * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily
-  * used to get a secondary constructor for
-  * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a
-  * factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
-  * {{{
-  *   val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readElementsFrom(source))
-  * }}}
-  */
-
-object IndexedDatasetTextDelimitedWriteable {
-  /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
-  def apply(id2: IndexedDatasetTextDelimitedWriteable, sort: Boolean = true) = new IndexedDatasetTextDelimitedWriteable(id2.matrix,  id2.rowIDs, id2.columnIDs, id2.writeSchema, id2.sort)(id2.mc)
-}

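With IndexedDatasetTextDelimitedWriteable removed, reading and writing go through the reader/writer classes above (or the engine-level helpers that wrap them). A minimal sketch, assuming an implicit DistributedContext is in scope; the paths are hypothetical:

  import com.google.common.collect.HashBiMap
  import org.apache.mahout.drivers.{TextDelimitedIndexedDatasetReader, TextDelimitedIndexedDatasetWriter}
  import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetWriteSchema}

  // read one (row ID, column ID[, strength]) element per line, building the ID dictionaries as it goes
  val reader = new TextDelimitedIndexedDatasetReader(DefaultIndexedDatasetElementReadSchema)
  val ids = reader.readElementsFrom("hdfs://some/input", HashBiMap.create[String, Int]()) // hypothetical path

  // write one matrix row per line: rowID<tab>colID1:strength1<space>colID2:strength2...
  val writer = new TextDelimitedIndexedDatasetWriter(DefaultIndexedDatasetWriteSchema)
  writer.writeTo(ids, "hdfs://some/output") // hypothetical path
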
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 08b2c34..c0d36c6 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,10 +17,11 @@
 
 package org.apache.mahout.sparkbindings
 
-import java.io.IOException
-
+import com.google.common.collect.{BiMap, HashBiMap}
+import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
 import org.apache.mahout.math._
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetReadSchema, Schema, DefaultIndexedDatasetElementReadSchema}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.logical._
@@ -250,6 +251,37 @@ object SparkEngine extends DistributedEngine {
     }
   }
 
+  /**
+   * Reads an IndexedDatasetSpark from text delimited file(s) in the default DRM row format,
+   * one matrix row per line.
+   * @param src a comma separated list of URIs to read from
+   * @param schema describes how the text file is formatted
+   * @return the [[IndexedDatasetSpark]] read from the source files
+   */
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDatasetSpark = {
+    val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
+    val ids = reader.readDRMFrom(src, existingRowIDs)
+    ids
+  }
+
+  /**
+   * Reads an IndexedDatasetSpark from text delimited file(s) in the default element format,
+   * one (row ID, column ID, optional strength) element per line.
+   * @param src a comma separated list of URIs to read from
+   * @param schema describes how the text file is formatted
+   * @return the [[IndexedDatasetSpark]] read from the source files
+   */
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDatasetSpark = {
+    val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
+    val ids = reader.readElementsFrom(src, existingRowIDs)
+    ids
+  }
 
 }
 

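Both new helpers construct a TextDelimitedIndexedDatasetReader; the difference is the default schema and which read method is used. A minimal sketch of calling them directly, with hypothetical URIs and an implicit DistributedContext assumed in scope:

  import org.apache.mahout.sparkbindings.SparkEngine

  // DRM row form: rowID<tab>colID1:strength1<space>colID2:strength2...
  val byRows = SparkEngine.indexedDatasetDFSRead("hdfs://some/drm-text/part-*")

  // element form: one (row ID, column ID[, strength]) tuple per line
  val byElements = SparkEngine.indexedDatasetDFSReadElements("hdfs://some/elements/part-*")
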
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index b753f6f..e5a2b2a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -197,4 +197,16 @@ class CheckpointedDrmSpark[K: ClassTag](
   protected def computeNNonZero =
     cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
 
+  /** Changes the number of rows in the DRM without actually touching the underlying data. Used to
+    * redimension a DRM after it has been created, which implies some blank, non-existent rows.
+    * @param n new row dimension
+    * @return
+    */
+  override def newRowCardinality(n: Int): CheckpointedDrm[K] = {
+    assert(n > -1)
+    assert( n >= nrow)
+    val newCheckpointedDrm = drmWrap[K](rdd, n, ncol)
+    newCheckpointedDrm
+  }
+
 }

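A minimal sketch of using the new method, assuming drmParallelize yields a CheckpointedDrm as it does in the Spark bindings and an implicit DistributedContext is in scope; the matrix values and new row count are hypothetical:

  import org.apache.mahout.math.scalabindings._
  import org.apache.mahout.math.drm._

  val drmA = drmParallelize(dense((1, 0), (0, 1)), numPartitions = 2)

  // declare two additional, non-existent rows without touching the underlying data
  val drmPadded = drmA.newRowCardinality(4)  // nrow is now 4, ncol and the existing rows are unchanged
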
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
new file mode 100644
index 0000000..d3aa0a8
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.indexeddataset
+
+import com.google.common.collect.BiMap
+import org.apache.mahout.drivers.TextDelimitedIndexedDatasetWriter
+import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
+import org.apache.mahout.math.indexeddataset
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, Reader, Schema, IndexedDataset}
+
+/**
+ * Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific
+ * dfsWrite method
+ */
+class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int])
+  extends IndexedDataset {
+
+  /** Secondary constructor for creating a copy of an existing IndexedDatasetSpark */
+  def this(id2: IndexedDatasetSpark){
+    this(id2.matrix, id2.rowIDs, id2.columnIDs)
+  }
+
+  /** Factory method used to create this extending class when the interface of
+    * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is known. */
+  override def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
+    IndexedDatasetSpark = {
+    new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
+  }
+
+  /** implements the core method [[indexeddataset.IndexedDataset#dfsWrite]]*/
+  override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema)
+      (implicit sc: DistributedContext):
+    Unit = {
+    val writer = new TextDelimitedIndexedDatasetWriter(schema)(sc)
+    writer.writeTo(this, dest)
+  }
+}
+

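A minimal sketch of constructing and writing an IndexedDatasetSpark, assuming a CheckpointedDrm[Int] named drm and an implicit DistributedContext already exist; the dictionaries and output path are hypothetical:

  import com.google.common.collect.HashBiMap
  import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark

  val rowIDs = HashBiMap.create[String, Int]()     // external row ID -> Mahout ordinal int
  val columnIDs = HashBiMap.create[String, Int]()  // external column ID -> Mahout ordinal int
  // ... populate the dictionaries to match drm's row and column indices ...

  val ids = new IndexedDatasetSpark(drm, rowIDs, columnIDs)
  ids.dfsWrite("hdfs://some/output")               // uses DefaultIndexedDatasetWriteSchema by default
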
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 6ca2a14..c441716 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -17,6 +17,9 @@
 
 package org.apache.mahout
 
+import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
+import org.apache.mahout.math.indexeddataset.Schema
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import org.apache.spark.{SparkConf, SparkContext}
 import java.io._
 import scala.collection.mutable.ArrayBuffer
@@ -224,5 +227,4 @@ package object sparkbindings {
     mcjars
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
deleted file mode 100644
index 29c7b84..0000000
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf
-
-import org.apache.mahout.math.cf.SimilarityAnalysis
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings.{MatrixOps, _}
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.test.MahoutSuite
-import org.scalatest.FunSuite
-
-/* values 
-A =
-1	1	0	0	0
-0	0	1	1	0
-0	0	0	0	1
-1	0	0	1	0
-
-B =
-1	1	1	1	0
-1	1	1	1	0
-0	0	1	0	1
-1	1	0	1	0
- */
-
-class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with DistributedSparkSuite {
-
-  // correct cooccurrence with LLR
-  final val matrixLLRCoocAtAControl = dense(
-    (0.0,                1.7260924347106847, 0.0,                     0.0,                0.0),
-    (1.7260924347106847, 0.0,                0.0,                     0.0,                0.0),
-    (0.0,                0.0,                0.0,                     1.7260924347106847, 0.0),
-    (0.0,                0.0,                1.7260924347106847,      0.0,                0.0),
-    (0.0,                0.0,                0.0,                     0.0,                0.0))
-
-  // correct cross-cooccurrence with LLR
-  final val m = dense(
-    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
-    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
-    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
-    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
-    (0.0,                0.0,                0.0,                0.0,                4.498681156950466))
-
-  final val matrixLLRCoocBtAControl = dense(
-      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
-      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
-      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
-      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
-      (0.0,                0.0,                0.6795961471815897, 0.0,                4.498681156950466))
-
-
-  test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
-    val a = dense(
-        (1, 1, 0, 0, 0),
-        (0, 0, 1, 1, 0),
-        (0, 0, 0, 0, 1),
-        (1, 0, 0, 1, 0))
-
-    val b = dense(
-        (1, 1, 1, 1, 0),
-        (1, 1, 1, 1, 0),
-        (0, 0, 1, 0, 1),
-        (1, 1, 0, 1, 0))
-
-    val drmA = drmParallelize(m = a, numPartitions = 2)
-    val drmB = drmParallelize(m = b, numPartitions = 2)
-
-    //self similarity
-    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, randomSeed = 1, drmBs = Array(drmB))
-    val matrixSelfCooc = drmCooc(0).checkpoint().collect
-    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
-    var n = (new MatrixOps(m = diffMatrix)).norm
-    n should be < 1E-10
-
-    //cross similarity
-    val matrixCrossCooc = drmCooc(1).checkpoint().collect
-    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
-    n = (new MatrixOps(m = diff2Matrix)).norm
-    n should be < 1E-10
-
-  }
-
-  test("cooccurrence [A'A], [B'A] double data using LLR") {
-    val a = dense(
-        (100000.0D, 1.0D,  0.0D,  0.0D,     0.0D),
-        (     0.0D, 0.0D, 10.0D,  1.0D,     0.0D),
-        (     0.0D, 0.0D,  0.0D,  0.0D,  1000.0D),
-        (     1.0D, 0.0D,  0.0D, 10.0D,     0.0D))
-
-    val b = dense(
-        (10000.0D, 100.0D,     1000.0D,      1.0D,   0.0D),
-        (   10.0D,   1.0D, 10000000.0D,     10.0D,   0.0D),
-        (    0.0D,   0.0D,     1000.0D,      0.0D, 100.0D),
-        (  100.0D,   1.0D,        0.0D, 100000.0D,   0.0D))
-
-    val drmA = drmParallelize(m = a, numPartitions = 2)
-    val drmB = drmParallelize(m = b, numPartitions = 2)
-
-    //self similarity
-    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
-    val matrixSelfCooc = drmCooc(0).checkpoint().collect
-    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
-    var n = (new MatrixOps(m = diffMatrix)).norm
-    n should be < 1E-10
-
-    //cross similarity
-    val matrixCrossCooc = drmCooc(1).checkpoint().collect
-    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
-    n = (new MatrixOps(m = diff2Matrix)).norm
-    n should be < 1E-10
-  }
-
-  test("cooccurrence [A'A], [B'A] integer data using LLR") {
-    val a = dense(
-        ( 1000,  10,       0,    0,   0),
-        (    0,   0,  -10000,   10,   0),
-        (    0,   0,       0,    0, 100),
-        (10000,   0,       0, 1000,   0))
-
-    val b = dense(
-        (  100, 1000, -10000, 10000,    0),
-        (10000, 1000,    100,    10,    0),
-        (    0,    0,     10,     0, -100),
-        (   10,  100,      0,  1000,    0))
-
-    val drmA = drmParallelize(m = a, numPartitions = 2)
-    val drmB = drmParallelize(m = b, numPartitions = 2)
-
-   //self similarity
-    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
-    //var cp = drmSelfCooc(0).checkpoint()
-    //cp.writeDRM("/tmp/cooc-spark/")//to get values written
-    val matrixSelfCooc = drmCooc(0).checkpoint().collect
-    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
-    var n = (new MatrixOps(m = diffMatrix)).norm
-    n should be < 1E-10
-
-    //cross similarity
-    val matrixCrossCooc = drmCooc(1).checkpoint().collect
-    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
-    n = (new MatrixOps(m = diff2Matrix)).norm
-    n should be < 1E-10
-  }
-
-  test("cooccurrence two matrices with different number of columns"){
-    val a = dense(
-      (1, 1, 0, 0, 0),
-      (0, 0, 1, 1, 0),
-      (0, 0, 0, 0, 1),
-      (1, 0, 0, 1, 0))
-
-    val b = dense(
-      (0, 1, 1, 0),
-      (1, 1, 1, 0),
-      (0, 0, 1, 0),
-      (1, 1, 0, 1))
-
-    val matrixLLRCoocBtANonSymmetric = dense(
-      (0.0,                1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
-      (0.0,                0.6795961471815897, 0.6795961471815897, 0.0),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
-      (5.545177444479561,  1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
-      (0.0,                0.0,                0.6795961471815897, 0.0))
-
-    val drmA = drmParallelize(m = a, numPartitions = 2)
-    val drmB = drmParallelize(m = b, numPartitions = 2)
-
-    //self similarity
-    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
-    val matrixSelfCooc = drmCooc(0).checkpoint().collect
-    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
-    var n = (new MatrixOps(m = diffMatrix)).norm
-    n should be < 1E-10
-
-    //cross similarity
-    val matrixCrossCooc = drmCooc(1).checkpoint().collect
-    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric)
-    n = (new MatrixOps(m = diff2Matrix)).norm
-
-    //cooccurrence without LLR is just a A'B
-    //val inCoreAtB = a.transpose().times(b)
-    //val bp = 0
-  }
-
-  test("LLR calc") {
-    val A = dense(
-        (1, 1, 0, 0, 0),
-        (0, 0, 1, 1, 0),
-        (0, 0, 0, 0, 1),
-        (1, 0, 0, 1, 0))
-
-    val AtA = A.transpose().times(A)
-
-    /* AtA is:
-      0  =>	{0:2.0,1:1.0,3:1.0}
-      1  =>	{0:1.0,1:1.0}
-      2  =>	{2:1.0,3:1.0}
-      3  =>	{0:1.0,2:1.0,3:2.0}
-      4  =>	{4:1.0}
-
-          val AtAd = dense(
-         (2, 1, 0, 1, 0),
-         (1, 1, 0, 0, 0),
-         (0, 0, 1, 1, 0),
-         (1, 0, 1, 2, 0),
-         (0, 0, 0, 0, 1))
-
-         val AtAdNoSelfCooc = dense(
-         (0, 1, 0, 1, 0),
-         (1, 0, 0, 0, 0),
-         (0, 0, 0, 1, 0),
-         (1, 0, 1, 0, 0),
-         (0, 0, 0, 0, 0))
-
-    */
-
-    //item (1,0)
-    val numInteractionsWithAandB = 1L
-    val numInteractionsWithA = 1L
-    val numInteractionsWithB = 2L
-    val numInteractions = 6l
-
-    val llr = SimilarityAnalysis.logLikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
-
-    assert(llr == 2.6341457841558764) // value calculated by hadoop itemsimilairty
-  }
-
-  test("downsampling by number per row") {
-    val a = dense(
-        (1, 1, 1, 1, 0),
-        (1, 1, 1, 1, 1),
-        (0, 0, 0, 0, 1),
-        (1, 1, 0, 1, 0))
-    val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
-
-    val downSampledDrm = SimilarityAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
-    //count non-zero values, should be == 7
-    var numValues = 0
-    val m = downSampledDrm.collect
-    val it = m.iterator()
-    while (it.hasNext) {
-      val v = it.next().vector()
-      val nonZeroIt = v.nonZeroes().iterator()
-      while (nonZeroIt.hasNext) {
-        numValues += 1
-        nonZeroIt.next()
-      }
-    }
-
-    assert(numValues == 8) //Don't change the random seed or this may fail.
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
new file mode 100644
index 0000000..0b3b3eb
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings.{MatrixOps, _}
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+import org.apache.mahout.test.MahoutSuite
+import org.scalatest.FunSuite
+
+/* values 
+A =
+1	1	0	0	0
+0	0	1	1	0
+0	0	0	0	1
+1	0	0	1	0
+
+B =
+1	1	1	1	0
+1	1	1	1	0
+0	0	1	0	1
+1	1	0	1	0
+ */
+
+// todo: add tests for the IndexedDataset cooccurrence methods
+
+class SimilarityAnalysisSuite extends FunSuite with MahoutSuite with DistributedSparkSuite {
+
+  // correct cooccurrence with LLR
+  final val matrixLLRCoocAtAControl = dense(
+    (0.0,                1.7260924347106847, 0.0,                     0.0,                0.0),
+    (1.7260924347106847, 0.0,                0.0,                     0.0,                0.0),
+    (0.0,                0.0,                0.0,                     1.7260924347106847, 0.0),
+    (0.0,                0.0,                1.7260924347106847,      0.0,                0.0),
+    (0.0,                0.0,                0.0,                     0.0,                0.0))
+
+  // correct cross-cooccurrence with LLR
+  final val m = dense(
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
+    (0.0,                0.0,                0.0,                0.0,                4.498681156950466))
+
+  final val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+      (0.0,                0.0,                0.6795961471815897, 0.0,                4.498681156950466))
+
+
+  test("cooccurrence [A'A], [B'A] boolean data using LLR") {
+    val a = dense(
+        (1, 1, 0, 0, 0),
+        (0, 0, 1, 1, 0),
+        (0, 0, 0, 0, 1),
+        (1, 0, 0, 1, 0))
+
+    val b = dense(
+        (1, 1, 1, 1, 0),
+        (1, 1, 1, 1, 0),
+        (0, 0, 1, 0, 1),
+        (1, 1, 0, 1, 0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    //self similarity
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, randomSeed = 1, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+
+  }
+
+  test("cooccurrence [A'A], [B'A] double data using LLR") {
+    val a = dense(
+        (100000.0D, 1.0D,  0.0D,  0.0D,     0.0D),
+        (     0.0D, 0.0D, 10.0D,  1.0D,     0.0D),
+        (     0.0D, 0.0D,  0.0D,  0.0D,  1000.0D),
+        (     1.0D, 0.0D,  0.0D, 10.0D,     0.0D))
+
+    val b = dense(
+        (10000.0D, 100.0D,     1000.0D,      1.0D,   0.0D),
+        (   10.0D,   1.0D, 10000000.0D,     10.0D,   0.0D),
+        (    0.0D,   0.0D,     1000.0D,      0.0D, 100.0D),
+        (  100.0D,   1.0D,        0.0D, 100000.0D,   0.0D))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    //self similarity
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+  }
+
+  test("cooccurrence [A'A], [B'A] integer data using LLR") {
+    val a = dense(
+        ( 1000,  10,       0,    0,   0),
+        (    0,   0,  -10000,   10,   0),
+        (    0,   0,       0,    0, 100),
+        (10000,   0,       0, 1000,   0))
+
+    val b = dense(
+        (  100, 1000, -10000, 10000,    0),
+        (10000, 1000,    100,    10,    0),
+        (    0,    0,     10,     0, -100),
+        (   10,  100,      0,  1000,    0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+   //self similarity
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    //var cp = drmSelfCooc(0).checkpoint()
+    //cp.writeDRM("/tmp/cooc-spark/")//to get values written
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+    n should be < 1E-10
+  }
+
+  test("cooccurrence two matrices with different number of columns"){
+    val a = dense(
+      (1, 1, 0, 0, 0),
+      (0, 0, 1, 1, 0),
+      (0, 0, 0, 0, 1),
+      (1, 0, 0, 1, 0))
+
+    val b = dense(
+      (0, 1, 1, 0),
+      (1, 1, 1, 0),
+      (0, 0, 1, 0),
+      (1, 1, 0, 1))
+
+    val matrixLLRCoocBtANonSymmetric = dense(
+      (0.0,                1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+      (0.0,                0.6795961471815897, 0.6795961471815897, 0.0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (5.545177444479561,  1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+      (0.0,                0.0,                0.6795961471815897, 0.0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    //self similarity
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+
+    //cooccurrence without LLR is just a A'B
+    //val inCoreAtB = a.transpose().times(b)
+    //val bp = 0
+  }
+
+  test("LLR calc") {
+    val A = dense(
+        (1, 1, 0, 0, 0),
+        (0, 0, 1, 1, 0),
+        (0, 0, 0, 0, 1),
+        (1, 0, 0, 1, 0))
+
+    val AtA = A.transpose().times(A)
+
+    /* AtA is:
+      0  =>	{0:2.0,1:1.0,3:1.0}
+      1  =>	{0:1.0,1:1.0}
+      2  =>	{2:1.0,3:1.0}
+      3  =>	{0:1.0,2:1.0,3:2.0}
+      4  =>	{4:1.0}
+
+          val AtAd = dense(
+         (2, 1, 0, 1, 0),
+         (1, 1, 0, 0, 0),
+         (0, 0, 1, 1, 0),
+         (1, 0, 1, 2, 0),
+         (0, 0, 0, 0, 1))
+
+         val AtAdNoSelfCooc = dense(
+         (0, 1, 0, 1, 0),
+         (1, 0, 0, 0, 0),
+         (0, 0, 0, 1, 0),
+         (1, 0, 1, 0, 0),
+         (0, 0, 0, 0, 0))
+
+    */
+
+    //item (1,0)
+    val numInteractionsWithAandB = 1L
+    val numInteractionsWithA = 1L
+    val numInteractionsWithB = 2L
+    val numInteractions = 6L
+
+    val llr = SimilarityAnalysis.logLikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
+
+    assert(llr == 2.6341457841558764) // value calculated by hadoop itemsimilarity
+  }
+
+  test("downsampling by number per row") {
+    val a = dense(
+        (1, 1, 1, 1, 0),
+        (1, 1, 1, 1, 1),
+        (0, 0, 0, 0, 1),
+        (1, 1, 0, 1, 0))
+    val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
+
+    val downSampledDrm = SimilarityAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
+    // count non-zero values, should be == 8 with this random seed
+    var numValues = 0
+    val m = downSampledDrm.collect
+    val it = m.iterator()
+    while (it.hasNext) {
+      val v = it.next().vector()
+      val nonZeroIt = v.nonZeroes().iterator()
+      while (nonZeroIt.hasNext) {
+        numValues += 1
+        nonZeroIt.next()
+      }
+    }
+
+    assert(numValues == 8) //Don't change the random seed or this may fail.
+  }
+}


[2/2] git commit: NOJIRA refactor IndexedDataset and CLI drivers into core and engine specific parts, closes apache/mahout#59

Posted by pa...@apache.org.
NOJIRA refactor IndexedDataset and CLI drivers into core and engine specific parts, closes apache/mahout#59


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/666d314f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/666d314f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/666d314f

Branch: refs/heads/master
Commit: 666d314fb8d9f466a5496a143d73d55036aea619
Parents: 4e6577d
Author: pferrel <pa...@occamsmachete.com>
Authored: Tue Oct 14 14:45:32 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Tue Oct 14 14:45:32 2014 -0700

----------------------------------------------------------------------
 .../apache/mahout/h2obindings/H2OEngine.scala   |  44 +++
 .../h2obindings/drm/CheckpointedDrmH2O.scala    |  13 +
 math-scala/pom.xml                              |   6 +
 .../apache/mahout/drivers/MahoutDriver.scala    |  43 +++
 .../mahout/drivers/MahoutOptionParser.scala     | 220 +++++++++++++++
 .../mahout/math/cf/SimilarityAnalysis.scala     |  56 +++-
 .../mahout/math/drm/CheckpointedDrm.scala       |   3 +
 .../mahout/math/drm/DistributedEngine.scala     |  25 ++
 .../org/apache/mahout/math/drm/package.scala    |  26 +-
 .../math/indexeddataset/IndexedDataset.scala    |  64 +++++
 .../math/indexeddataset/ReaderWriter.scala      |  68 +++++
 .../mahout/math/indexeddataset/Schema.scala     | 102 +++++++
 .../apache/mahout/common/HDFSPathSearch.scala   |  78 ++++++
 .../apache/mahout/drivers/FileSysUtils.scala    |  76 ------
 .../apache/mahout/drivers/IndexedDataset.scala  |  80 ------
 .../mahout/drivers/ItemSimilarityDriver.scala   |  90 +++----
 .../apache/mahout/drivers/MahoutDriver.scala    | 104 -------
 .../mahout/drivers/MahoutOptionParser.scala     | 214 ---------------
 .../mahout/drivers/MahoutSparkDriver.scala      |  87 ++++++
 .../apache/mahout/drivers/ReaderWriter.scala    |  66 -----
 .../mahout/drivers/RowSimilarityDriver.scala    |  41 +--
 .../org/apache/mahout/drivers/Schema.scala      |  98 -------
 .../drivers/TextDelimitedReaderWriter.scala     |  68 ++---
 .../mahout/sparkbindings/SparkEngine.scala      |  38 ++-
 .../drm/CheckpointedDrmSpark.scala              |  12 +
 .../indexeddataset/IndexedDatasetSpark.scala    |  53 ++++
 .../apache/mahout/sparkbindings/package.scala   |   4 +-
 .../mahout/cf/CooccurrenceAnalysisSuite.scala   | 267 ------------------
 .../mahout/cf/SimilarityAnalysisSuite.scala     | 269 +++++++++++++++++++
 29 files changed, 1273 insertions(+), 1042 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 28214c6..99bc3ba 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -17,6 +17,9 @@
 
 package org.apache.mahout.h2obindings
 
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.indexeddataset.{IndexedDataset, Schema, DefaultIndexedDatasetReadSchema}
+
 import scala.reflect._
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
@@ -112,4 +115,45 @@ object H2OEngine extends DistributedEngine {
   }
 
   implicit def cp2cph2o[K:ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmH2O[K] = drm.asInstanceOf[CheckpointedDrmH2O[K]]
+
+  /** stub class not implemented in H2O */
+  abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int])
+    extends IndexedDataset {}
+
+  /**
+   * reads an IndexedDatasetH2O from default text delimited files
+   * @todo unimplemented
+   * @param src a comma separated list of URIs to read from
+   * @param schema how the text file is formatted
+   * @return
+   */
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDatasetH2O = {
+    // should log a warning when this is built, but there is no logger here; can an H2O contributor help with this?
+    println("Warning: unimplemented indexedDatasetDFSRead.")
+    throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read.")
+    null.asInstanceOf[IndexedDatasetH2O]
+  }
+
+  /**
+   * reads an IndexedDatasetH2O from default text delimited files
+   * @todo unimplemented
+   * @param src a comma separated list of URIs to read from
+   * @param schema how the text file is formatted
+   * @return
+   */
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDatasetH2O = {
+    // should log a warning when this is built, but there is no logger here; can an H2O contributor help with this?
+    println("Warning: unimplemented indexedDatasetDFSReadElements.")
+    throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read by elements.")
+    null.asInstanceOf[IndexedDatasetH2O]
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
index c06455a..6d7628e 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
@@ -53,4 +53,17 @@ class CheckpointedDrmH2O[K: ClassTag](
   def canHaveMissingRows: Boolean = false
 
   protected[mahout] def partitioningTag: Long = h2odrm.frame.anyVec.group.hashCode
+
+  /** Stub needed to make IndexedDataset part of core; since drmWrap is not in H2O, the implementation is left for someone else. */
+  override def newRowCardinality(n: Int): CheckpointedDrm[K] = {
+    throw new UnsupportedOperationException("CheckpointedDrmH2O#newRowCardinality is not implemented.")
+    /* this is the Spark impl
+    assert(n > -1)
+    assert( n >= nrow)
+    val newCheckpointedDrm = drmWrap[K](rdd, n, ncol)
+    newCheckpointedDrm
+    */
+    this
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index b2fea6b..66309d6 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -172,6 +172,12 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
+      
+    <dependency>
+      <groupId>com.github.scopt</groupId>
+      <artifactId>scopt_2.10</artifactId>
+      <version>3.2.0</version>
+    </dependency>
 
     <!-- scala stuff -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
new file mode 100644
index 0000000..8c1f8cf
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Extended by a platform specific version of this class to create a Mahout CLI driver.
+ */
+abstract class MahoutDriver {
+
+
+  implicit protected var mc: DistributedContext = _
+  protected var parser: MahoutOptionParser = _
+
+  var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
+
+  /** Override (optionally) for special cleanup */
+  protected def stop: Unit = {
+    if (!_useExistingContext) mc.close
+  }
+
+  /** This is where you do the work, call start first, then before exiting call stop */
+  protected def process: Unit
+
+  /** Parse command line and call process */
+  def main(args: Array[String]): Unit
+
+}
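
For orientation, a minimal sketch of a driver built on this base class. The object name, program name, and option handling are hypothetical, and an engine specific subclass (such as the Spark one in this patch) is assumed to supply a start() that creates the implicit DistributedContext before process does real work.

    package org.apache.mahout.drivers

    // Hypothetical example, not part of this patch.
    object ExampleDriver extends MahoutDriver {

      override def main(args: Array[String]): Unit = {
        parser = new MahoutOptionParser(programName = "exampledriver") {
          head("exampledriver", "Mahout 1.0-SNAPSHOT")
          parseIOOptions() // shared input/output options, see MahoutOptionParser below
        }
        parser.parse(args, parser.opts) map { opts =>
          parser.opts = opts
          process
        }
      }

      override def process: Unit = {
        // an engine specific start() is assumed to have created the implicit context `mc`;
        // do the work with `mc` here, then release it
        stop
      }
    }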

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
new file mode 100644
index 0000000..c6968b8
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import scopt.OptionParser
+
+import scala.collection.immutable
+
+/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+  * keep both standardized.
+  * @param programName Name displayed in help message, the name by which the driver is invoked.
+  * @note not all options are platform neutral so other platforms can add option parsing here if desired.
+  * */
+class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
+
+  // build options from some standard CLI param groups
+  // Note: always put the driver-specific options last so they can override any previous options!
+  var opts = Map.empty[String, Any]
+
+  override def showUsageOnError = true
+
+  def parseIOOptions(numInputs: Int = 1) = {
+    opts = opts ++ MahoutOptionParser.FileIOOptions
+    note("Input, output options")
+    opt[String]('i', "input") required() action { (x, options) =>
+      options + ("input" -> x)
+    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+
+    if (numInputs == 2) {
+      opt[String]("input2") abbr ("i2") action { (x, options) =>
+        options + ("input2" -> x)
+      } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+    }
+
+    opt[String]('o', "output") required() action { (x, options) =>
+      // todo: check to see if HDFS allows MS-Windows backslashes locally?
+      if (x.endsWith("/")) {
+        options + ("output" -> x)
+      } else {
+        options + ("output" -> (x + "/"))
+      }
+    } text ("Path for output, any local or HDFS supported URI (required)")
+
+  }
+
+  def parseSparkOptions = {
+    opts = opts ++ MahoutOptionParser.SparkOptions
+    note("\nSpark config options:")
+
+    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+      options + ("master" -> x)
+    }
+
+    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
+      options + ("sparkExecutorMem" -> x)
+    }
+
+  }
+
+  def parseGenericOptions = {
+    opts = opts ++ MahoutOptionParser.GenericOptions
+    opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
+      options + ("randomSeed" -> x)
+    } validate { x =>
+      if (x > 0) success else failure("Option --randomSeed must be > 0")
+    }
+
+    //output both input DRMs
+    opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
+      options + ("writeAllDatasets" -> true)
+    }//Hidden option, though a user might want this.
+  }
+
+  def parseElementInputSchemaOptions = {
+    // Input text file schema--not driver specific but input data specific: element input,
+    // not DRMs
+    opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
+    note("\nInput text file schema options:")
+    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
+      options + ("inDelim" -> x)
+    }
+
+    opt[String]("filter1") abbr ("f1") action { (x, options) =>
+      options + ("filter1" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+
+    opt[String]("filter2") abbr ("f2") action { (x, options) =>
+      options + ("filter2" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+
+    opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
+      options + ("rowIDColumn" -> x)
+    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
+      if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+    }
+
+    opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
+      options + ("itemIDColumn" -> x)
+    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
+      if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+    }
+
+    opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
+      options + ("filterColumn" -> x)
+    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+      if (x >= -1) success else failure("Option --filterColNum must be >= -1")
+    }
+
+    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+    //check for column consistency
+    checkConfig { options: Map[String, Any] =>
+      if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]
+        || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int]
+        || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int])
+        failure("The row, item, and filter positions must be unique.") else success
+    }
+
+    //check for filter consistency
+    checkConfig { options: Map[String, Any] =>
+      if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
+        && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
+        && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
+        failure ("If using filters they must be unique.") else success
+    }
+
+  }
+
+  def parseFileDiscoveryOptions = {
+    //File finding strategy--not driver specific
+    opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
+    note("\nFile discovery options:")
+    opt[Unit]('r', "recursive") action { (_, options) =>
+      options + ("recursive" -> true)
+    } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+
+    opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
+      options + ("filenamePattern" -> x)
+    } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+
+  }
+
+  def parseDrmFormatOptions = {
+    opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
+    note("\nOutput text file schema options:")
+    opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
+      options + ("rowKeyDelim" -> x)
+    } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+    opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
+      options + ("columnIdStrengthDelim" -> x)
+    } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+    opt[String]("elementDelim") abbr ("td") action { (x, options) =>
+      options + ("elementDelim" -> x)
+    } text ("Separates vector element values in the values list (optional). Default: \" \"")
+
+    opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
+      options + ("omitStrength" -> true)
+    } text ("Do not write the strength to the output files (optional), Default: false.")
+    note("This option is used to output indexable data for creating a search engine recommender.")
+
+    note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+  }
+
+}
+
+/** Companion object defines default option groups for reference in any driver that needs them.
+  * @note not all options are platform neutral so other platforms can add default options here if desired */
+object MahoutOptionParser {
+
+  // set up the various default option groups
+  final val GenericOptions = immutable.HashMap[String, Any](
+    "randomSeed" -> System.currentTimeMillis().toInt,
+    "writeAllDatasets" -> false)
+
+  final val SparkOptions = immutable.HashMap[String, Any](
+    "master" -> "local",
+    "sparkExecutorMem" -> "",
+    "appName" -> "Generic Spark App, Change this.")
+
+  final val FileIOOptions = immutable.HashMap[String, Any](
+    "input" -> null.asInstanceOf[String],
+    "input2" -> null.asInstanceOf[String],
+    "output" -> null.asInstanceOf[String])
+
+  final val FileDiscoveryOptions = immutable.HashMap[String, Any](
+    "recursive" -> false,
+    "filenamePattern" -> "^part-.*")
+
+  final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
+    "rowIDColumn" -> 0,
+    "itemIDColumn" -> 1,
+    "filterColumn" -> -1,
+    "filter1" -> null.asInstanceOf[String],
+    "filter2" -> null.asInstanceOf[String],
+    "inDelim" -> "[,\t ]")
+
+  final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+    "rowKeyDelim" -> "\t",
+    "columnIdStrengthDelim" -> ":",
+    "elementDelim" -> " ",
+    "omitStrength" -> false)
+}
+
+
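
A hedged sketch of how a driver might compose these shared groups with one driver specific option; the wrapper object, program name, and the maxThings option are made up for illustration.

    import scala.collection.immutable.HashMap
    import org.apache.mahout.drivers.MahoutOptionParser

    object ExampleParserSetup {
      // Hypothetical driver specific defaults, merged last so they can override earlier groups.
      final val ExampleOptions = HashMap[String, Any]("maxThings" -> 500, "appName" -> "ExampleDriver")

      val parser = new MahoutOptionParser(programName = "exampledriver") {
        head("exampledriver", "Mahout 1.0-SNAPSHOT")

        // shared option groups keep flag names consistent across drivers
        parseIOOptions()
        parseFileDiscoveryOptions
        parseDrmFormatOptions
        parseGenericOptions

        // driver specific option
        opts = opts ++ ExampleOptions
        note("\nAlgorithm control options:")
        opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
          options + ("maxThings" -> x)
        } text ("Max things to compute (optional). Default: 500")
      }
    }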

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
index 90d7559..5718304 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -18,6 +18,7 @@
 package org.apache.mahout.math.cf
 
 import org.apache.mahout.math._
+import org.apache.mahout.math.indexeddataset.IndexedDataset
 import scalabindings._
 import RLikeOps._
 import drm._
@@ -49,7 +50,7 @@ object SimilarityAnalysis extends Serializable {
     * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
     * @param maxNumInteractions max number of interactions after downsampling, default: 500
     * @return
-    * */
+    */
   def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
                     maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
 
@@ -99,13 +100,39 @@ object SimilarityAnalysis extends Serializable {
     indicatorMatrices
   }
 
+  /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+    * and returns a list of indicator and cross-indicator matrices
+    * A somewhat easier to use method than the DRM-based cooccurrences, since it handles the ID dictionaries correctly
+    * @param indexedDatasets first in the array is the primary/A matrix; all others are treated as secondary
+    * @param randomSeed use the default to make repeatable, otherwise pass in system time or some randomizing seed
+    * @param maxInterestingItemsPerThing max similarities per item
+    * @param maxNumInteractions max number of input items per item
+    * @return
+    */
+  def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset],
+      randomSeed: Int = 0xdeadbeef,
+      maxInterestingItemsPerThing: Int = 50,
+      maxNumInteractions: Int = 500):
+    List[IndexedDataset] = {
+    val drms = indexedDatasets.map(_.matrix.asInstanceOf[DrmLike[Int]])
+    val primaryDrm = drms(0)
+    val secondaryDrms = drms.drop(1)
+    val coocMatrices = cooccurrences(primaryDrm, randomSeed, maxInterestingItemsPerThing,
+      maxNumInteractions, secondaryDrms)
+    val retIDSs = coocMatrices.iterator.zipWithIndex.map {
+      case( drm, i ) =>
+        indexedDatasets(0).create(drm, indexedDatasets(0).columnIDs, indexedDatasets(i).columnIDs)
+    }
+    retIDSs.toList
+  }
+
   /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows
     * @param drmARaw Primary interaction matrix
     * @param randomSeed when kept to a constant will make repeatable downsampling
     * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
     * @param maxNumInteractions max number of interactions after downsampling, default: 500
     * @return
-    * */
+    */
   def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
                     maxNumInteractions: Int = 500): DrmLike[Int] = {
 
@@ -130,10 +157,27 @@ object SimilarityAnalysis extends Serializable {
     drmSimilaritiesAAt
   }
 
-  /**
-   * Compute loglikelihood ratio
-   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
-   **/
+  /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
+    * Uses IndexedDatasets, which handle external ID dictionaries properly
+    * @param indexedDataset compare each row to every other
+    * @param randomSeed  use default to make repeatable, otherwise pass in system time or some randomizing seed
+    * @param maxInterestingSimilaritiesPerRow max elements returned in each row
+    * @param maxObservationsPerRow max number of input elements to use
+    * @return
+    */
+  def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
+      maxInterestingSimilaritiesPerRow: Int = 50,
+      maxObservationsPerRow: Int = 500):
+    IndexedDataset = {
+    val coocMatrix = rowSimilarity(indexedDataset.matrix, randomSeed, maxInterestingSimilaritiesPerRow,
+      maxObservationsPerRow)
+    indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs)
+  }
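
To make the intended calling pattern concrete, a hedged sketch of using the two IDS entry points; the helper names are hypothetical, and the secondary dataset is assumed to have been read with existingRowIDs = primary.rowIDs so the row spaces already agree.

    import org.apache.mahout.math.cf.SimilarityAnalysis
    import org.apache.mahout.math.indexeddataset.IndexedDataset

    // Hypothetical helpers, for illustration only.
    def indicators(primary: IndexedDataset, secondary: Option[IndexedDataset]): List[IndexedDataset] =
      // first element of the returned List is the self-similarity (indicator) matrix,
      // any following elements are cross-indicator matrices
      SimilarityAnalysis.cooccurrencesIDSs(Array(primary) ++ secondary)

    def similarRows(interactions: IndexedDataset): IndexedDataset =
      SimilarityAnalysis.rowSimilarityIDS(interactions)
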
+
+  /**
+    * Compute loglikelihood ratio
+    * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+    */
   def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
     numInteractionsWithAandB: Long, numInteractions: Long) = {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 082e5b9..7f97481 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -41,4 +41,7 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
   def keyClassTag: ClassTag[K]
 
 
+  /** changes the number of rows without touching the underlying data */
+  def newRowCardinality(n: Int): CheckpointedDrm[K]
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index eaf5aeb..dd5b101 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -17,6 +17,9 @@
 
 package org.apache.mahout.math.drm
 
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetReadSchema, Schema, IndexedDataset}
+
 import scala.reflect.ClassTag
 import logical._
 import org.apache.mahout.math._
@@ -85,6 +88,28 @@ trait DistributedEngine {
   /** Creates empty DRM with non-trivial height */
   def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
       (implicit sc: DistributedContext): CheckpointedDrm[Long]
+  /**
+   * Load IndexedDataset from text delimited format.
+   * @param src comma delimited URIs to read from
+   * @param schema defines format of file(s)
+   * @param existingRowIDs optional dictionary of row IDs to reuse so several datasets can share one row ID space
+   */
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDataset
+
+  /**
+   * Load IndexedDataset from text delimited format, one element per line
+   * @param src comma delimited URIs to read from
+   * @param schema defines format of file(s)
+   * @param existingRowIDs optional dictionary of row IDs to reuse so several datasets can share one row ID space
+   */
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext):
+    IndexedDataset
+
 }
 
 object DistributedEngine {

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index b787ec0..3afbecb 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -17,10 +17,13 @@
 
 package org.apache.mahout.math
 
-import scala.reflect.ClassTag
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.indexeddataset.{IndexedDataset, DefaultIndexedDatasetReadSchema, DefaultIndexedDatasetElementReadSchema, Schema}
+import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.decompositions.{DSSVD, DSPCA, DQR}
+
+import scala.reflect.ClassTag
 
 package object drm {
 
@@ -114,3 +117,20 @@ package object drm {
   }
 
 }
+
+package object indexeddataset {
+  /** Load IndexedDataset from text delimited files */
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit ctx: DistributedContext):
+    IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs)
+
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit ctx: DistributedContext):
+    IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs)
+
+}
+
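
A hedged sketch of calling these helpers; the function name and path are hypothetical, and an engine specific implicit DistributedContext (for example one created by the Spark bindings) is assumed to be in scope at the call site.

    import org.apache.mahout.math.drm.DistributedContext
    import org.apache.mahout.math.indexeddataset.{indexedDatasetDFSReadElements,
      DefaultIndexedDatasetElementReadSchema, IndexedDataset}

    // Hypothetical helper: reads one-element-per-line input such as "userID,itemID,ignored text"
    // with whatever engine the implicit context belongs to.
    def loadElements(path: String)(implicit ctx: DistributedContext): IndexedDataset =
      indexedDatasetDFSReadElements(path, DefaultIndexedDatasetElementReadSchema)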

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
new file mode 100644
index 0000000..c7eb2cb
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import com.google.common.collect.BiMap
+import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
+import org.apache.mahout.math.indexeddataset
+
+/**
+  * Wraps a [[org.apache.mahout.math.drm.CheckpointedDrm]] object with two [[com.google.common.collect.BiMap]]s to store
+  * ID/label translation dictionaries.
+  * The purpose of this trait is to wrap a CheckpointedDrm[Int] with bidirectional ID mappings so
+  * a user specified label or ID can be stored and mapped to and from the Mahout Int ID
+  * used internally by Mahout core code.
+  *
+  * @todo Often there is no need for both dictionaries, or perhaps either, so save resources by
+  *       allowing them to be omitted when not needed.
+  */
+
+trait IndexedDataset {
+  val matrix: CheckpointedDrm[Int]
+  val rowIDs: BiMap[String,Int]
+  val columnIDs: BiMap[String,Int]
+
+  /**
+   * Write text delimited file(s) with the row and column IDs translated through the dictionaries.
+   * @param dest directory to write to, any HDFS supported URI
+   * @param schema describes the format of the output file(s)
+   */
+  def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit
+
+  /** Factory method, creates the extending class */
+  def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
+    IndexedDataset
+
+  /**
+   * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
+   * No changes are made to the underlying drm.
+   * @param n number to use for new row cardinality, should be larger than current
+   * @note should be done before any optimizer actions are performed on the matrix or you'll get unpredictable
+   *       results.
+   */
+  def newRowCardinality(n: Int): IndexedDataset = {
+    // n is validated in matrix
+    this.create(matrix.newRowCardinality(n), rowIDs, columnIDs)
+  }
+
+}
+
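
A hedged sketch of the row-cardinality alignment this method exists for, mirroring what the Spark ItemSimilarityDriver in this patch does; the helper name is hypothetical and `b` is assumed to have been read reusing `a.rowIDs`.

    import org.apache.mahout.math.indexeddataset.IndexedDataset

    // Hypothetical helper: make two datasets that share one row ID dictionary agree on row count.
    def alignRowCardinality(a: IndexedDataset, b: IndexedDataset): (IndexedDataset, IndexedDataset) = {
      // b was read with existingRowIDs = a.rowIDs, so its dictionary holds the union of row IDs
      val rowCardinality = b.rowIDs.size
      val alignedA = if (rowCardinality != a.matrix.nrow) a.newRowCardinality(rowCardinality) else a
      val alignedB = if (rowCardinality != b.matrix.nrow) b.newRowCardinality(rowCardinality) else b
      (alignedA, alignedB)
    }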

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
new file mode 100644
index 0000000..cf429f5
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import com.google.common.collect.{BiMap, HashBiMap}
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait,
+  * which also defines the type to be read.
+  * @tparam T type of object to read.
+  */
+trait Reader[T]{
+
+  val mc: DistributedContext
+  val readSchema: Schema
+
+  protected def elementReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int]): T
+
+  protected def drmReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int]): T
+
+  def readElementsFrom(
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+    elementReader(mc, readSchema, source, existingRowIDs)
+
+  def readDRMFrom(
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+    drmReader(mc, readSchema, source, existingRowIDs)
+}
+
+/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
+  * which also defines the type to be written.
+  * @tparam T type of object to write.
+  */
+trait Writer[T]{
+
+  val mc: DistributedContext
+  val sort: Boolean
+  val writeSchema: Schema
+
+  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
+
+  def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
+}
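
A hedged sketch of consuming these traits; the glue function is hypothetical and assumes engine specific implementations, such as the Spark text delimited readers and writers, are passed in.

    import com.google.common.collect.BiMap
    import org.apache.mahout.math.indexeddataset.{IndexedDataset, Reader, Writer}

    // Hypothetical glue: read element input with one implementation, write it out with another.
    def copyElements[T <: IndexedDataset](reader: Reader[T], writer: Writer[T],
        source: String, dest: String, rowIDs: BiMap[String, Int]): Unit = {
      val dataset: T = reader.readElementsFrom(source, existingRowIDs = rowIDs)
      writer.writeTo(dataset, dest)
    }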

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
new file mode 100644
index 0000000..557b419
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import scala.collection.mutable.HashMap
+
+/** Syntactic sugar for mutable.HashMap[String, Any]
+  *
+  * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+  */
+class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
+  // note: this requires a mutable HashMap, do we care?
+  this ++= params
+
+  /** Constructor for copying an existing Schema
+    *
+    * @param schemaToClone the Schema whose mappings are copied into the new instance
+    */
+  def this(schemaToClone: Schema){
+    this()
+    this ++= schemaToClone
+  }
+}
+
+/** These default Schemas keep text input and output fairly standard across Mahout where an application specific
+  * format is not required. They apply to the formatting of an [[org.apache.mahout.math.indexeddataset.IndexedDataset]],
+  * which can be used to create a Mahout DRM for DSL ops.
+  */
+
+
+/** Simple default Schema for typical text delimited element file input
+  * This tells the reader to expect elements of the default form:
+  * (rowID<comma, tab, or space>columnID<comma, tab, or space>any further text is ignored...)
+  */
+final object DefaultIndexedDatasetElementReadSchema extends Schema(
+  "delim" -> "[,\t ]", //comma, tab or space
+  "filter" -> "",
+  "rowIDColumn" -> 0,
+  "columnIDPosition" -> 1,
+  "filterColumn" -> -1)
+
+/** Default Schema for text delimited drm file output
+  * This tells the writer to write a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] of the default form:
+  * (rowID<tab>columnID1:score1<space>columnID2:score2...)
+  */
+final object DefaultIndexedDatasetWriteSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "elementDelim" -> " ",
+  "omitScore" -> false)
+
+/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file input
+  * This tells the reader to input text lines of the form:
+  * (rowID<tab>columnID1:score1,columnID2:score2,...)
+  */
+final object DefaultIndexedDatasetReadSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "elementDelim" -> " ")
+
+/** Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where
+  * the score of any element is ignored and all non-zeros are replaced with 1.
+  * This tells the reader to input DRM lines of the form
+  * (rowID<tab>columnID1:score1<space>columnID2:score2...) where the scores are ignored.
+  * Alternatively the format can be
+  * (rowID<tab>columnID1<space>columnID2...) where presence indicates a score of 1. This is the default
+  * output format of [[IndexedDatasetWriteBooleanSchema]].
+  */
+final object IndexedDatasetReadBooleanSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "elementDelim" -> " ",
+  "omitScore" -> true)
+
+/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file write where
+  * the score of an element is omitted.
+  * The presence of an element means a score of 1, its absence a score of 0.
+  * This tells the writer to output [[org.apache.mahout.math.indexeddataset.IndexedDataset]] lines of the form
+  * (rowID<tab>columnID1<space>columnID2...)
+  */
+final object IndexedDatasetWriteBooleanSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "elementDelim" -> " ",
+  "omitScore" -> true)
+
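
A hedged sketch of defining custom schemas, mirroring how the Spark ItemSimilarityDriver later in this patch builds readSchema1/readSchema2 (same layout, different filter); the wrapper object, delimiter, and filter strings are illustrative.

    import org.apache.mahout.math.indexeddataset.Schema

    object ExampleSchemas {
      // Hypothetical element-read schema: tab delimited, keep only rows tagged "purchase"
      val purchaseSchema = new Schema(
        "delim" -> "\t",
        "filter" -> "purchase",
        "rowIDColumn" -> 0,
        "columnIDPosition" -> 1,
        "filterColumn" -> 2)

      // clone and override a single key for the secondary action
      val viewSchema = new Schema(purchaseSchema) += ("filter" -> "view")
    }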

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
new file mode 100644
index 0000000..42bf697
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
+/**
+ * Returns a [[java.lang.String]], which is a comma delimited list of URIs discovered based on parameters
+ * in the constructor.
+ * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
+ *
+ * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
+ * @param filePattern regex that must match the entire filename to have the file returned
+ * @param recursive true traverses the filesystem recursively, default = false
+ */
+
+case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
+
+  val conf = new Configuration()
+  val fs = FileSystem.get(conf)
+
+  /** Returns a string of comma delimited URIs matching the filePattern
+    * When pattern matching, dirs are never returned, only traversed. */
+  def uris: String = {
+    if (!filePattern.isEmpty) { // have a file pattern so search for matches
+      val pathURIs = pathURI.split(",")
+      var files = ""
+      for (uri <- pathURIs) {
+        files = findFiles(uri, filePattern, files)
+      }
+      if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
+      files
+    } else {
+      pathURI
+    }
+  }
+
+  /** Find matching files in the dir, recursively call self when another directory is found
+    * Only files are matched, directories are traversed but never return a match */
+  private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
+    val seed = fs.getFileStatus(new Path(dir))
+    var f: String = files
+
+    if (seed.isDir) {
+      val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
+      for (fileStatus <- fileStatuses) {
+        if (fileStatus.getPath().getName().matches(filePattern)
+          && !fileStatus.isDir) {
+          // found a file
+          if (fileStatus.getLen() != 0) {
+            // file is not empty
+            f = f + fileStatus.getPath.toUri.toString + ","
+          }
+        } else if (fileStatus.isDir && recursive) {
+          f = findFiles(fileStatus.getPath.toString, filePattern, f)
+        }
+      }
+    } else { f = dir } // was a filename, not a dir
+    f
+  }
+
+}
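
A hedged usage sketch; the namenode URI is hypothetical. The returned String is a comma delimited list of non-empty matching files, ready to hand to SparkContext.textFile.

    import org.apache.mahout.common.HDFSPathSearch

    object ExamplePathSearch extends App {
      // Hypothetical input location; recursively collect Hadoop part files under it.
      val inFiles: String = HDFSPathSearch("hdfs://namenode:8020/data/interactions",
        filePattern = "^part-.*", recursive = true).uris
      println(inFiles)
    }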

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
deleted file mode 100644
index f48e9ed..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
-
-/**
- * Returns a [[java.lang.String]]comma delimited list of URIs discovered based on parameters in the constructor.
- * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
- *
- * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
- * @param filePattern regex that must match the entire filename to have the file returned
- * @param recursive true traverses the filesystem recursively, default = false
- */
-
-case class FileSysUtils(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
-
-  val conf = new Configuration()
-  val fs = FileSystem.get(conf)
-
-/** Returns a string of comma delimited URIs matching the filePattern
-  * When pattern matching dirs are never returned, only traversed. */
-  def uris :String = {
-    if (!filePattern.isEmpty){ // have file pattern so
-    val pathURIs = pathURI.split(",")
-      var files = ""
-      for ( uri <- pathURIs ){
-        files = findFiles(uri, filePattern, files)
-      }
-      if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
-      files
-    }else{
-      pathURI
-    }
-  }
-
-/** Find matching files in the dir, recursively call self when another directory is found
-  * Only files are matched, directories are traversed but never return a match */
-  private def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = {
-    val seed = fs.getFileStatus(new Path(dir))
-    var f :String = files
-
-    if (seed.isDir) {
-      val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
-      for (fileStatus <- fileStatuses) {
-        if (fileStatus.getPath().getName().matches(filePattern)
-          && !fileStatus.isDir) {
-          // found a file
-          if (fileStatus.getLen() != 0) {
-            // file is not empty
-            f = f + fileStatus.getPath.toUri.toString + ","
-          }
-        } else if (fileStatus.isDir && recursive) {
-          f = findFiles(fileStatus.getPath.toString, filePattern, f)
-        }
-      }
-    }else{ f = dir }// was a filename not dir
-    f
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
deleted file mode 100644
index 99f98f5..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import com.google.common.collect.BiMap
-import org.apache.mahout.math.drm.{DrmLike, CheckpointedDrm}
-import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
-import org.apache.mahout.sparkbindings._
-
-/**
-  * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries.
-  * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so
-  * a user specified label or ID can be stored and mapped to and from the [[scala.Int]] ordinal ID
-  * used internal to Mahout Core code.
-  *
-  * Example: For a transpose job the [[org.apache.mahout.drivers.IndexedDataset#matrix]]: [[org.apache.mahout.sparkbindings.drm.DrmLike]] is passed into the DSL code
-  * that transposes the values, then a resulting [[org.apache.mahout.drivers.IndexedDataset]] is created from the transposed DrmLike object with swapped dictionaries (since the rows and columns are transposed). The new
-  * [[org.apache.mahout.drivers.IndexedDataset]] is returned.
-  *
-  * @param matrix  DrmLike[Int], representing the distributed matrix storing the actual data.
-  * @param rowIDs BiMap[String, Int] storing a bidirectional mapping of external String ID to
-  *                  and from the ordinal Mahout Int ID. This one holds row labels
-  * @param columnIDs BiMap[String, Int] storing a bidirectional mapping of external String
-  *                  ID to and from the ordinal Mahout Int ID. This one holds column labels
-  * @todo Often no need for both or perhaps either dictionary, so save resources by allowing
-  *       to be not created when not needed.
-  */
-
-case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
-
-  // we must allow the row dimension to be adjusted in the case where the data read in is incomplete and we
-  // learn this afterwards
-
-  /**
-   * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
-   * No physical changes are made to the underlying drm.
-   * @param n number to use for row carnindality, should be larger than current
-   * @note should be done before any BLAS optimizer actions are performed on the matrix or you'll get unpredictable
-   *       results.
-   */
-  def newRowCardinality(n: Int): IndexedDataset = {
-    assert(n > -1)
-    assert( n >= matrix.nrow)
-    val drmRdd = matrix.asInstanceOf[CheckpointedDrmSpark[Int]].rdd
-    val ncol = matrix.ncol
-    val newMatrix = drmWrap[Int](drmRdd, n, ncol)
-    new IndexedDataset(newMatrix, rowIDs, columnIDs)
-  }
-
-}
-
-/**
-  * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary
-  * constructor for
-  * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like
-  * [[org.apache.mahout.drivers.Reader]]
-  * {{{
-  *   val indexedDataset = IndexedDataset(indexedDatasetReader.readElementsFrom(source))
-  * }}}
-  */
-
-object IndexedDataset {
-  /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
-  def apply(id2: IndexedDataset) = new IndexedDataset(id2.matrix,  id2.rowIDs, id2.columnIDs)
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 0b8ded6..42e9d81 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -17,15 +17,19 @@
 
 package org.apache.mahout.drivers
 
+import org.apache.mahout.common.HDFSPathSearch
 import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSReadElements}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scala.collection.immutable.HashMap
 
 /**
- * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]].
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]].
  * Reads text lines
  * that contain (row id, column id, ...). The IDs are user specified strings which will be
  * preserved in the
- * output. The individual elements will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]
+ * output. The individual elements will be accumulated into a matrix-like [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+ * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]
  * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
  * matrices and calculate both the self similarity of the primary matrix and the row-wise
  * similarity of the primary
@@ -40,17 +44,16 @@ import scala.collection.immutable.HashMap
  * @note To use with a Spark cluster see the --master option, if you run out of heap space check
  *       the --sparkExecutorMemory option.
  */
-object ItemSimilarityDriver extends MahoutDriver {
+object ItemSimilarityDriver extends MahoutSparkDriver {
   // define only the options specific to ItemSimilarity
   private final val ItemSimilarityOptions = HashMap[String, Any](
     "maxPrefs" -> 500,
     "maxSimilaritiesPerItem" -> 100,
     "appName" -> "ItemSimilarityDriver")
 
-  private var reader1: TextDelimitedIndexedDatasetReader = _
-  private var reader2: TextDelimitedIndexedDatasetReader = _
-  private var writer: TextDelimitedIndexedDatasetWriter = _
   private var writeSchema: Schema = _
+  private var readSchema1: Schema = _
+  private var readSchema2: Schema = _
 
   /**
    * @param args  Command line args, if empty a help message is printed.
@@ -119,27 +122,27 @@ object ItemSimilarityDriver extends MahoutDriver {
 
     // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
     // MahoutKryoRegistrator, it should be added to the register list here so it
-    // will be only spcific to this job.
+    // will be only specific to this job.
     sparkConf.set("spark.kryo.referenceTracking", "false")
-      .set("spark.kryoserializer.buffer.mb", "200")
-      .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+      .set("spark.kryoserializer.buffer.mb", "200")// todo: should this be left to config or an option?
+
+    if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
+      sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+    //else leave as set in Spark config
 
     super.start(masterUrl, appName)
 
-    val readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
+    readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
       "filter" -> parser.opts("filter1").asInstanceOf[String],
       "rowIDColumn" -> parser.opts("rowIDColumn").asInstanceOf[Int],
       "columnIDPosition" -> parser.opts("itemIDColumn").asInstanceOf[Int],
       "filterColumn" -> parser.opts("filterColumn").asInstanceOf[Int])
 
-    reader1 = new TextDelimitedIndexedDatasetReader(readSchema1)
-
     if ((parser.opts("filterColumn").asInstanceOf[Int] != -1 && parser.opts("filter2").asInstanceOf[String] != null)
       || (parser.opts("input2").asInstanceOf[String] != null && !parser.opts("input2").asInstanceOf[String].isEmpty )){
       // only need to change the filter used compared to readSchema1
-      val readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String])
+      readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String])
 
-      reader2 = new TextDelimitedIndexedDatasetReader(readSchema2)
     }
 
     writeSchema = new Schema(
@@ -147,36 +150,34 @@ object ItemSimilarityDriver extends MahoutDriver {
       "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
       "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
       "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
-
-    writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
-
-  }
+  }
 
   private def readIndexedDatasets: Array[IndexedDataset] = {
 
-    val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+    val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
       parser.opts("recursive").asInstanceOf[Boolean]).uris
     val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) ""
-    else FileSysUtils(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+    else HDFSPathSearch(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
       parser.opts("recursive").asInstanceOf[Boolean]).uris
 
     if (inFiles.isEmpty) {
       Array()
     } else {
 
-      val datasetA = IndexedDataset(reader1.readElementsFrom(inFiles))
-      if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetA,
-        parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
+      val datasetA = indexedDatasetDFSReadElements(inFiles, readSchema1)
+      if (parser.opts("writeAllDatasets").asInstanceOf[Boolean])
+        datasetA.dfsWrite(parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions",
+          schema = writeSchema)
 
-      // The case of readng B can be a bit tricky when the exact same row IDs don't exist for A and B
+      // The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B
       // Here we assume there is one row ID space for all interactions. To do this we calculate the
       // row cardinality only after reading in A and B (or potentially C...) We then adjust the
       // cardinality so all match, which is required for the math to work.
       // Note: this may leave blank rows with no representation in any DRM. Blank rows need to
-      // be supported (and are at least on Spark) or the row cardinality fix will not work.
+      // be supported (and are at least on Spark) or the row cardinality adjustment will not work.
       val datasetB = if (!inFiles2.isEmpty) {
         // get cross-cooccurrence interactions from separate files
-        val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
+        val datasetB = indexedDatasetDFSReadElements(inFiles2, readSchema2, existingRowIDs = datasetA.rowIDs)
 
         datasetB
 
@@ -184,12 +185,12 @@ object ItemSimilarityDriver extends MahoutDriver {
         && parser.opts("filter2").asInstanceOf[String] != null) {
 
         // get cross-cooccurrences interactions by using two filters on a single set of files
-        val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles, existingRowIDs = datasetA.rowIDs))
+        val datasetB = indexedDatasetDFSReadElements(inFiles, readSchema2, existingRowIDs = datasetA.rowIDs)
 
         datasetB
 
       } else {
-        null.asInstanceOf[IndexedDataset]
+        null.asInstanceOf[IndexedDatasetSpark]
       }
       if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc
       // true row cardinality is the size of the row id index, which was calculated from all rows of A and B
@@ -203,8 +204,9 @@ object ItemSimilarityDriver extends MahoutDriver {
         val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality)
         else datasetB // this guarantees matching cardinality
 
-        if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetB, parser.opts("output") + "../input-datasets/secondary-interactions")
-
+        if (parser.opts("writeAllDatasets").asInstanceOf[Boolean])
+          datasetB.dfsWrite(parser.opts("output").asInstanceOf[String] + "../input-datasets/secondary-interactions",
+            schema = writeSchema)
         Array(returnedA, returnedB)
       } else Array(datasetA)
     }
@@ -214,31 +216,13 @@ object ItemSimilarityDriver extends MahoutDriver {
     start()
 
     val indexedDatasets = readIndexedDatasets
+    val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int],
+      parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
 
     // todo: allow more than one cross-similarity matrix?
-    val indicatorMatrices = {
-      if (indexedDatasets.length > 1) {
-        SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
-          parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int],
-          Array(indexedDatasets(1).matrix))
-      } else {
-        SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
-          parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
-      }
-    }
-
-    // an alternative is to create a version of IndexedDataset that knows how to write itself
-    val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
-      indexedDatasets(0).columnIDs, writeSchema)
-    selfIndicatorDataset.writeTo(dest = parser.opts("output").asInstanceOf[String] + "indicator-matrix")
-
-    // todo: would be nice to support more than one cross-similarity indicator
-    if (indexedDatasets.length > 1) {
-
-      val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
-      writer.writeTo(crossIndicatorDataset, parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix")
-
-    }
+    idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "indicator-matrix", schema = writeSchema)
+    if(idss.length > 1)
+      idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix", schema = writeSchema)
 
     stop
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
deleted file mode 100644
index 6ea7c8b..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.spark.SparkConf
-import org.apache.mahout.sparkbindings._
-
-import scala.collection.immutable
-
-/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
-  * Also define a Map of options for the command line parser. The following template may help:
-  * {{{
-  * object SomeDriver extends MahoutDriver {
-  *
-  *   // define only the options specific to this driver, inherit the generic ones
-  *   private final val SomeOptions = HashMap[String, Any](
-  *       "maxThings" -> 500,
-  *       "minThings" -> 100,
-  *       "appName" -> "SomeDriver")
-  *
-  *   override def main(args: Array[String]): Unit = {
-  *
-  *
-  *     val parser = new MahoutOptionParser(programName = "shortname") {
-  *       head("somedriver", "Mahout 1.0-SNAPSHOT")
-  *
-  *       // Input output options, non-driver specific
-  *       parseIOOptions
-  *
-  *       // Algorithm specific options
-  *       // Add in the new options
-  *       opts = opts ++ SomeOptions
-  *       note("\nAlgorithm control options:")
-  *       opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
-  *         options + ("maxThings" -> x) ...
-  *     }
-  *     parser.parse(args, parser.opts) map { opts =>
-  *       parser.opts = opts
-  *       process
-  *     }
-  *   }
-  *
-  *   override def process: Unit = {
-  *     start()
-  *     // do the work here
-  *     stop
-  *   }
-  *
-  * }}}
-  */
-abstract class MahoutDriver {
-
-
-  implicit protected var mc: DistributedContext = _
-  implicit protected var sparkConf = new SparkConf()
-  protected var parser: MahoutOptionParser = _
-
-  var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
-
-  /** Creates a Spark context to run the job inside.
-    * Override to set the SparkConf values specific to the job,
-    * these must be set before the context is created.
-    * @param masterUrl Spark master URL
-    * @param appName  Name to display in Spark UI
-    * */
-  protected def start(masterUrl: String, appName: String) : Unit = {
-    if (!_useExistingContext) {
-      mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
-    }
-  }
-
-  /** Override (optionally) for special cleanup */
-  protected def stop: Unit = {
-    if (!_useExistingContext) mc.close
-  }
-
-  /** This is where you do the work, call start first, then before exiting call stop */
-  protected def process: Unit
-
-  /** Parse command line and call process */
-  def main(args: Array[String]): Unit
-
-  def useContext(context: DistributedContext): Unit = {
-    _useExistingContext = true
-    mc = context
-    sparkConf = mc.getConf
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
deleted file mode 100644
index ad7a76b..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mahout.drivers
-
-import scopt.OptionParser
-import scala.collection.immutable
-
-/** Companion object defines default option groups for reference in any driver that needs them */
-object MahoutOptionParser {
-  // set up the various default option groups
-  final val GenericOptions = immutable.HashMap[String, Any](
-    "randomSeed" -> System.currentTimeMillis().toInt,
-    "writeAllDatasets" -> false)
-
-  final val SparkOptions = immutable.HashMap[String, Any](
-    "master" -> "local",
-    "sparkExecutorMem" -> "2g",
-    "appName" -> "Generic Spark App, Change this.")
-
-  final val FileIOOptions = immutable.HashMap[String, Any](
-    "input" -> null.asInstanceOf[String],
-    "input2" -> null.asInstanceOf[String],
-    "output" -> null.asInstanceOf[String])
-
-  final val FileDiscoveryOptions = immutable.HashMap[String, Any](
-    "recursive" -> false,
-    "filenamePattern" -> "^part-.*")
-
-  final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
-    "rowIDColumn" -> 0,
-    "itemIDColumn" -> 1,
-    "filterColumn" -> -1,
-    "filter1" -> null.asInstanceOf[String],
-    "filter2" -> null.asInstanceOf[String],
-    "inDelim" -> "[,\t ]")
-
-  final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
-    "rowKeyDelim" -> "\t",
-    "columnIdStrengthDelim" -> ":",
-    "elementDelim" -> " ",
-    "omitStrength" -> false)
-}
-/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
-  * keep both standardized.
-  * @param programName Name displayed in help message, the name by which the driver is invoked.
-  * */
-class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
-
-  // build options from some standard CLI param groups
-  // Note: always put the driver-specific options last so they can override any previous options!
-  var opts = Map.empty[String, Any]
-
-  override def showUsageOnError = true
-
-  def parseIOOptions(numInputs: Int = 1) = {
-    opts = opts ++ MahoutOptionParser.FileIOOptions
-    note("Input, output options")
-    opt[String]('i', "input") required() action { (x, options) =>
-      options + ("input" -> x)
-    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
-
-    if (numInputs == 2) {
-      opt[String]("input2") abbr ("i2") action { (x, options) =>
-        options + ("input2" -> x)
-      } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
-    }
-
-    opt[String]('o', "output") required() action { (x, options) =>
-      // todo: check to see if HDFS allows MS-Windows backslashes locally?
-      if (x.endsWith("/")) {
-        options + ("output" -> x)
-      } else {
-        options + ("output" -> (x + "/"))
-      }
-    } text ("Path for output, any local or HDFS supported URI (required)")
-
-  }
-
-  def parseSparkOptions = {
-    opts = opts ++ MahoutOptionParser.SparkOptions
-    note("\nSpark config options:")
-
-    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
-      options + ("master" -> x)
-    }
-
-    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g") action { (x, options) =>
-      options + ("sparkExecutorMem" -> x)
-    }
-
-  }
-
-  def parseGenericOptions = {
-    opts = opts ++ MahoutOptionParser.GenericOptions
-    opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
-      options + ("randomSeed" -> x)
-    } validate { x =>
-      if (x > 0) success else failure("Option --randomSeed must be > 0")
-    }
-
-    //output both input DRMs
-    opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
-      options + ("writeAllDatasets" -> true)
-    } // Hidden option, though a user might want this.
-  }
-
-  def parseElementInputSchemaOptions{
-    // Input text file schema -- not driver specific but input-data specific; applies to element input,
-    // not DRMs
-    opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
-    note("\nInput text file schema options:")
-    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
-      options + ("inDelim" -> x)
-    }
-
-    opt[String]("filter1") abbr ("f1") action { (x, options) =>
-      options + ("filter1" -> x)
-    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
-
-    opt[String]("filter2") abbr ("f2") action { (x, options) =>
-      options + ("filter2" -> x)
-    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
-
-    opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
-      options + ("rowIDColumn" -> x)
-    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
-      if (x >= 0) success else failure("Option --rowIDColumn must be >= 0")
-    }
-
-    opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
-      options + ("itemIDColumn" -> x)
-    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
-      if (x >= 0) success else failure("Option --itemIDColumn must be >= 0")
-    }
-
-    opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
-      options + ("filterColumn" -> x)
-    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
-      if (x >= -1) success else failure("Option --filterColumn must be >= -1")
-    }
-
-    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
-
-    checkConfig { options: Map[String, Any] =>
-      if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]
-        || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int]
-        || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int])
-        failure("The row, item, and filter positions must be unique.") else success
-    }
-
-    //check for option consistency, probably driver specific
-    checkConfig { options: Map[String, Any] =>
-      if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
-        && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
-        && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
-        failure ("If using filters they must be unique.") else success
-    }
-
-  }
-
-  def parseFileDiscoveryOptions = {
-    //File finding strategy--not driver specific
-    opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
-    note("\nFile discovery options:")
-    opt[Unit]('r', "recursive") action { (_, options) =>
-      options + ("recursive" -> true)
-    } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
-
-    opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
-      options + ("filenamePattern" -> x)
-    } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
-
-  }
-
-  def parseDrmFormatOptions = {
-    opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
-    note("\nOutput text file schema options:")
-    opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
-      options + ("rowKeyDelim" -> x)
-    } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
-
-    opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
-      options + ("columnIdStrengthDelim" -> x)
-    } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
-
-    opt[String]("elementDelim") abbr ("td") action { (x, options) =>
-      options + ("elementDelim" -> x)
-    } text ("Separates vector element values in the values list (optional). Default: \" \"")
-
-    opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
-      options + ("omitStrength" -> true)
-    } text ("Do not write the strength to the output files (optional), Default: false.")
-    note("This option is used to output indexable data for creating a search engine recommender.")
-
-    note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
-  }
-
-}
-
-
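Although MahoutOptionParser is deleted from the spark module here, the driver templates above still reference it, so it is presumably relocated rather than removed, and the way its default option groups compose is worth spelling out. A minimal sketch, not part of the patch, using only groups and keys defined in the file above:

    // driver-specific options go last so they override earlier defaults, as the parser's note says
    val opts: Map[String, Any] =
      MahoutOptionParser.GenericOptions ++
      MahoutOptionParser.SparkOptions ++
      MahoutOptionParser.FileIOOptions ++
      Map("appName" -> "SomeDriver")                           // overrides the generic SparkOptions appName

    // option values are untyped, so callers cast them, just as the drivers do
    val executorMem = opts("sparkExecutorMem").asInstanceOf[String]   // "2g" per MahoutOptionParser.SparkOptions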

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
new file mode 100644
index 0000000..5a4254b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.spark.SparkConf
+import org.apache.mahout.sparkbindings._
+
+/** Extend this class to create a Spark-based Mahout CLI driver. Minimally you must override process and main.
+  * Also define a Map of options for the command line parser. The following template may help:
+  * {{{
+  * object SomeDriver extends MahoutSparkDriver {
+  *
+  *   // define only the options specific to this driver, inherit the generic ones
+  *   private final val SomeOptions = HashMap[String, Any](
+  *       "maxThings" -> 500,
+  *       "minThings" -> 100,
+  *       "appName" -> "SomeDriver")
+  *
+  *   override def main(args: Array[String]): Unit = {
+  *
+  *
+  *     val parser = new MahoutOptionParser(programName = "shortname") {
+  *       head("somedriver", "Mahout 1.0-SNAPSHOT")
+  *
+  *       // Input output options, non-driver specific
+  *       parseIOOptions
+  *
+  *       // Algorithm specific options
+  *       // Add in the new options
+  *       opts = opts ++ SomeOptions
+  *       note("\nAlgorithm control options:")
+  *       opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
+  *         options + ("maxThings" -> x) ...
+  *     }
+  *     parser.parse(args, parser.opts) map { opts =>
+  *       parser.opts = opts
+  *       process
+  *     }
+  *   }
+  *
+  *   override def process: Unit = {
+  *     start()
+  *     // do the work here
+  *     stop
+  *   }
+  *
+  * }}}
+  */
+abstract class MahoutSparkDriver extends MahoutDriver {
+
+
+  implicit protected var sparkConf = new SparkConf()
+
+  /** Creates a Spark context to run the job inside.
+    * Override to set SparkConf values specific to the job;
+    * these must be set before the context is created.
+    * @param masterUrl Spark master URL
+    * @param appName  Name to display in Spark UI
+    * */
+  protected def start(masterUrl: String, appName: String) : Unit = {
+    if (!_useExistingContext) {
+      mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
+    }
+  }
+
+  def useContext(context: DistributedContext): Unit = {
+    _useExistingContext = true
+    mc = context
+    sparkConf = mc.getConf
+  }
+}
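One behavioural detail of this base class worth spelling out: useContext flips _useExistingContext, so start will not create a fresh context and, per the _useExistingContext guard shown in the removed spark-side MahoutDriver, stop will not close a supplied one. A hedged sketch of how a test suite might lean on that to share one Spark-backed DistributedContext across driver runs; the helper object, master URL, and app name are hypothetical, while useContext and mahoutSparkContext are used exactly as in the code above:

    import org.apache.mahout.math.drm.DistributedContext
    import org.apache.mahout.sparkbindings._
    import org.apache.spark.SparkConf

    // hypothetical test helper, not part of this commit: hand every driver under test
    // the same context so a suite does not pay for one Spark context per driver invocation
    object SharedDriverContext {
      private var mc: DistributedContext = _

      def attach(driver: MahoutSparkDriver): Unit = {
        if (mc == null)
          mc = mahoutSparkContext("local[2]", "driver-test-suite", sparkConf = new SparkConf())
        driver.useContext(mc)   // the driver's start/stop will now leave the shared context alone
      }

      def shutdown(): Unit = if (mc != null) { mc.close(); mc = null }
    }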

http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
deleted file mode 100644
index 6351e45..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import com.google.common.collect.{HashBiMap, BiMap}
-import org.apache.mahout.math.drm.DistributedContext
-
-/** Reader trait is abstract in the sense that the elementReader and drmReader functions must be defined by an extending trait or class, which also defines the type to be read.
-  * @tparam T type of object read.
-  */
-trait Reader[T]{
-
-  val mc: DistributedContext
-  val readSchema: Schema
-
-  protected def elementReader(
-      mc: DistributedContext,
-      readSchema: Schema,
-      source: String,
-      existingRowIDs: BiMap[String, Int]): T
-
-  protected def drmReader(
-      mc: DistributedContext,
-      readSchema: Schema,
-      source: String,
-      existingRowIDs: BiMap[String, Int]): T
-
-  def readElementsFrom(
-      source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
-    elementReader(mc, readSchema, source, existingRowIDs)
-
-  def readDRMFrom(
-      source: String,
-      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
-    drmReader(mc, readSchema, source, existingRowIDs)
-}
-
-/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
-  * @tparam T type of object to write.
-  */
-trait Writer[T]{
-
-  val mc: DistributedContext
-  val sort: Boolean
-  val writeSchema: Schema
-
-  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
-
-  def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
-}