You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2014/08/08 20:54:44 UTC

[5/6] git commit: MAHOUT-1541, MAHOUT-1568, MAHOUT-1569 refactoring the options parser and option defaults to DRY up individual driver code putting more in base classes, tightened up the test suite with a better way of comparing actual with correct

MAHOUT-1541, MAHOUT-1568, MAHOUT-1569 refactoring the options parser and option defaults to DRY up individual driver code putting more in base classes, tightened up the test suite with a better way of comparing actual with correct


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a8097403
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a8097403
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a8097403

Branch: refs/heads/spark-1.0.x
Commit: a80974037853c5227f9e5ef1c384a1fca134746e
Parents: 00c0149
Author: pferrel <pa...@occamsmachete.com>
Authored: Wed Aug 6 16:28:37 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Wed Aug 6 16:28:37 2014 -0700

----------------------------------------------------------------------
 .../mahout/math/cf/CooccurrenceAnalysis.scala   | 220 ++++++++++
 .../apache/mahout/cf/CooccurrenceAnalysis.scala | 218 ----------
 .../apache/mahout/drivers/IndexedDataset.scala  |  25 +-
 .../mahout/drivers/ItemSimilarityDriver.scala   | 293 +++++--------
 .../apache/mahout/drivers/MahoutDriver.scala    |  28 +-
 .../mahout/drivers/MahoutOptionParser.scala     | 185 +++++++-
 .../apache/mahout/drivers/ReaderWriter.scala    |  30 +-
 .../org/apache/mahout/drivers/Schema.scala      |  54 ++-
 .../drivers/TextDelimitedReaderWriter.scala     | 107 +++--
 .../drm/CheckpointedDrmSpark.scala              |   1 -
 .../io/MahoutKryoRegistrator.scala              |   6 +-
 .../mahout/cf/CooccurrenceAnalysisSuite.scala   |  49 ++-
 .../drivers/ItemSimilarityDriverSuite.scala     | 422 +++++++++++++++++--
 13 files changed, 1114 insertions(+), 524 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
new file mode 100644
index 0000000..181b729
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.cf
+
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+
+/**
+ * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object CooccurrenceAnalysis extends Serializable {
+
+  /** Compares (Int,Double) pairs by the second value */
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
+
+  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Apply selective downsampling, pin resulting matrix
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+    // num users, which equals the maximum number of interactions per item
+    val numUsers = drmA.nrow.toInt
+
+    // Compute & broadcast the number of interactions per thing in A
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
+
+    // Compute co-occurrence matrix A'A
+    val drmAtA = drmA.t %*% drmA
+
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
+    val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
+      bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    var indicatorMatrices = List(drmIndicatorsAtA)
+
+    // Now look at cross-co-occurrences
+    for (drmBRaw <- drmBs) {
+      // Down-sample and pin other interaction matrix
+      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+      // Compute & broadcast the number of interactions per thing in B
+      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
+
+      // Compute cross-co-occurrence matrix B'A
+      // pferrel: yikes, this is the wrong order, a big change! so you know who to blame
+      // used to be val drmBtA = drmB.t %*% drmA, which is the wrong order
+      val drmAtB = drmA.t %*% drmB
+
+      val drmIndicatorsAtB = computeIndicators(drmAtB, numUsers, maxInterestingItemsPerThing,
+        bcastInteractionsPerItemA, bcastInteractionsPerThingB)
+
+      indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
+
+      drmB.uncache()
+    }
+
+    // Unpin downsampled interaction matrix
+    drmA.uncache()
+
+    // Return list of indicator matrices
+    indicatorMatrices
+  }
+
+  /**
+   * Compute loglikelihood ratio
+   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+   **/
+  def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+                         numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+    val k11 = numInteractionsWithAandB
+    val k12 = numInteractionsWithA - numInteractionsWithAandB
+    val k21 = numInteractionsWithB - numInteractionsWithAandB
+    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+
+  }
+
+  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+                        crossCooccurrence: Boolean = true) = {
+    drmBtA.mapBlock() {
+      case (keys, block) =>
+
+        val llrBlock = block.like()
+        val numInteractionsB: Vector = bcastNumInteractionsB
+        val numInteractionsA: Vector = bcastNumInteractionsA
+
+        for (index <- 0 until keys.size) {
+
+          val thingB = keys(index)
+
+          // PriorityQueue to select the top-k items
+          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+          block(index, ::).nonZeroes().foreach { elem =>
+            val thingA = elem.index
+            val cooccurrences = elem.get
+
+            // exclude co-occurrences of the item with itself
+            if (crossCooccurrence || thingB != thingA) {
+              // Compute loglikelihood ratio
+              val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+                cooccurrences.toLong, numUsers)
+
+              val candidate = thingA -> llr
+
+              // matches legacy hadoop code and maps values to range (0..1)
+              // val tLLR = 1.0 - (1.0 / (1.0 + llr))
+              //val candidate = thingA -> tLLR
+
+              // Enqueue item with score, if belonging to the top-k
+              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+                topItemsPerThing.enqueue(candidate)
+              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+                topItemsPerThing.dequeue()
+                topItemsPerThing.enqueue(candidate)
+              }
+            }
+          }
+
+          // Add top-k interesting items to the output matrix
+          topItemsPerThing.dequeueAll.foreach {
+            case (otherThing, llrScore) =>
+              llrBlock(index, otherThing) = llrScore
+          }
+        }
+
+        keys -> llrBlock
+    }
+  }
+
+  /**
+   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
+   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+   *
+   * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not
+   */
+  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+    implicit val distributedContext = drmM.context
+
+    // Pin raw interaction matrix
+    val drmI = drmM.checkpoint()
+
+    // Broadcast vector containing the number of interactions with each thing
+    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
+
+    val downSampledDrmI = drmI.mapBlock() {
+      case (keys, block) =>
+        val numInteractions: Vector = bcastNumInteractions
+
+        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
+        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+        val downsampledBlock = block.like()
+
+        // Downsample the interaction vector of each user
+        for (userIndex <- 0 until keys.size) {
+
+          val interactionsOfUser = block(userIndex, ::)
+
+          val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
+
+          val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
+
+          interactionsOfUser.nonZeroes().foreach { elem =>
+            val numInteractionsWithThing = numInteractions(elem.index)
+            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
+              // We ignore the original interaction value and create a binary 0-1 matrix
+              // as we only consider whether interactions happened or did not happen
+              downsampledBlock(userIndex, elem.index) = 1
+            }
+          }
+        }
+
+        keys -> downsampledBlock
+    }
+
+    // Unpin raw interaction matrix
+    drmI.uncache()
+
+    downSampledDrmI
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
deleted file mode 100644
index 14cc9d5..0000000
--- a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf
-
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import drm._
-import RLikeDrmOps._
-import scala.collection.JavaConversions._
-import org.apache.mahout.math.stats.LogLikelihood
-import collection._
-import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.math.function.{VectorFunction, Functions}
-
-
-/**
- * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
- * available at http://www.mapr.com/practical-machine-learning
- *
- * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
- * Scalable Similarity-Based Neighborhood Methods with MapReduce
- * ACM Conference on Recommender Systems 2012"
- */
-object CooccurrenceAnalysis extends Serializable {
-
-  /** Compares (Int,Double) pairs by the second value */
-  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
-
-  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
-                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
-
-    implicit val distributedContext = drmARaw.context
-
-    // Apply selective downsampling, pin resulting matrix
-    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
-
-    // num users, which equals the maximum number of interactions per item
-    val numUsers = drmA.nrow.toInt
-
-    // Compute & broadcast the number of interactions per thing in A
-    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
-
-    // Compute co-occurrence matrix A'A
-    val drmAtA = drmA.t %*% drmA
-
-    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
-    val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
-      bcastInteractionsPerItemA, crossCooccurrence = false)
-
-    var indicatorMatrices = List(drmIndicatorsAtA)
-
-    // Now look at cross-co-occurrences
-    for (drmBRaw <- drmBs) {
-      // Down-sample and pin other interaction matrix
-      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
-
-      // Compute & broadcast the number of interactions per thing in B
-      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
-
-      // Compute cross-co-occurrence matrix B'A
-      val drmBtA = drmB.t %*% drmA
-
-      val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing,
-        bcastInteractionsPerThingB, bcastInteractionsPerItemA)
-
-      indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA
-
-      drmB.uncache()
-    }
-
-    // Unpin downsampled interaction matrix
-    drmA.uncache()
-
-    // Return list of indicator matrices
-    indicatorMatrices
-  }
-
-  /**
-   * Compute loglikelihood ratio
-   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
-   **/
-  def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
-                         numInteractionsWithAandB: Long, numInteractions: Long) = {
-
-    val k11 = numInteractionsWithAandB
-    val k12 = numInteractionsWithA - numInteractionsWithAandB
-    val k21 = numInteractionsWithB - numInteractionsWithAandB
-    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
-
-    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
-
-  }
-
-  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
-                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
-                        crossCooccurrence: Boolean = true) = {
-    drmBtA.mapBlock() {
-      case (keys, block) =>
-
-        val llrBlock = block.like()
-        val numInteractionsB: Vector = bcastNumInteractionsB
-        val numInteractionsA: Vector = bcastNumInteractionsA
-
-        for (index <- 0 until keys.size) {
-
-          val thingB = keys(index)
-
-          // PriorityQueue to select the top-k items
-          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
-
-          block(index, ::).nonZeroes().foreach { elem =>
-            val thingA = elem.index
-            val cooccurrences = elem.get
-
-            // exclude co-occurrences of the item with itself
-            if (crossCooccurrence || thingB != thingA) {
-              // Compute loglikelihood ratio
-              val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
-                cooccurrences.toLong, numUsers)
-
-              val candidate = thingA -> llr
-
-              // matches legacy hadoop code and maps values to range (0..1)
-              // val tLLR = 1.0 - (1.0 / (1.0 + llr))
-              //val candidate = thingA -> tLLR
-
-              // Enqueue item with score, if belonging to the top-k
-              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
-                topItemsPerThing.enqueue(candidate)
-              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
-                topItemsPerThing.dequeue()
-                topItemsPerThing.enqueue(candidate)
-              }
-            }
-          }
-
-          // Add top-k interesting items to the output matrix
-          topItemsPerThing.dequeueAll.foreach {
-            case (otherThing, llrScore) =>
-              llrBlock(index, otherThing) = llrScore
-          }
-        }
-
-        keys -> llrBlock
-    }
-  }
-
-  /**
-   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
-   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
-   *
-   * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not
-   */
-  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
-
-    implicit val distributedContext = drmM.context
-
-    // Pin raw interaction matrix
-    val drmI = drmM.checkpoint()
-
-    // Broadcast vector containing the number of interactions with each thing
-    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
-
-    val downSampledDrmI = drmI.mapBlock() {
-      case (keys, block) =>
-        val numInteractions: Vector = bcastNumInteractions
-
-        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
-        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
-
-        val downsampledBlock = block.like()
-
-        // Downsample the interaction vector of each user
-        for (userIndex <- 0 until keys.size) {
-
-          val interactionsOfUser = block(userIndex, ::)
-
-          val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
-
-          val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
-
-          interactionsOfUser.nonZeroes().foreach { elem =>
-            val numInteractionsWithThing = numInteractions(elem.index)
-            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
-
-            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
-              // We ignore the original interaction value and create a binary 0-1 matrix
-              // as we only consider whether interactions happened or did not happen
-              downsampledBlock(userIndex, elem.index) = 1
-            }
-          }
-        }
-
-        keys -> downsampledBlock
-    }
-
-    // Unpin raw interaction matrix
-    drmI.uncache()
-
-    downSampledDrmI
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
index 0d8c160..41622a8 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
@@ -19,6 +19,8 @@ package org.apache.mahout.drivers
 
 import com.google.common.collect.BiMap
 import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
+import org.apache.mahout.sparkbindings._
 
 /**
   * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries.
@@ -39,14 +41,33 @@ import org.apache.mahout.math.drm.CheckpointedDrm
   *       to be not created when not needed.
   */
 
-case class IndexedDataset(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
+case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
+
+  // we must allow the row dimension to be adjusted in the case where the data read in is incomplete and we
+  // learn this afterwards
+
+  /**
+   * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
+   * No physical changes are made to the underlying drm.
+   * @param n number to use for row carnindality, should be larger than current
+   * @note should be done before any BLAS optimizer actions are performed on the matrix or you'll get unpredictable
+   *       results.
+   */
+  def newRowCardinality(n: Int): IndexedDataset = {
+    assert(n > -1)
+    assert( n >= matrix.nrow)
+    val drmRdd = matrix.asInstanceOf[CheckpointedDrmSpark[Int]].rdd
+    val ncol = matrix.ncol
+    val newMatrix = drmWrap[Int](drmRdd, n, ncol)
+    new IndexedDataset(newMatrix, rowIDs, columnIDs)
+  }
 }
 
 /**
   * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary constructor for
   * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like [[org.apache.mahout.drivers.IndexedDatasetStore]]
   * {{{
-  *   val indexedDataset = IndexedDataset(indexedDatasetReader.readFrom(source))
+  *   val indexedDataset = IndexedDataset(indexedDatasetReader.readTuplesFrom(source))
   * }}}
   */
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 71d36c9..e0eaabc 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -17,7 +17,8 @@
 
 package org.apache.mahout.drivers
 
-import org.apache.mahout.cf.CooccurrenceAnalysis
+import org.apache.mahout.math.cf.CooccurrenceAnalysis
+import scala.collection.immutable.HashMap
 
 /**
  * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]].
@@ -25,7 +26,7 @@ import org.apache.mahout.cf.CooccurrenceAnalysis
  * that contain (row id, column id, ...). The IDs are user specified strings which will be
  * preserved in the
  * output. The individual tuples will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]
- * will be used to calculate row-wise self-similarity, or when using filters, will generate two
+ * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
  * matrices and calculate both the self similarity of the primary matrix and the row-wise
  * similarity of the primary
  * to the secondary. Returns one or two directories of text files formatted as specified in
@@ -35,14 +36,21 @@ import org.apache.mahout.cf.CooccurrenceAnalysis
  * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple
  * tuples of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space,
  * you can specify only the input and output file and directory--all else will default to the correct values.
- * @note To use with a Spark cluster see the --masterUrl option, if you run out of heap space check
+ * Each output line will contain the Item ID and similar items sorted by LLR strength descending.
+ * @note To use with a Spark cluster see the --master option, if you run out of heap space check
  *       the --sparkExecutorMemory option.
  */
 object ItemSimilarityDriver extends MahoutDriver {
-  //todo: Should also take two input streams and do cross similarity with no filter required.
-  // required for examples
+  // define only the options specific to ItemSimilarity
+  private final val ItemSimilarityOptions = HashMap[String, Any](
+    "maxPrefs" -> 500,
+    "maxSimilaritiesPerItem" -> 100,
+    "appName" -> "ItemSimilarityDriver")
+
+  // build options from some stardard CLI param groups
+  // Note: always put the driver specific options at the last so the can override and previous options!
+  private var options: Map[String, Any] = null
 
-  private var options: Options = _
   private var reader1: TextDelimitedIndexedDatasetReader = _
   private var reader2: TextDelimitedIndexedDatasetReader = _
   private var writer: TextDelimitedIndexedDatasetWriter = _
@@ -52,190 +60,103 @@ object ItemSimilarityDriver extends MahoutDriver {
    * @param args  Command line args, if empty a help message is printed.
    */
   override def main(args: Array[String]): Unit = {
-    val parser = new MahoutOptionParser[Options]("spark-itemsimilarity") {
+    options = MahoutOptionParser.GenericOptions ++ MahoutOptionParser.SparkOptions ++
+      MahoutOptionParser.FileIOOptions ++ MahoutOptionParser.TextDelimitedTuplesOptions ++
+      MahoutOptionParser.TextDelimitedDRMOptions ++ ItemSimilarityOptions
+
+    val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
       head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
 
       //Input output options, non-driver specific
-      note("Input, output options")
-      opt[String]('i', "input") required() action { (x, options) =>
-        options.copy(input = x)
-      } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
-
-      opt[String]('o', "output") required() action { (x, options) =>
-        if (x.endsWith("/")) // todo: check to see if HDFS allows MS-Windows backslashes locally?
-          options.copy(output = x)
-        else
-          options.copy(output = x + "/")
-      } text ("Path for output, any local or HDFS supported URI (required).")
+      parseIOOptions
 
       //Algorithm control options--driver specific
       note("\nAlgorithm control options:")
-      opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
-        options.copy(master = x)
-      }
-
       opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) =>
-        options.copy(maxPrefs = x)
-      } text ("Max number of preferences to consider per user (optional). Default: 500") validate { x =>
+        options + ("maxPrefs" -> x)
+      } text ("Max number of preferences to consider per user (optional). Default: " +
+        ItemSimilarityOptions("maxPrefs")) validate { x =>
         if (x > 0) success else failure("Option --maxPrefs must be > 0")
       }
 
 /** not implemented in CooccurrenceAnalysis.cooccurrence
       opt[Int]("minPrefs") abbr ("mp") action { (x, options) =>
-        options.copy(minPrefs = x)
+        options.put("minPrefs", x)
+        options
       } text ("Ignore users with less preferences than this (optional). Default: 1") validate { x =>
         if (x > 0) success else failure("Option --minPrefs must be > 0")
       }
 */
 
       opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
-        options.copy(maxSimilaritiesPerItem = x)
-      } text ("Limit the number of similarities per item to this number (optional). Default: 100") validate { x =>
+        options + ("maxSimilaritiesPerItem" -> x)
+      } text ("Limit the number of similarities per item to this number (optional). Default: " +
+        ItemSimilarityOptions("maxSimilaritiesPerItem")) validate { x =>
         if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
       }
 
-      opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
-        options.copy(randomSeed = x)
-      } text ("Int to seed random number generator (optional). Default: Uses time to generate a seed") validate { x =>
-        if (x > 0) success else failure("Option --randomSeed must be > 0")
-      }
-
-      //Input text file schema--not driver specific but input data specific, tuples input,
-      // not drms
-      note("\nInput text file schema options:")
-      opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
-        options.copy(inDelim = x)
-      }
-
-      opt[String]("filter1") abbr ("f1") action { (x, options) =>
-        options.copy(filter1 = x)
-      } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
-
-      opt[String]("filter2") abbr ("f2") action { (x, options) =>
-        options.copy(filter2 = x)
-      } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected.")
-
-      opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) =>
-        options.copy(rowIDPosition = x)
-      } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
-        if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
-      }
-
-      opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) =>
-        options.copy(itemIDPosition = x)
-      } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
-        if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
-      }
-
-      opt[Int]("filterPosition") abbr ("fc") action { (x, options) =>
-        options.copy(filterPosition = x)
-      } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
-        if (x >= -1) success else failure("Option --filterColNum must be >= -1")
-      }
-
-      note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+      //Driver notes--driver specific
+      note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
 
-      //File finding strategy--not driver specific
-      note("\nFile discovery options:")
-      opt[Unit]('r', "recursive") action { (_, options) =>
-        options.copy(recursive = true)
-      } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+      //Input text format
+      parseInputSchemaOptions
 
-      opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
-        options.copy(filenamePattern = x)
-      } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+      //How to search for input
+      parseFileDiscoveryOptions
 
       //Drm output schema--not driver specific, drm specific
-      note("\nOutput text file schema options:")
-      opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
-        options.copy(rowKeyDelim = x)
-      } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
-
-      opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
-        options.copy(columnIdStrengthDelim = x)
-      } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
-
-      opt[String]("tupleDelim") abbr ("td") action { (x, options) =>
-        options.copy(tupleDelim = x)
-      } text ("Separates vector tuple values in the values list (optional). Default: \",\"")
-
-      opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
-        options.copy(omitStrength = true)
-      } text ("Do not write the strength to the output files (optional), Default: false.")
-      note("This option is used to output indexable data for creating a search engine recommender.")
+      parseDrmFormatOptions
 
       //Spark config options--not driver specific
-      note("\nSpark config options:")
-      opt[String]("sparkExecutorMem") abbr ("sem") action { (x, options) =>
-        options.copy(sparkExecutorMem = x)
-      } text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g")
+      parseSparkOptions
 
-      note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2,itemID10:value10...\"")
-
-      //Jar inclusion, this option can be set when executing the driver from compiled code
-      opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) =>
-        options.copy(dontAddMahoutJars = true) //set the value MahoutDriver so the context will be created correctly
-      }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly
-
-      //Driver notes--driver specific
-      note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.\n")
+      //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
+      parseGenericOptions
 
       help("help") abbr ("h") text ("prints this usage text\n")
 
-      checkConfig { c =>
-        if (c.filterPosition == c.itemIDPosition
-            || c.filterPosition == c.rowIDPosition
-            || c.rowIDPosition == c.itemIDPosition)
-          failure("The row, item, and filter positions must be unique.") else success
-      }
-
-      //check for option consistency, probably driver specific
-      checkConfig { c =>
-        if (c.filter1 != null && c.filter2 != null && c.filter1 == c.filter2) failure("If" +
-          " using filters they must be unique.") else success
-      }
-
     }
-
-    //repeated code, should this be put base MahoutDriver somehow?
-    parser.parse(args, Options()) map { opts =>
+    parser.parse(args, options) map { opts =>
       options = opts
       process
     }
-
   }
 
-  override def start(masterUrl: String = options.master,
-      appName: String = options.appName, dontAddMahoutJars: Boolean = options.dontAddMahoutJars):
+  override def start(masterUrl: String = options("master").asInstanceOf[String],
+      appName: String = options("appName").asInstanceOf[String],
+      dontAddMahoutJars: Boolean = options("dontAddMahoutJars").asInstanceOf[Boolean]):
     Unit = {
 
+    // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
+    // MahoutKryoRegistrator, it should be added to the register list here so it
+    // will be only spcific to this job.
     sparkConf.set("spark.kryo.referenceTracking", "false")
       .set("spark.kryoserializer.buffer.mb", "200")
-      .set("spark.executor.memory", options.sparkExecutorMem)
+      .set("spark.executor.memory", options("sparkExecutorMem").asInstanceOf[String])
 
     super.start(masterUrl, appName, dontAddMahoutJars)
 
-    val readSchema1 = new Schema("delim" -> options.inDelim, "filter" -> options.filter1,
-        "rowIDPosition" -> options.rowIDPosition,
-        "columnIDPosition" -> options.itemIDPosition,
-        "filterPosition" -> options.filterPosition)
+    val readSchema1 = new Schema("delim" -> options("inDelim").asInstanceOf[String],
+        "filter" -> options("filter1").asInstanceOf[String],
+        "rowIDPosition" -> options("rowIDPosition").asInstanceOf[Int],
+        "columnIDPosition" -> options("itemIDPosition").asInstanceOf[Int],
+        "filterPosition" -> options("filterPosition").asInstanceOf[Int])
 
     reader1 = new TextDelimitedIndexedDatasetReader(readSchema1)
 
-    if (options.filterPosition != -1 && options.filter2 != null) {
-      val readSchema2 = new Schema("delim" -> options.inDelim, "filter" -> options.filter2,
-          "rowIDPosition" -> options.rowIDPosition,
-          "columnIDPosition" -> options.itemIDPosition,
-          "filterPosition" -> options.filterPosition)
+    if ((options("filterPosition").asInstanceOf[Int] != -1 && options("filter2").asInstanceOf[String] != null)
+        || (options("input2").asInstanceOf[String] != null && !options("input2").asInstanceOf[String].isEmpty )){
+      // only need to change the filter used compared to readSchema1
+      val readSchema2 = new Schema(readSchema1) += ("filter" -> options("filter2").asInstanceOf[String])
 
       reader2 = new TextDelimitedIndexedDatasetReader(readSchema2)
     }
 
     writeSchema = new Schema(
-        "rowKeyDelim" -> options.rowKeyDelim,
-        "columnIdStrengthDelim" -> options.columnIdStrengthDelim,
-        "omitScore" -> options.omitStrength,
-        "tupleDelim" -> options.tupleDelim)
+        "rowKeyDelim" -> options("rowKeyDelim").asInstanceOf[String],
+        "columnIdStrengthDelim" -> options("columnIdStrengthDelim").asInstanceOf[String],
+        "omitScore" -> options("omitStrength").asInstanceOf[Boolean],
+        "tupleDelim" -> options("tupleDelim").asInstanceOf[String])
 
     writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
 
@@ -243,24 +164,60 @@ object ItemSimilarityDriver extends MahoutDriver {
 
   private def readIndexedDatasets: Array[IndexedDataset] = {
 
-    val inFiles = FileSysUtils(options.input, options.filenamePattern, options.recursive).uris
+    val inFiles = FileSysUtils(options("input").asInstanceOf[String], options("filenamePattern").asInstanceOf[String],
+        options("recursive").asInstanceOf[Boolean]).uris
+    val inFiles2 = if (options("input2") == null || options("input2").asInstanceOf[String].isEmpty) ""
+      else FileSysUtils(options("input2").asInstanceOf[String], options("filenamePattern").asInstanceOf[String],
+          options("recursive").asInstanceOf[Boolean]).uris
 
     if (inFiles.isEmpty) {
       Array()
     } else {
 
-      val selfSimilarityDataset = IndexedDataset(reader1.readFrom(inFiles))
+      val datasetA = IndexedDataset(reader1.readTuplesFrom(inFiles))
+      if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetA,
+          options("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
+
+      // The case of readng B can be a bit tricky when the exact same row IDs don't exist for A and B
+      // Here we assume there is one row ID space for all interactions. To do this we calculate the
+      // row cardinality only after reading in A and B (or potentially C...) We then adjust the
+      // cardinality so all match, which is required for the math to work.
+      // Note: this may leave blank rows with no representation in any DRM. Blank rows need to
+      // be supported (and are at least on Spark) or the row cardinality fix will not work.
+      val datasetB = if (!inFiles2.isEmpty) {
+        // get cross-cooccurrence interactions from separate files
+        val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
+
+        datasetB
+
+      } else if (options("filterPosition").asInstanceOf[Int] != -1
+          && options("filter2").asInstanceOf[String] != null) {
+
+        // get cross-cooccurrences interactions by using two filters on a single set of files
+        val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles, existingRowIDs = datasetA.rowIDs))
+
+        datasetB
 
-      if (options.filterPosition != -1 && options.filter2 != null) {
-        // todo: needs to support more than one cross-similarity indicator
-        val crossSimilarityDataset1 = IndexedDataset(reader2.readFrom(inFiles))
-        Array(selfSimilarityDataset, crossSimilarityDataset1)
       } else {
-        Array(selfSimilarityDataset)
+        null.asInstanceOf[IndexedDataset]
       }
+      if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc
+        // true row cardinality is the size of the row id index, which was calculated from all rows of A and B
+        val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality
 
-    }
+        // todo: how expensive is nrow? We could make assumptions about .rowIds that don't rely on
+        // its calculation
+        val returnedA = if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality)
+          else datasetA // this guarantees matching cardinality
 
+        val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality)
+          else datasetB // this guarantees matching cardinality
+
+        if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetB, options("output") + "../input-datasets/secondary-interactions")
+
+        Array(returnedA, returnedB)
+      } else Array(datasetA)
+    }
   }
 
   override def process: Unit = {
@@ -271,57 +228,29 @@ object ItemSimilarityDriver extends MahoutDriver {
     // todo: allow more than one cross-similarity matrix?
     val indicatorMatrices = {
       if (indexedDatasets.length > 1) {
-        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs, Array(indexedDatasets(1).matrix))
+        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int],
+            options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int],
+            Array(indexedDatasets(1).matrix))
       } else {
-        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs)
+        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int],
+          options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int])
       }
     }
 
-    // self similarity
-    // the next two lines write the drm using a Writer class
-    // val selfIndicatorDataset = new IndexedDataset(indicatorMatrices(0), indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs)
-    // writeStore.writeTo(selfIndicatorDataset, options.output + "indicator-matrix")
-
     // an alternative is to create a version of IndexedDataset that knows how to write itself
     val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
       indexedDatasets(0).columnIDs, writeSchema)
-    selfIndicatorDataset.writeTo(options.output + "indicator-matrix")
+    selfIndicatorDataset.writeTo(options("output").asInstanceOf[String] + "indicator-matrix")
 
-    // todo: needs to support more than one cross-similarity indicator
+    // todo: would be nice to support more than one cross-similarity indicator
     if (indexedDatasets.length > 1) {
 
       val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
-      writer.writeTo(crossIndicatorDataset, options.output + "cross-indicator-matrix")
+      writer.writeDRMTo(crossIndicatorDataset, options("output").asInstanceOf[String] + "cross-indicator-matrix")
 
     }
 
     stop
   }
 
-  // Default values go here, any "_" or null should be "required" in the Parser or flags an unused option
-  // todo: support two input streams for cross-similarity, maybe assume one schema for all inputs
-  case class Options(
-      master: String = "local",
-      sparkExecutorMem: String = "2g",
-      appName: String = "ItemSimilarityJob",
-      randomSeed: Int = System.currentTimeMillis().toInt,
-      recursive: Boolean = false,
-      input: String = null,
-      output: String = null,
-      filenamePattern: String = "^part-.*",
-      maxSimilaritiesPerItem: Int = 100,
-      maxPrefs: Int = 500,
-      minPrefs: Int = 1,
-      rowIDPosition: Int = 0,
-      itemIDPosition: Int = 1,
-      filterPosition: Int = -1,
-      filter1: String = null,
-      filter2: String = null,
-      inDelim: String = "[,\t ]",
-      rowKeyDelim: String = "\t",
-      columnIdStrengthDelim: String = ":",
-      tupleDelim: String = ",",
-      omitStrength: Boolean = false,
-      dontAddMahoutJars: Boolean = false)
-
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 0c579d4..796a66a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -21,17 +21,26 @@ import org.apache.mahout.math.drm.DistributedContext
 import org.apache.spark.SparkConf
 import org.apache.mahout.sparkbindings._
 
+import scala.collection.immutable
+
 /** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
-  * Also define a command line parser and default options or fill in the following template:
+  * Also define a Map of options for the command line parser. The following template may help:
   * {{{
   *   object SomeDriver extends MahoutDriver {
+  *     // build options from some stardard CLI param groups
+  *     // Note: always put the driver specific options at the last so the can override and previous options!
+  *     private var options = GenericOptions ++ SparkOptions ++ FileIOOptions ++ TextDelimitedTuplesOptions ++
+  *       TextDelimitedDRMOptions ++ ItemSimilarityOptions
+  *
   *     override def main(args: Array[String]): Unit = {
-  *       val parser = new MahoutOptionParser[Options]("Job Name") {
-  *         head("Job Name", "Spark")
-  *         note("Various CLI options")
-  *         //see https://github.com/scopt/scopt for a good Scala option parser, which MahoutOptionParser extends
+  *       val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
+  *         head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
+  *
+  *         //Several standard option groups are usually non-driver specific so use the MahoutOptionParser methods
+  *         parseGenericOptions
+  *         ...
   *       }
-  *       parser.parse(args, Options()) map { opts =>
+  *       parser.parse(args, options) map { opts =>
   *         options = opts
   *         process
   *       }
@@ -42,15 +51,12 @@ import org.apache.mahout.sparkbindings._
   *       //don't just stand there do something
   *       stop
   *     }
-  *
-  *     //Default values go here, any '_' or null should be 'required' in the Parser or flags an unused option
-  *     case class Options(
-  *       appName: String = "Job Name", ...
-  *     )
   *   }
   * }}}
   */
 abstract class MahoutDriver {
+
+
   implicit var mc: DistributedContext = _
   implicit val sparkConf = new SparkConf()
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 8a337f5..ba4ca1d 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -17,8 +17,189 @@
 package org.apache.mahout.drivers
 
 import scopt.OptionParser
+import scala.collection.immutable
 
-/** Modifies default [[scopt.OptionParser]] to output long help-like usage + error message */
-class MahoutOptionParser[C](programName: String) extends OptionParser[C](programName: String) {
+/** Companion object defines default option groups for reference in any driver that needs them */
+object MahoutOptionParser {
+  // set up the various default option groups
+  final val GenericOptions = immutable.HashMap[String, Any](
+    "randomSeed" -> System.currentTimeMillis().toInt,
+    "dontAddMahoutJars" -> false,
+    "writeAllDatasets" -> false)
+
+  final val SparkOptions = immutable.HashMap[String, Any](
+    "master" -> "local",
+    "sparkExecutorMem" -> "2g",
+    "appName" -> "Generic Spark App, Change this.")
+
+  final val FileIOOptions = immutable.HashMap[String, Any](
+    "recursive" -> false,
+    "input" -> null.asInstanceOf[String],
+    "input2" -> null.asInstanceOf[String],
+    "output" -> null.asInstanceOf[String],
+    "filenamePattern" -> "^part-.*")
+
+  final val TextDelimitedTuplesOptions = immutable.HashMap[String, Any](
+    "rowIDPosition" -> 0,
+    "itemIDPosition" -> 1,
+    "filterPosition" -> -1,
+    "filter1" -> null.asInstanceOf[String],
+    "filter2" -> null.asInstanceOf[String],
+    "inDelim" -> "[,\t ]")
+
+  final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+    "rowKeyDelim" -> "\t",
+    "columnIdStrengthDelim" -> ":",
+    "tupleDelim" -> " ",
+    "omitStrength" -> false)
+}
+/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+  * keep both standarized.
+  * @param programName Name displayed in help message, the name by which the driver is invoked.
+  * */
+class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
   override def showUsageOnError = true
+
+  def parseIOOptions = {
+    note("Input, output options")
+    opt[String]('i', "input") required() action { (x, options) =>
+      options + ("input" -> x)
+    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+
+    opt[String]("input2") abbr ("i2")  action { (x, options) =>
+      options + ("input2" -> x)
+    } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+
+    opt[String]('o', "output") required() action { (x, options) =>
+      // todo: check to see if HDFS allows MS-Windows backslashes locally?
+      if (x.endsWith("/")) {
+        options + ("output" -> x)
+      } else {
+        options + ("output" -> (x + "/"))
+      }
+    } text ("Path for output, any local or HDFS supported URI (required)")
+
+  }
+
+  def parseSparkOptions = {
+    note("\nSpark config options:")
+
+    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+      options + ("master" -> x)
+    }
+
+    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g") action { (x, options) =>
+      options + ("sparkExecutorMem" -> x)
+    }
+
+  }
+
+  def parseGenericOptions = {
+    note("\nGeneral config options:")
+    opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
+      options + ("randomSeed" -> x)
+    } validate { x =>
+      if (x > 0) success else failure("Option --randomSeed must be > 0")
+    }
+
+    opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) =>
+      options + ("dontAddMahoutJars" -> true)
+    }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly
+
+    //output both input DRMs
+    opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
+      options + ("writeAllDatasets" -> true)
+    }//Hidden option, though a user might want this.
+  }
+
+  def parseInputSchemaOptions{
+    //Input text file schema--not driver specific but input data specific, tuples input,
+    // not drms
+    note("\nInput text file schema options:")
+    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
+      options + ("inDelim" -> x)
+    }
+
+    opt[String]("filter1") abbr ("f1") action { (x, options) =>
+      options + ("filter1" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+
+    opt[String]("filter2") abbr ("f2") action { (x, options) =>
+      options + ("filter2" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+
+    opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) =>
+      options + ("rowIDPosition" -> x)
+    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
+      if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+    }
+
+    opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) =>
+      options + ("itemIDPosition" -> x)
+    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
+      if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+    }
+
+    opt[Int]("filterPosition") abbr ("fc") action { (x, options) =>
+      options + ("filterPosition" -> x)
+    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+      if (x >= -1) success else failure("Option --filterColNum must be >= -1")
+    }
+
+    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+    checkConfig { options: Map[String, Any] =>
+      if (options("filterPosition").asInstanceOf[Int] == options("itemIDPosition").asInstanceOf[Int]
+        || options("filterPosition").asInstanceOf[Int] == options("rowIDPosition").asInstanceOf[Int]
+        || options("rowIDPosition").asInstanceOf[Int] == options("itemIDPosition").asInstanceOf[Int])
+        failure("The row, item, and filter positions must be unique.") else success
+    }
+
+    //check for option consistency, probably driver specific
+    checkConfig { options: Map[String, Any] =>
+      if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
+        && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
+        && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
+        failure ("If using filters they must be unique.") else success
+    }
+
+  }
+
+  def parseFileDiscoveryOptions = {
+    //File finding strategy--not driver specific
+    note("\nFile discovery options:")
+    opt[Unit]('r', "recursive") action { (_, options) =>
+      options + ("recursive" -> true)
+    } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+
+    opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
+      options + ("filenamePattern" -> x)
+    } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+
+  }
+
+  def parseDrmFormatOptions = {
+    note("\nOutput text file schema options:")
+    opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
+      options + ("rowKeyDelim" -> x)
+    } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+    opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
+      options + ("columnIdStrengthDelim" -> x)
+    } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+    opt[String]("tupleDelim") abbr ("td") action { (x, options) =>
+      options + ("tupleDelim" -> x)
+    } text ("Separates vector tuple values in the values list (optional). Default: \" \"")
+
+    opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
+      options + ("omitStrength" -> true)
+    } text ("Do not write the strength to the output files (optional), Default: false.")
+    note("This option is used to output indexable data for creating a search engine recommender.")
+
+    note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+  }
+
 }
+
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
index c5b7385..e2bb49c 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
@@ -17,25 +17,39 @@
 
 package org.apache.mahout.drivers
 
+import com.google.common.collect.{HashBiMap, BiMap}
 import org.apache.mahout.math.drm.DistributedContext
 
-/** Reader trait is abstract in the sense that the reader function must be defined by an extending trait, which also defines the type to be read.
-  * @tparam T type of object read, usually supplied by an extending trait.
-  * @todo the reader need not create both dictionaries but does at present. There are cases where one or the other dictionary is never used so saving the memory for a very large dictionary may be worth the optimization to specify which dictionaries are created.
+/** Reader trait is abstract in the sense that the tupleReader function must be defined by an extending trait, which also defines the type to be read.
+  * @tparam T type of object read.
   */
 trait Reader[T]{
+
   val mc: DistributedContext
   val readSchema: Schema
-  protected def reader(mc: DistributedContext, readSchema: Schema, source: String): T
-  def readFrom(source: String): T = reader(mc, readSchema, source)
+
+  protected def tupleReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int]): T
+
+  def readTuplesFrom(
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+    tupleReader(mc, readSchema, source, existingRowIDs)
 }
 
 /** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
-  * @tparam T
+  * @tparam T type of object to write.
   */
 trait Writer[T]{
+
   val mc: DistributedContext
+  val sort: Boolean
   val writeSchema: Schema
-  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T): Unit
-  def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection)
+
+  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
+
+  def writeDRMTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
index 7735b83..edff92d 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
@@ -20,21 +20,30 @@ package org.apache.mahout.drivers
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
 
-/** Syntactic sugar for HashMap[String, Any]
+/** Syntactic sugar for mutable.HashMap[String, Any]
   *
   * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
   */
 class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
   // note: this require a mutable HashMap, do we care?
   this ++= params
-  if (!this.contains("omitScore")) this += ("omitScore" -> false)
+
+  /** Constructor for copying an existing Schema
+    *
+    * @param schemaToClone return a copy of this Schema
+    */
+  def this(schemaToClone: Schema){
+    this()
+    this ++= schemaToClone
+  }
 }
 
-// These can be used to keep the text in and out fairly standard to Mahout, where an application specific format is not
-// required.
+// These can be used to keep the text in and out fairly standard to Mahout, where an application specific
+// format is not required.
 
 /** Simple default Schema for typical text delimited tuple file input
-  * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID<comma, tab, or space>etc...)
+  * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID
+  * <comma, tab, or space>here may be other ignored text...)
   */
 class DefaultTupleReadSchema extends Schema(
     "delim" -> "[,\t ]", //comma, tab or space
@@ -43,44 +52,47 @@ class DefaultTupleReadSchema extends Schema(
     "columnIDPosition" -> 1,
     "filterPosition" -> -1)
 
-/** Simple default Schema for typical text delimited drm file output
-  * This tells the writer to write a DRM of the default
-  * (rowID<tab>columnID1:score1,columnID2:score2,...)
+/** Default Schema for text delimited drm file output
+  * This tells the writer to write a DRM of the default form:
+  * (rowID<tab>columnID1:score1<space>columnID2:score2...)
   */
 class DefaultDRMWriteSchema extends Schema(
     "rowKeyDelim" -> "\t",
     "columnIdStrengthDelim" -> ":",
-    "tupleDelim" -> ",")
+    "tupleDelim" -> " ",
+    "omitScore" -> false)
 
-/** Simple default Schema for typical text delimited drm file output
-  * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID<comma, tab, or space>etc...)
+/** Default Schema for typical text delimited drm file input
+  * This tells the reader to input text lines of the form:
+  * (rowID<tab>columnID1:score1,columnID2:score2,...)
   */
 class DefaultDRMReadSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
-  "tupleDelim" -> ",")
+  "tupleDelim" -> " ")
 
-/** Simple default Schema for reading a text delimited drm file  where the score of any tuple is ignored,
+/** Default Schema for reading a text delimited drm file  where the score of any tuple is ignored,
   * all non-zeros are replaced with 1.
   * This tells the reader to input DRM lines of the form
-  * (rowID<tab>columnID1:score1,columnID2:score2,...) remember the score is ignored. Alternatively the format can be
-  * (rowID<tab>columnID1,columnID2,...) where presence indicates a score of 1. This is the default output format for
-  * [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
+  * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
+  * Alternatively the format can be
+  * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+  * output format for [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
   */
 class DRMReadBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
-  "tupleDelim" -> ",",
+  "tupleDelim" -> " ",
   "omitScore" -> true)
 
-/** Simple default Schema for typical text delimited drm file write where the score of a tuple is omitted.
+/** Default Schema for typical text delimited drm file write where the score of a tuple is omitted.
   * The presence of a tuple means the score = 1, the absence means a score of 0.
-  * This tells the reader to input DRM lines of the form
-  * (rowID<tab>columnID1,columnID2,...)
+  * This tells the writer to output DRM lines of the form
+  * (rowID<tab>columnID1<space>columnID2...)
   */
 class DRMWriteBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
-  "tupleDelim" -> ",",
+  "tupleDelim" -> " ",
   "omitScore" -> true)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index ae78d59..11d647b 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -17,14 +17,12 @@
 
 package org.apache.mahout.drivers
 
-import scala.collection.JavaConversions._
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.RandomAccessSparseVector
 import com.google.common.collect.{BiMap, HashBiMap}
-import scala.collection.JavaConversions._
 import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
 import org.apache.mahout.sparkbindings._
-
+import scala.collection.JavaConversions._
 
 /** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
   */
@@ -36,7 +34,11 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
     * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
     * @return
     */
-  protected def reader(mc: DistributedContext, readSchema: Schema, source: String): IndexedDataset = {
+  protected def tupleReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = {
     try {
       val delimiter = readSchema("delim").asInstanceOf[String]
       val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int]
@@ -51,6 +53,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       })
 
       var columns = mc.textFile(source).map { line => line.split(delimiter) }
+      //val m = columns.collect
 
       // -1 means no filter in the input text, take them all
       if(filterPosition != -1) {
@@ -59,7 +62,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       }
 
       // get row and column IDs
-      val m = columns.collect
+      //val m = columns.collect
       val interactions = columns.map { tokens =>
         tokens(rowIDPosition) -> tokens(columnIDPosition)
       }
@@ -75,10 +78,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
 
       // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
       // broadcast them for access in distributed processes, so they are not recalculated in every task.
-      val rowIDDictionary = asOrderedDictionary(rowIDs)
+      val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
       val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
 
-      val columnIDDictionary = asOrderedDictionary(columnIDs)
+      val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
       val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
 
       val indexedInteractions =
@@ -113,11 +116,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
   // this creates a BiMap from an ID collection. The ID points to an ordinal int
   // which is used internal to Mahout as the row or column ID
   // todo: this is a non-distributed process and the BiMap is a non-rdd based object--might be a scaling problem
-  private def asOrderedDictionary(entries: Array[String]): BiMap[String, Int] = {
-    var dictionary: BiMap[String, Int] = HashBiMap.create()
-    var index = 0
+  private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
+    var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index'
     for (entry <- entries) {
-      dictionary.forcePut(entry, index)
+      if (!dictionary.contains(entry)) dictionary.put(entry, index)
       index += 1
     }
     dictionary
@@ -125,13 +127,21 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
 }
 
 trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
+
   /** Read in text delimited tuples from all URIs in this comma delimited source String.
     *
     * @param mc context for the Spark job
     * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
     * @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]]
     */
-  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, indexedDataset: IndexedDataset): Unit = {
+  protected def writer(
+      mc: DistributedContext,
+      writeSchema: Schema,
+      dest: String,
+      indexedDataset: IndexedDataset,
+      sort: Boolean = true): Unit = {
     try {
       val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
       val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String]
@@ -140,8 +150,14 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
       //instance vars must be put into locally scoped vals when put into closures that are
       //executed but Spark
 
-      assert (indexedDataset != null, {println(this.getClass.toString+": has no indexedDataset to write"); throw new IllegalArgumentException })
-      assert (!dest.isEmpty, {println(this.getClass.toString+": has no destination or indextedDataset to write"); throw new IllegalArgumentException})
+      assert(indexedDataset != null, {
+        println(this.getClass.toString + ": has no indexedDataset to write")
+        throw new IllegalArgumentException
+      })
+      assert(!dest.isEmpty, {
+        println(this.getClass.toString + ": has no destination or indextedDataset to write")
+        throw new IllegalArgumentException
+      })
 
       val matrix = indexedDataset.matrix
       val rowIDDictionary = indexedDataset.rowIDs
@@ -149,18 +165,29 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
 
       matrix.rdd.map { case (rowID, itemVector) =>
 
-        // each line is created of non-zero values with schema specified delimiters and original row and column ID tokens
-        // first get the external rowID token
-        var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
-
-        // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
-        for (item <- itemVector.nonZeroes()) {
-          line += columnIDDictionary.inverse.get(item.index)
-          if (!omitScore) line += columnIdStrengthDelim + item.get
-          line += tupleDelim
+        // turn non-zeros into list for sorting
+        val itemList: collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]] = new collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]]
+        for (ve <- itemVector.nonZeroes) {
+          val item: org.apache.mahout.common.Pair[Integer, Double] = new org.apache.mahout.common.Pair[Integer, Double](ve.index, ve.get)
+          itemList += item
         }
-        // drop the last delimiter, not needed to end the line
-        line.dropRight(1)
+        //sort by highest value descending(-)
+        val vector = if (sort) itemList.sortBy(-_.getSecond) else itemList
+
+        // first get the external rowID token
+        if (!vector.isEmpty){
+          var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
+          // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
+          for (item <- vector) {
+            line += columnIDDictionary.inverse.get(item.getFirst)
+            if (!omitScore) line += columnIdStrengthDelim + item.getSecond
+            line += tupleDelim
+          }
+          // drop the last delimiter, not needed to end the line
+          line.dropRight(1)
+        } else {//no items so write a line with id but no values, no delimiters
+          rowIDDictionary.inverse.get(rowID)
+        } // "if" returns a line of text so this must be last in the block
       }
       .saveAsTextFile(dest)
 
@@ -176,25 +203,28 @@ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexed
 /** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
   * @param mc Spark context for reading files
-  * @note The source is supplied by Reader#readFrom .
+  * @note The source is supplied by Reader#readTuplesFrom .
   * */
-class TextDelimitedIndexedDatasetReader(val readSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReader
+class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
+    (implicit val mc: DistributedContext) extends TDIndexedDatasetReader
 
 /** Writes  text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
   * @param mc Spark context for reading files
-  * @note the destination is supplied by Writer#writeTo trait method
+  * @note the destination is supplied by Writer#writeDRMTo trait method
   * */
-class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
 
 /** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
   * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
   * @param mc Spark context for reading the files, may be implicitly defined.
   * */
-class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReaderWriter
+class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema, val sort: Boolean = true)
+    (implicit val mc: DistributedContext)
+  extends TDIndexedDatasetReaderWriter
 
-/** A version of IndexedDataset that has it's own writeTo method from a Writer trait. This is an alternative to creating
+/** A version of IndexedDataset that has it's own writeDRMTo method from a Writer trait. This is an alternative to creating
   * a Writer based stand-alone class for writing. Consider it experimental allowing similar semantics to drm.writeDrm().
   * Experimental because it's not clear that it is simpler or more intuitive and since IndexedDatasetTextDelimitedWriteables
   * are probably short lived in terms of lines of code so complexity may be moot.
@@ -204,12 +234,17 @@ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeS
   * @param writeSchema contains params for the schema/format or the written text delimited file.
   * @param mc mahout distributed context (DistributedContext) may be implicitly defined.
   * */
-class IndexedDatasetTextDelimitedWriteable(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int],
-                                           val writeSchema: Schema)(implicit val mc: DistributedContext)
+class IndexedDatasetTextDelimitedWriteable(
+    matrix: CheckpointedDrm[Int],
+    rowIDs: BiMap[String,Int],
+    columnIDs: BiMap[String,Int],
+    val writeSchema: Schema,
+    val sort: Boolean = true)
+    (implicit val mc: DistributedContext)
   extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter {
 
   def writeTo(dest: String): Unit = {
-    writeTo(this, dest)
+    writeDRMTo(this, dest)
   }
 }
 
@@ -217,11 +252,11 @@ class IndexedDatasetTextDelimitedWriteable(matrix: CheckpointedDrm[Int], rowIDs:
  * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily used to get a secondary constructor for
  * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
  * {{{
- *   val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readFrom(source))
+ *   val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readTuplesFrom(source))
  * }}}
  */
 
 object IndexedDatasetTextDelimitedWriteable {
   /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
-  def apply(id2: IndexedDatasetTextDelimitedWriteable) = new IndexedDatasetTextDelimitedWriteable(id2.matrix,  id2.rowIDs, id2.columnIDs, id2.writeSchema)(id2.mc)
+  def apply(id2: IndexedDatasetTextDelimitedWriteable, sort: Boolean = true) = new IndexedDatasetTextDelimitedWriteable(id2.matrix,  id2.rowIDs, id2.columnIDs, id2.writeSchema, id2.sort)(id2.mc)
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 1c5546b..cc5ebf2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -65,7 +65,6 @@ class CheckpointedDrmSpark[K: ClassTag](
   private var cached: Boolean = false
   override val context: DistributedContext = rdd.context
 
-
   /**
    * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
    * and writing down Spark graph lineage since last checkpointed DRM.

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
index 22e31cc..61f37e4 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
@@ -23,6 +23,10 @@ import com.google.common.collect.HashBiMap
 import org.apache.mahout.math._
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.mahout.sparkbindings._
+import org.apache.mahout.common.Pair
+import org.apache.mahout.math.Vector.Element
+
+import scala.collection.immutable.List
 
 /** Kryo serialization registrator for Mahout */
 class MahoutKryoRegistrator extends KryoRegistrator {
@@ -32,6 +36,6 @@ class MahoutKryoRegistrator extends KryoRegistrator {
     kryo.addDefaultSerializer(classOf[Vector], new WritableKryoSerializer[Vector, VectorWritable])
     kryo.addDefaultSerializer(classOf[DenseVector], new WritableKryoSerializer[Vector, VectorWritable])
     kryo.addDefaultSerializer(classOf[Matrix], new WritableKryoSerializer[Matrix, MatrixWritable])
-    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer());
+    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer())
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
index 938dc33..642e90a 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.mahout.cf
 
+import org.apache.mahout.math.cf.CooccurrenceAnalysis
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.scalabindings.{MatrixOps, _}
 import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
@@ -48,13 +49,19 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
     (0.0,                0.0,                0.0,                     0.0,                0.0))
 
   // correct cross-cooccurrence with LLR
-  final val matrixLLRCoocBtAControl = dense(
+  final val m = dense(
     (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
     (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
     (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
     (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
     (0.0,                0.0,                0.0,                0.0,                4.498681156950466))
 
+  final val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+      (0.0,                0.0,                0.6795961471815897, 0.0,                4.498681156950466))
 
 
   test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
@@ -150,6 +157,46 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
     n should be < 1E-10
   }
 
+  test("cooccurrence two matrices with different number of columns"){
+    val a = dense(
+      (1, 1, 0, 0, 0),
+      (0, 0, 1, 1, 0),
+      (0, 0, 0, 0, 1),
+      (1, 0, 0, 1, 0))
+
+    val b = dense(
+      (0, 1, 1, 0),
+      (1, 1, 1, 0),
+      (0, 0, 1, 0),
+      (1, 1, 0, 1))
+
+    val matrixLLRCoocBtANonSymmetric = dense(
+      (0.0,                1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+      (0.0,                0.6795961471815897, 0.6795961471815897, 0.0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (5.545177444479561,  1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+      (0.0,                0.0,                0.6795961471815897, 0.0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    //self similarity
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+
+    //cooccurrence without LLR is just a A'B
+    //val inCoreAtB = a.transpose().times(b)
+    //val bp = 0
+  }
+
   test("LLR calc") {
     val A = dense(
         (1, 1, 0, 0, 0),