You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2014/08/28 20:00:14 UTC

[1/2] MAHOUT-1604 add a CLI and associated code for spark-rowsimilarity, also cleans up some things in MAHOUT-1568 and MAHOUT-1569, closes apache/mahout#47

Repository: mahout
Updated Branches:
  refs/heads/master 91f15ecfe -> 149c98592


http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
index 642e90a..29c7b84 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.mahout.cf
 
-import org.apache.mahout.math.cf.CooccurrenceAnalysis
+import org.apache.mahout.math.cf.SimilarityAnalysis
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.scalabindings.{MatrixOps, _}
 import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
@@ -81,7 +81,7 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
     val drmB = drmParallelize(m = b, numPartitions = 2)
 
     //self similarity
-    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, randomSeed = 1, drmBs = Array(drmB))
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, randomSeed = 1, drmBs = Array(drmB))
     val matrixSelfCooc = drmCooc(0).checkpoint().collect
     val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
     var n = (new MatrixOps(m = diffMatrix)).norm
@@ -112,7 +112,7 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
     val drmB = drmParallelize(m = b, numPartitions = 2)
 
     //self similarity
-    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
     val matrixSelfCooc = drmCooc(0).checkpoint().collect
     val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
     var n = (new MatrixOps(m = diffMatrix)).norm
@@ -142,7 +142,7 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
     val drmB = drmParallelize(m = b, numPartitions = 2)
 
    //self similarity
-    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
     //var cp = drmSelfCooc(0).checkpoint()
     //cp.writeDRM("/tmp/cooc-spark/")//to get values written
     val matrixSelfCooc = drmCooc(0).checkpoint().collect
@@ -181,7 +181,7 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
     val drmB = drmParallelize(m = b, numPartitions = 2)
 
     //self similarity
-    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
     val matrixSelfCooc = drmCooc(0).checkpoint().collect
     val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
     var n = (new MatrixOps(m = diffMatrix)).norm
@@ -227,21 +227,6 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
          (1, 0, 1, 0, 0),
          (0, 0, 0, 0, 0))
 
-        for (MatrixSlice row : cooccurrence) {
-            for (Vector.Element element : row.vector().nonZeroes()) {
-                long k11 = (long) element.get();// = 1
-                long k12 = (long) (rowSums.get(row.index()) - k11);// = 0
-                long k21 = (long) (colSums.get(element.index()) - k11);// = 1
-                long k22 = (long) (total - k11 - k12 - k21);// = 2
-                double score = LogLikelihood.rootLogLikelihoodRatio(k11, k12, k21, k22);
-                element.set(score);
-            }
-        }
-
-        for some reason the hadoop version returns the following
-        return 1.0 - 1.0 / (1.0 + logLikelihood);
-        so not a pure llr or root llr
-
     */
 
     //item (1,0)
@@ -250,7 +235,7 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
     val numInteractionsWithB = 2L
     val numInteractions = 6l
 
-    val llr = CooccurrenceAnalysis.logLikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
+    val llr = SimilarityAnalysis.logLikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
 
     assert(llr == 2.6341457841558764) // value calculated by hadoop itemsimilairty
   }
@@ -263,7 +248,7 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
         (1, 1, 0, 1, 0))
     val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
 
-    val downSampledDrm = CooccurrenceAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
+    val downSampledDrm = SimilarityAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
     //count non-zero values, should be == 7
     var numValues = 0
     val m = downSampledDrm.collect

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index 4bf1662..0a73469 100644
--- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -26,6 +26,7 @@ import org.apache.mahout.math.drm._
 import org.apache.mahout.math.scalabindings._
 
 //todo: take out, only for temp tests
+
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm._
@@ -33,53 +34,53 @@ import RLikeDrmOps._
 import scala.collection.JavaConversions._
 
 
-class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
+class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
 
-/*
-  final val matrixLLRCoocAtAControl = dense(
-    (0.0,                0.6331745808516107, 0.0,                     0.0,                0.0),
-    (0.6331745808516107, 0.0,                0.0,                     0.0,                0.0),
-    (0.0,                0.0,                0.0,                     0.6331745808516107, 0.0),
-    (0.0,                0.0,                0.6331745808516107,      0.0,                0.0),
-    (0.0,                0.0,                0.0,                     0.0,                0.0))
-
-  final val matrixLLRCoocBtAControl = dense(
-      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
-      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
-      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
-      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
-      (0.0,                0.0,                0.6795961471815897, 0.0,                4.498681156950466))
-*/
+  /*
+    final val matrixLLRCoocAtAControl = dense(
+      (0.0,                0.6331745808516107, 0.0,                     0.0,                0.0),
+      (0.6331745808516107, 0.0,                0.0,                     0.0,                0.0),
+      (0.0,                0.0,                0.0,                     0.6331745808516107, 0.0),
+      (0.0,                0.0,                0.6331745808516107,      0.0,                0.0),
+      (0.0,                0.0,                0.0,                     0.0,                0.0))
+
+    final val matrixLLRCoocBtAControl = dense(
+        (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+        (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+        (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+        (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+        (0.0,                0.0,                0.6795961471815897, 0.0,                4.498681156950466))
+  */
 
 
   final val SelfSimilairtyLines = Iterable(
-      "galaxy\tnexus:1.7260924347106847",
-      "ipad\tiphone:1.7260924347106847",
-      "nexus\tgalaxy:1.7260924347106847",
-      "iphone\tipad:1.7260924347106847",
-      "surface")
+    "galaxy\tnexus:1.7260924347106847",
+    "ipad\tiphone:1.7260924347106847",
+    "nexus\tgalaxy:1.7260924347106847",
+    "iphone\tipad:1.7260924347106847",
+    "surface")
 
   val CrossIndicatorLines = Iterable(
-      "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
-      "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
-      "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
-      "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
-      "surface\tsurface:4.498681156950466 nexus:0.6795961471815897")
+    "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+    "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+    "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+    "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+    "surface\tsurface:4.498681156950466 nexus:0.6795961471815897")
 
   // todo: a better test would be to sort each vector by itemID and compare rows, tokens misses some error cases
   final val SelfSimilairtyTokens = tokenize(Iterable(
-      "galaxy\tnexus:1.7260924347106847",
-      "ipad\tiphone:1.7260924347106847",
-      "nexus\tgalaxy:1.7260924347106847",
-      "iphone\tipad:1.7260924347106847",
-      "surface"))
+    "galaxy\tnexus:1.7260924347106847",
+    "ipad\tiphone:1.7260924347106847",
+    "nexus\tgalaxy:1.7260924347106847",
+    "iphone\tipad:1.7260924347106847",
+    "surface"))
 
   val CrossIndicatorTokens = tokenize(Iterable(
-      "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
-      "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
-      "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
-      "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
-      "surface\tsurface:4.498681156950466 nexus:0.6795961471815897"))
+    "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+    "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+    "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+    "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+    "surface\tsurface:4.498681156950466 nexus:0.6795961471815897"))
 
   // now in MahoutSuite
   // final val TmpDir = "tmp/" // all IO going to whatever the default HDFS config is pointing to
@@ -113,32 +114,32 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     ))
   */
 
-  test ("ItemSimilarityDriver, non-full-spec CSV"){
+  test("ItemSimilarityDriver, non-full-spec CSV") {
 
     val InFile = TmpDir + "in-file.csv/" //using part files, not single file
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-        "u1,purchase,iphone",
-        "u1,purchase,ipad",
-        "u2,purchase,nexus",
-        "u2,purchase,galaxy",
-        "u3,purchase,surface",
-        "u4,purchase,iphone",
-        "u4,purchase,galaxy",
-        "u1,view,iphone",
-        "u1,view,ipad",
-        "u1,view,nexus",
-        "u1,view,galaxy",
-        "u2,view,iphone",
-        "u2,view,ipad",
-        "u2,view,nexus",
-        "u2,view,galaxy",
-        "u3,view,surface",
-        "u3,view,nexus",
-        "u4,view,iphone",
-        "u4,view,ipad",
-        "u4,view,galaxy")
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,iphone",
+      "u1,view,ipad",
+      "u1,view,nexus",
+      "u1,view,galaxy",
+      "u2,view,iphone",
+      "u2,view,ipad",
+      "u2,view,nexus",
+      "u2,view,galaxy",
+      "u3,view,surface",
+      "u3,view,nexus",
+      "u4,view,iphone",
+      "u4,view,ipad",
+      "u4,view,galaxy")
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
@@ -146,22 +147,22 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
 
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-        "--input", InFile,
-        "--output", OutPath,
-        "--master", masterUrl,
-        "--filter1", "purchase",
-        "--filter2", "view",
-        "--inDelim", ",",
-        "--itemIDPosition", "2",
-        "--rowIDPosition", "0",
-        "--filterPosition", "1",
-        "--writeAllDatasets"))
+      "--input", InFile,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1",
+      "--writeAllDatasets"))
 
     // todo: these comparisons rely on a sort producing the same lines, which could possibly
     // fail since the sort is on value and these can be the same for all items in a vector
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
     tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
     tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
   }
 
@@ -173,26 +174,26 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-        "u1\tpurchase\tiphone",
-        "u1\tpurchase\tipad",
-        "u2\tpurchase\tnexus",
-        "u2\tpurchase\tgalaxy",
-        "u3\tpurchase\tsurface",
-        "u4\tpurchase\tiphone",
-        "u4\tpurchase\tgalaxy",
-        "u1\tview\tiphone",
-        "u1\tview\tipad",
-        "u1\tview\tnexus",
-        "u1\tview\tgalaxy",
-        "u2\tview\tiphone",
-        "u2\tview\tipad",
-        "u2\tview\tnexus",
-        "u2\tview\tgalaxy",
-        "u3\tview\tsurface",
-        "u3\tview\tnexus",
-        "u4\tview\tiphone",
-        "u4\tview\tipad",
-        "u4\tview\tgalaxy")
+      "u1\tpurchase\tiphone",
+      "u1\tpurchase\tipad",
+      "u2\tpurchase\tnexus",
+      "u2\tpurchase\tgalaxy",
+      "u3\tpurchase\tsurface",
+      "u4\tpurchase\tiphone",
+      "u4\tpurchase\tgalaxy",
+      "u1\tview\tiphone",
+      "u1\tview\tipad",
+      "u1\tview\tnexus",
+      "u1\tview\tgalaxy",
+      "u2\tview\tiphone",
+      "u2\tview\tipad",
+      "u2\tview\tnexus",
+      "u2\tview\tgalaxy",
+      "u3\tview\tsurface",
+      "u3\tview\tnexus",
+      "u4\tview\tiphone",
+      "u4\tview\tipad",
+      "u4\tview\tgalaxy")
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
@@ -200,21 +201,21 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
 
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-        "--input", InFile,
-        "--output", OutPath,
-        "--master", masterUrl,
-        "--filter1", "purchase",
-        "--filter2", "view",
-        "--inDelim", "[,\t]",
-        "--itemIDPosition", "2",
-        "--rowIDPosition", "0",
-        "--filterPosition", "1"))
+      "--input", InFile,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", "[,\t]",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"))
 
     // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
     // some error cases
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
     tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
     tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
@@ -225,26 +226,26 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-        "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
-        "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tipad",
-        "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tnexus",
-        "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tgalaxy",
-        "2014-06-23 14:46:53.115\tu3\tpurchase\trandom text\tsurface",
-        "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tiphone",
-        "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tgalaxy",
-        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tiphone",
-        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tipad",
-        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tnexus",
-        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tgalaxy",
-        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tiphone",
-        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tipad",
-        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tnexus",
-        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tgalaxy",
-        "2014-06-23 14:46:53.115\tu3\tview\trandom text\tsurface",
-        "2014-06-23 14:46:53.115\tu3\tview\trandom text\tnexus",
-        "2014-06-23 14:46:53.115\tu4\tview\trandom text\tiphone",
-        "2014-06-23 14:46:53.115\tu4\tview\trandom text\tipad",
-        "2014-06-23 14:46:53.115\tu4\tview\trandom text\tgalaxy")
+      "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
+      "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tipad",
+      "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tnexus",
+      "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tgalaxy",
+      "2014-06-23 14:46:53.115\tu3\tpurchase\trandom text\tsurface",
+      "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tiphone",
+      "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tgalaxy",
+      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tiphone",
+      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tipad",
+      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tnexus",
+      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tgalaxy",
+      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tiphone",
+      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tipad",
+      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tnexus",
+      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tgalaxy",
+      "2014-06-23 14:46:53.115\tu3\tview\trandom text\tsurface",
+      "2014-06-23 14:46:53.115\tu3\tview\trandom text\tnexus",
+      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tiphone",
+      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tipad",
+      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tgalaxy")
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
@@ -252,20 +253,20 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
 
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-        "--input", InFile,
-        "--output", OutPath,
-        "--master", masterUrl,
-        "--filter1", "purchase",
-        "--filter2", "view",
-        "--inDelim", "\t",
-        "--itemIDPosition", "4",
-        "--rowIDPosition", "1",
-        "--filterPosition", "2"))
+      "--input", InFile,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", "\t",
+      "--itemIDPosition", "4",
+      "--rowIDPosition", "1",
+      "--filterPosition", "2"))
 
 
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
     tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
     tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
@@ -279,23 +280,23 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     val OutPath = TmpDir + "indicator-matrices"
 
     val lines = Array(
-        "0,0,1",
-        "0,1,1",
-        "1,2,1",
-        "1,3,1",
-        "2,4,1",
-        "3,0,1",
-        "3,3,1")
+      "0,0,1",
+      "0,1,1",
+      "1,2,1",
+      "1,3,1",
+      "2,4,1",
+      "3,0,1",
+      "3,3,1")
 
     val Answer = tokenize(Iterable(
-        "0\t1:1.7260924347106847",
-        "3\t2:1.7260924347106847",
-        "1\t0:1.7260924347106847",
-        "4",
-        "2\t3:1.7260924347106847"))
+      "0\t1:1.7260924347106847",
+      "3\t2:1.7260924347106847",
+      "1\t0:1.7260924347106847",
+      "4",
+      "2\t3:1.7260924347106847"))
 
     // this creates one part-0000 file in the directory
-    mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir)
+    mahoutCtx.parallelize(lines).coalesce(1, shuffle = true).saveAsTextFile(InDir)
 
     // to change from using part files to a single .tsv file we'll need to use HDFS
     val fs = FileSystem.get(new Configuration())
@@ -304,11 +305,11 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
 
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-        "--input", InPath,
-        "--output", OutPath,
-        "--master", masterUrl))
+      "--input", InPath,
+      "--output", OutPath,
+      "--master", masterUrl))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
     tokenize(indicatorLines) should contain theSameElementsAs Answer
 
   }
@@ -322,23 +323,23 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     val OutPath = TmpDir + "indicator-matrices"
 
     val lines = Array(
-        "0,0,1",
-        "0,1,1",
-        "1,2,1",
-        "1,3,1",
-        "2,4,1",
-        "3,0,1",
-        "3,3,1")
+      "0,0,1",
+      "0,1,1",
+      "1,2,1",
+      "1,3,1",
+      "2,4,1",
+      "3,0,1",
+      "3,3,1")
 
     val Answer = tokenize(Iterable(
-        "0\t1",
-        "3\t2",
-        "1\t0",
-        "4",
-        "2\t3"))
+      "0\t1",
+      "3\t2",
+      "1\t0",
+      "4",
+      "2\t3"))
 
     // this creates one part-0000 file in the directory
-    mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir)
+    mahoutCtx.parallelize(lines).coalesce(1, shuffle = true).saveAsTextFile(InDir)
 
     // to change from using part files to a single .tsv file we'll need to use HDFS
     val fs = FileSystem.get(new Configuration())
@@ -347,12 +348,12 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
 
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-        "--input", InPath,
-        "--output", OutPath,
-        "--master", masterUrl,
-        "--omitStrength"))
+      "--input", InPath,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--omitStrength"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
     tokenize(indicatorLines) should contain theSameElementsAs Answer
 
   }
@@ -362,28 +363,28 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     // tmp/data/m1.tsv
     // tmp/data/more-data/another-dir/m2.tsv
     val M1Lines = Array(
-        "u1\tpurchase\tiphone",
-        "u1\tpurchase\tipad",
-        "u2\tpurchase\tnexus",
-        "u2\tpurchase\tgalaxy",
-        "u3\tpurchase\tsurface",
-        "u4\tpurchase\tiphone",
-        "u4\tpurchase\tgalaxy",
-        "u1\tview\tiphone")
+      "u1\tpurchase\tiphone",
+      "u1\tpurchase\tipad",
+      "u2\tpurchase\tnexus",
+      "u2\tpurchase\tgalaxy",
+      "u3\tpurchase\tsurface",
+      "u4\tpurchase\tiphone",
+      "u4\tpurchase\tgalaxy",
+      "u1\tview\tiphone")
 
     val M2Lines = Array(
-        "u1\tview\tipad",
-        "u1\tview\tnexus",
-        "u1\tview\tgalaxy",
-        "u2\tview\tiphone",
-        "u2\tview\tipad",
-        "u2\tview\tnexus",
-        "u2\tview\tgalaxy",
-        "u3\tview\tsurface",
-        "u3\tview\tnexus",
-        "u4\tview\tiphone",
-        "u4\tview\tipad",
-        "u4\tview\tgalaxy")
+      "u1\tview\tipad",
+      "u1\tview\tnexus",
+      "u1\tview\tgalaxy",
+      "u2\tview\tiphone",
+      "u2\tview\tipad",
+      "u2\tview\tnexus",
+      "u2\tview\tgalaxy",
+      "u3\tview\tsurface",
+      "u3\tview\tnexus",
+      "u4\tview\tiphone",
+      "u4\tview\tipad",
+      "u4\tview\tgalaxy")
 
     val InFilenameM1 = "m1.tsv"
     val InDirM1 = TmpDir + "data/"
@@ -396,7 +397,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     val OutPath = TmpDir + "indicator-matrices"
 
     // this creates one part-0000 file in the directory
-    mahoutCtx.parallelize(M1Lines).coalesce(1, shuffle=true).saveAsTextFile(InDirM1)
+    mahoutCtx.parallelize(M1Lines).coalesce(1, shuffle = true).saveAsTextFile(InDirM1)
 
     // to change from using part files to a single .tsv file we'll need to use HDFS
     val fs = FileSystem.get(new Configuration())
@@ -404,7 +405,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     fs.rename(new Path(InDirM1 + "part-00000"), new Path(InPathM1))
 
     // this creates one part-0000 file in the directory
-    mahoutCtx.parallelize(M2Lines).coalesce(1, shuffle=true).saveAsTextFile(InDirM2)
+    mahoutCtx.parallelize(M2Lines).coalesce(1, shuffle = true).saveAsTextFile(InDirM2)
 
     // to change from using part files to a single .tsv file we'll need to use HDFS
     //rename part-00000 to tmp/some-location/something.tsv
@@ -413,52 +414,52 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     // local multi-threaded Spark with default FS, suitable for build tests but need better location for data
 
     ItemSimilarityDriver.main(Array(
-        "--input", InPathStart,
-        "--output", OutPath,
-        "--master", masterUrl,
-        "--filter1", "purchase",
-        "--filter2", "view",
-        "--inDelim", "\t",
-        "--itemIDPosition", "2",
-        "--rowIDPosition", "0",
-        "--filterPosition", "1",
-        "--filenamePattern", "m..tsv",
-        "--recursive"))
+      "--input", InPathStart,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", "\t",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1",
+      "--filenamePattern", "m..tsv",
+      "--recursive"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
     tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
     tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
 
-  test ("ItemSimilarityDriver, two input paths"){
+  test("ItemSimilarityDriver, two input paths") {
 
     val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
     val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-        "u1,purchase,iphone",
-        "u1,purchase,ipad",
-        "u2,purchase,nexus",
-        "u2,purchase,galaxy",
-        "u3,purchase,surface",
-        "u4,purchase,iphone",
-        "u4,purchase,galaxy",
-        "u1,view,iphone",
-        "u1,view,ipad",
-        "u1,view,nexus",
-        "u1,view,galaxy",
-        "u2,view,iphone",
-        "u2,view,ipad",
-        "u2,view,nexus",
-        "u2,view,galaxy",
-        "u3,view,surface",
-        "u3,view,nexus",
-        "u4,view,iphone",
-        "u4,view,ipad",
-        "u4,view,galaxy")
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,iphone",
+      "u1,view,ipad",
+      "u1,view,nexus",
+      "u1,view,galaxy",
+      "u2,view,iphone",
+      "u2,view,ipad",
+      "u2,view,nexus",
+      "u2,view,galaxy",
+      "u3,view,surface",
+      "u3,view,nexus",
+      "u4,view,iphone",
+      "u4,view,ipad",
+      "u4,view,galaxy")
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
@@ -467,65 +468,65 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
 
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-        "--input", InFile1,
-        "--input2", InFile2,
-        "--output", OutPath,
-        "--master", masterUrl,
-        "--filter1", "purchase",
-        "--filter2", "view",
-        "--inDelim", ",",
-        "--itemIDPosition", "2",
-        "--rowIDPosition", "0",
-        "--filterPosition", "1"))
+      "--input", InFile1,
+      "--input2", InFile2,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
     tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
     tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
 
-  test ("ItemSimilarityDriver, two inputs of different dimensions"){
+  test("ItemSimilarityDriver, two inputs of different dimensions") {
 
     val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
     val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-        "u1,purchase,iphone",
-        "u1,purchase,ipad",
-        "u2,purchase,nexus",
-        "u2,purchase,galaxy",
-        // remove one user so A'B will be of different dimensions
-        // ItemSimilarityDriver should create one unified user dictionary and so account for this
-        // discrepancy as a blank row: "u3,purchase,surface",
-        "u4,purchase,iphone",
-        "u4,purchase,galaxy",
-        "u1,view,iphone",
-        "u1,view,ipad",
-        "u1,view,nexus",
-        "u1,view,galaxy",
-        "u2,view,iphone",
-        "u2,view,ipad",
-        "u2,view,nexus",
-        "u2,view,galaxy",
-        "u3,view,surface",
-        "u3,view,nexus",
-        "u4,view,iphone",
-        "u4,view,ipad",
-        "u4,view,galaxy")
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      // remove one user so A'B will be of different dimensions
+      // ItemSimilarityDriver should create one unified user dictionary and so account for this
+      // discrepancy as a blank row: "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,iphone",
+      "u1,view,ipad",
+      "u1,view,nexus",
+      "u1,view,galaxy",
+      "u2,view,iphone",
+      "u2,view,ipad",
+      "u2,view,nexus",
+      "u2,view,galaxy",
+      "u3,view,surface",
+      "u3,view,nexus",
+      "u4,view,iphone",
+      "u4,view,ipad",
+      "u4,view,galaxy")
 
     val UnequalDimensionsSelfSimilarity = tokenize(Iterable(
-        "ipad\tiphone:1.7260924347106847",
-        "iphone\tipad:1.7260924347106847",
-        "nexus\tgalaxy:1.7260924347106847",
-        "galaxy\tnexus:1.7260924347106847"))
+      "ipad\tiphone:1.7260924347106847",
+      "iphone\tipad:1.7260924347106847",
+      "nexus\tgalaxy:1.7260924347106847",
+      "galaxy\tnexus:1.7260924347106847"))
 
     val UnequalDimensionsCrossSimilarity = tokenize(Iterable(
-        "galaxy\tgalaxy:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 nexus:1.7260924347106847",
-        "iphone\tgalaxy:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 surface:1.7260924347106847 nexus:1.7260924347106847",
-        "ipad\tgalaxy:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 nexus:0.6795961471815897",
-        "nexus\tiphone:0.6795961471815897 ipad:0.6795961471815897 nexus:0.6795961471815897 galaxy:0.6795961471815897"))
+      "galaxy\tgalaxy:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 nexus:1.7260924347106847",
+      "iphone\tgalaxy:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 surface:1.7260924347106847 nexus:1.7260924347106847",
+      "ipad\tgalaxy:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 nexus:0.6795961471815897",
+      "nexus\tiphone:0.6795961471815897 ipad:0.6795961471815897 nexus:0.6795961471815897 galaxy:0.6795961471815897"))
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
@@ -533,25 +534,25 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
 
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-        "--input", InFile1,
-        "--input2", InFile2,
-        "--output", OutPath,
-        "--master", masterUrl,
-        "--filter1", "purchase",
-        "--filter2", "view",
-        "--inDelim", ",",
-        "--itemIDPosition", "2",
-        "--rowIDPosition", "0",
-        "--filterPosition", "1"))
+      "--input", InFile1,
+      "--input2", InFile2,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
     tokenize(indicatorLines) should contain theSameElementsAs UnequalDimensionsSelfSimilarity
     tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarity
 
   }
 
-  test("ItemSimilarityDriver cross similarity two separate items spaces"){
+  test("ItemSimilarityDriver cross similarity two separate items spaces") {
     /* cross-similarity with category views, same user space
             	phones	tablets	mobile_acc	soap
           u1	0	      1	      1	          0
@@ -564,29 +565,29 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-        "u1,purchase,iphone",
-        "u1,purchase,ipad",
-        "u2,purchase,nexus",
-        "u2,purchase,galaxy",
-        "u3,purchase,surface",
-        "u4,purchase,iphone",
-        "u4,purchase,galaxy",
-        "u1,view,phones",
-        "u1,view,mobile_acc",
-        "u2,view,phones",
-        "u2,view,tablets",
-        "u2,view,mobile_acc",
-        "u3,view,mobile_acc",
-        "u4,view,phones",
-        "u4,view,tablets",
-        "u4,view,soap")
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,phones",
+      "u1,view,mobile_acc",
+      "u2,view,phones",
+      "u2,view,tablets",
+      "u2,view,mobile_acc",
+      "u3,view,mobile_acc",
+      "u4,view,phones",
+      "u4,view,tablets",
+      "u4,view,soap")
 
     val UnequalDimensionsCrossSimilarityLines = tokenize(Iterable(
-        "iphone\tmobile_acc:1.7260924347106847 soap:1.7260924347106847 phones:1.7260924347106847",
-        "surface\tmobile_acc:0.6795961471815897",
-        "nexus\ttablets:1.7260924347106847 mobile_acc:0.6795961471815897 phones:0.6795961471815897",
-        "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 mobile_acc:1.7260924347106847",
-        "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897"))
+      "iphone\tmobile_acc:1.7260924347106847 soap:1.7260924347106847 phones:1.7260924347106847",
+      "surface\tmobile_acc:0.6795961471815897",
+      "nexus\ttablets:1.7260924347106847 mobile_acc:0.6795961471815897 phones:0.6795961471815897",
+      "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 mobile_acc:1.7260924347106847",
+      "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897"))
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
@@ -595,39 +596,39 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
 
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-        "--input", InFile1,
-        "--input2", InFile2,
-        "--output", OutPath,
-        "--master", masterUrl,
-        "--filter1", "purchase",
-        "--filter2", "view",
-        "--inDelim", ",",
-        "--itemIDPosition", "2",
-        "--rowIDPosition", "0",
-        "--filterPosition", "1",
-        "--writeAllDatasets"))
+      "--input", InFile1,
+      "--input2", InFile2,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1",
+      "--writeAllDatasets"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
     tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
 
   }
 
-  test("A.t %*% B after changing row cardinality of A"){
+  test("A.t %*% B after changing row cardinality of A") {
     // todo: move to math tests but this is Spark specific
 
     val a = dense(
-        (1.0, 1.0))
+      (1.0, 1.0))
 
     val b = dense(
-        (1.0, 1.0),
-        (1.0, 1.0),
-        (1.0, 1.0))
+      (1.0, 1.0),
+      (1.0, 1.0),
+      (1.0, 1.0))
 
     val inCoreABiggertBAnswer = dense(
-        (1.0, 1.0),
-        (1.0, 1.0))
+      (1.0, 1.0),
+      (1.0, 1.0))
 
     val drmA = drmParallelize(m = a, numPartitions = 2)
     val drmB = drmParallelize(m = b, numPartitions = 2)
@@ -645,7 +646,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
     val bp = 0
   }
 
-  test("ItemSimilarityDriver cross similarity two separate items spaces, missing rows in B"){
+  test("ItemSimilarityDriver cross similarity two separate items spaces, missing rows in B") {
     /* cross-similarity with category views, same user space
             	phones	tablets	mobile_acc	soap
             u1	0	      1	      1	          0
@@ -658,29 +659,29 @@ removed ==> u3	0	      0	      1	          0
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-        "u1,purchase,iphone",
-        "u1,purchase,ipad",
-        "u2,purchase,nexus",
-        "u2,purchase,galaxy",
-        "u3,purchase,surface",
-        "u4,purchase,iphone",
-        "u4,purchase,galaxy",
-        "u1,view,phones",
-        "u1,view,mobile_acc",
-        "u2,view,phones",
-        "u2,view,tablets",
-        "u2,view,mobile_acc",
-        //"u3,view,mobile_acc",// if this line is removed the cross-cooccurrence should work
-        "u4,view,phones",
-        "u4,view,tablets",
-        "u4,view,soap")
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,phones",
+      "u1,view,mobile_acc",
+      "u2,view,phones",
+      "u2,view,tablets",
+      "u2,view,mobile_acc",
+      //"u3,view,mobile_acc",// if this line is removed the cross-cooccurrence should work
+      "u4,view,phones",
+      "u4,view,tablets",
+      "u4,view,soap")
 
     val UnequalDimensionsCrossSimilarityLines = tokenize(Iterable(
-        "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847",
-        "ipad\tmobile_acc:1.7260924347106847 phones:0.6795961471815897",
-        "surface",
-        "nexus\tmobile_acc:1.7260924347106847 tablets:1.7260924347106847 phones:0.6795961471815897",
-        "iphone\tsoap:1.7260924347106847 phones:1.7260924347106847"))
+      "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847",
+      "ipad\tmobile_acc:1.7260924347106847 phones:0.6795961471815897",
+      "surface",
+      "nexus\tmobile_acc:1.7260924347106847 tablets:1.7260924347106847 phones:0.6795961471815897",
+      "iphone\tsoap:1.7260924347106847 phones:1.7260924347106847"))
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
@@ -689,20 +690,20 @@ removed ==> u3	0	      0	      1	          0
 
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-        "--input", InFile1,
-        "--input2", InFile2,
-        "--output", OutPath,
-        "--master", masterUrl,
-        "--filter1", "purchase",
-        "--filter2", "view",
-        "--inDelim", ",",
-        "--itemIDPosition", "2",
-        "--rowIDPosition", "0",
-        "--filterPosition", "1",
-        "--writeAllDatasets"))
+      "--input", InFile1,
+      "--input2", InFile2,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1",
+      "--writeAllDatasets"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable
     tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
   }
@@ -711,7 +712,7 @@ removed ==> u3	0	      0	      1	          0
   def tokenize(a: Iterable[String]): Iterable[String] = {
     var r: Iterable[String] = Iterable()
     a.foreach { l =>
-      l.split("\t").foreach{ s =>
+      l.split("\t").foreach { s =>
         r = r ++ s.split("[\t ]")
       }
     }
@@ -722,5 +723,5 @@ removed ==> u3	0	      0	      1	          0
     super.beforeAll(configMap)
     ItemSimilarityDriver.useContext(mahoutCtx)
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala
new file mode 100644
index 0000000..562e8c6
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.mahout.math.drm.RLikeDrmOps._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+import org.scalatest.{ConfigMap, FunSuite}
+
+
+class RowSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
+
+  // Shared input corpus for the tests below: one document per element, formatted as
+  // "<docID>\t<text>". doc1-doc5 share the same sentence up to "their" and doc6-doc10 share the
+  // same sentence up to "lazy", so each group differs only in its trailing word(s).
+  val TextDocs = Array(
+    "doc1\tNow is the time for all good people to come to aid of their party",
+    "doc2\tNow is the time for all good people to come to aid of their country",
+    "doc3\tNow is the time for all good people to come to aid of their hood",
+    "doc4\tNow is the time for all good people to come to aid of their friends",
+    "doc5\tNow is the time for all good people to come to aid of their looser brother",
+    "doc6\tThe quick brown fox jumped over the lazy dog",
+    "doc7\tThe quick brown fox jumped over the lazy boy",
+    "doc8\tThe quick brown fox jumped over the lazy cat",
+    "doc9\tThe quick brown fox jumped over the lazy wolverine",
+    "doc10\tThe quick brown fox jumped over the lazy cantelope")// yes that's spelled correctly.
+
+  // Runs RowSimilarityDriver end to end with --omitStrength and at most 4 similarities per row.
+  // Each of the first five output lines must consist of exactly the tokens doc1..doc5 and each
+  // of the last five of exactly doc6..doc10 (token order within a line is not checked).
+  test("RowSimilarityDriver text docs no strengths") {
+
+    val firstFiveSimDocsTokens = tokenize(Iterable(
+      "doc1\tdoc3 doc2 doc4 doc5"))
+
+    val lastFiveSimDocsTokens = tokenize(Iterable(
+      "doc6\tdoc8 doc10 doc7 doc9"))
+
+    val inDir = TmpDir + "in-dir/"
+    val inFilename = "in-file.tsv"
+    val inPath = inDir + inFilename
+
+    val outPath = TmpDir + "similarity-matrices/"
+
+
+    // coalescing to one partition creates a single part-00000 file in the directory
+    mahoutCtx.parallelize(TextDocs).coalesce(1, shuffle=true).saveAsTextFile(inDir)
+
+    // to change from using part files to a single .tsv file we'll need to use HDFS
+    val fs = FileSystem.get(new Configuration())
+    // rename part-00000 to in-file.tsv; rename() signals failure by returning false, so fail
+    // here with a clear message instead of letting the driver trip over a missing input file
+    assert(fs.rename(new Path(inDir + "part-00000"), new Path(inPath)),
+      "could not rename " + inDir + "part-00000 to " + inPath)
+
+    // local multi-threaded Spark with default HDFS
+    RowSimilarityDriver.main(Array(
+      "--input", inPath,
+      "--output", outPath,
+      "--omitStrength",
+      "--maxSimilaritiesPerRow", "4", // would get all docs similar if we didn't limit them
+      "--master", masterUrl))
+
+    val simLines = mahoutCtx.textFile(outPath).collect
+    // rows 0-4 are expected to be the doc1..doc5 group
+    for (rowNum <- 0 to 4){
+      simLines(rowNum).split("[\t ]") should contain theSameElementsAs firstFiveSimDocsTokens
+    }
+    // rows 5-9 are expected to be the doc6..doc10 group
+    for (rowNum <- 5 to 9){
+      simLines(rowNum).split("[\t ]") should contain theSameElementsAs lastFiveSimDocsTokens
+    }
+
+  }
+
+  // Runs RowSimilarityDriver end to end with strengths included and at most 4 similarities per
+  // row, then compares all output tokens ("docN" row IDs and "docN:strength" pairs) against the
+  // expected set, ignoring order.
+  test("RowSimilarityDriver text docs") {
+
+    val simDocsTokens = tokenize(Iterable(
+      "doc1\tdoc3:27.87301122947484 doc2:27.87301122947484 doc4:27.87301122947484 doc5:23.42278065550721",
+      "doc2\tdoc4:27.87301122947484 doc3:27.87301122947484 doc1:27.87301122947484 doc5:23.42278065550721",
+      "doc3\tdoc4:27.87301122947484 doc2:27.87301122947484 doc1:27.87301122947484 doc5:23.42278065550721",
+      "doc4\tdoc3:27.87301122947484 doc2:27.87301122947484 doc1:27.87301122947484 doc5:23.42278065550721",
+      "doc5\tdoc4:23.42278065550721 doc2:23.42278065550721 doc3:23.42278065550721 doc1:23.42278065550721",
+      "doc6\tdoc8:22.936393049704463 doc10:22.936393049704463 doc7:22.936393049704463 doc9:22.936393049704463",
+      "doc7\tdoc6:22.936393049704463 doc8:22.936393049704463 doc10:22.936393049704463 doc9:22.936393049704463",
+      "doc8\tdoc6:22.936393049704463 doc10:22.936393049704463 doc7:22.936393049704463 doc9:22.936393049704463",
+      "doc9\tdoc6:22.936393049704463 doc8:22.936393049704463 doc10:22.936393049704463 doc7:22.936393049704463",
+      "doc10\tdoc6:22.936393049704463 doc8:22.936393049704463 doc7:22.936393049704463 doc9:22.936393049704463"))
+
+    val inDir = TmpDir + "in-dir/"
+    val inFilename = "in-file.tsv"
+    val inPath = inDir + inFilename
+
+    val outPath = TmpDir + "similarity-matrix/"
+
+
+    // coalescing to one partition creates a single part-00000 file in the directory
+    mahoutCtx.parallelize(TextDocs).coalesce(1, shuffle=true).saveAsTextFile(inDir)
+
+    // to change from using part files to a single .tsv file we'll need to use HDFS
+    val fs = FileSystem.get(new Configuration())
+    // rename part-00000 to in-file.tsv; rename() signals failure by returning false, so fail
+    // here with a clear message instead of letting the driver trip over a missing input file
+    assert(fs.rename(new Path(inDir + "part-00000"), new Path(inPath)),
+      "could not rename " + inDir + "part-00000 to " + inPath)
+
+    // local multi-threaded Spark with default HDFS
+    RowSimilarityDriver.main(Array(
+      "--input", inPath,
+      "--output", outPath,
+      "--maxSimilaritiesPerRow", "4", // would get all docs similar if we didn't limit them
+      "--master", masterUrl))
+
+    val simLines = mahoutCtx.textFile(outPath).collect
+    tokenize(simLines) should contain theSameElementsAs simDocsTokens
+  }
+
+  // Flattens every line into its tokens (split on the `splitString` regex, tab or space by default),
+  // producing the Iterable of tokens needed for 'should contain theSameElementsAs Iterable'.
+  def tokenize(a: Iterable[String], splitString: String = "[\t ]"): Iterable[String] = {
+    // single pass with no mutable accumulator; tokens keep the order of the input lines
+    a.flatMap(line => line.split(splitString))
+  }
+
+  // Runs the inherited suite setup first, then passes the suite's mahoutCtx to
+  // RowSimilarityDriver.useContext (presumably so the driver reuses the test context
+  // instead of creating its own -- confirm against MahoutDriver).
+  override protected def beforeAll(configMap: ConfigMap): Unit = {
+    super.beforeAll(configMap)
+    RowSimilarityDriver.useContext(mahoutCtx)
+  }
+
+}


[2/2] git commit: MAHOUT-1604 add a CLI and associated code for spark-rowsimilarity, also cleans up some things in MAHOUT-1568 and MAHOUT-1569, closes apache/mahout#47

Posted by pa...@apache.org.
MAHOUT-1604 add a CLI and associated code for spark-rowsimilarity, also cleans up some things in MAHOUT-1568 and MAHOUT-1569, closes apache/mahout#47


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/149c9859
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/149c9859
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/149c9859

Branch: refs/heads/master
Commit: 149c98592fe447c98dfb5afc67b5809725cc3056
Parents: 91f15ec
Author: pferrel <pa...@occamsmachete.com>
Authored: Thu Aug 28 10:45:13 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Thu Aug 28 10:45:13 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 bin/mahout                                      |  10 +-
 .../mahout/math/cf/CooccurrenceAnalysis.scala   | 220 ------
 .../mahout/math/cf/SimilarityAnalysis.scala     | 261 +++++++
 .../apache/mahout/math/drm/RLikeDrmOps.scala    |  11 +-
 .../mahout/math/scalabindings/MatrixOps.scala   |   3 +
 .../math/scalabindings/MatrixOpsSuite.scala     |   4 +-
 spark/pom.xml                                   |  21 +
 .../apache/mahout/drivers/FileSysUtils.scala    |  55 +-
 .../apache/mahout/drivers/IndexedDataset.scala  |  11 +-
 .../mahout/drivers/ItemSimilarityDriver.scala   | 117 ++-
 .../apache/mahout/drivers/MahoutDriver.scala    |  62 +-
 .../mahout/drivers/MahoutOptionParser.scala     |  42 +-
 .../apache/mahout/drivers/ReaderWriter.scala    |  21 +-
 .../mahout/drivers/RowSimilarityDriver.scala    | 159 +++++
 .../org/apache/mahout/drivers/Schema.scala      |  36 +-
 .../drivers/TextDelimitedReaderWriter.scala     | 134 +++-
 .../mahout/sparkbindings/SparkEngine.scala      |   2 +-
 .../mahout/cf/CooccurrenceAnalysisSuite.scala   |  29 +-
 .../drivers/ItemSimilarityDriverSuite.scala     | 713 ++++++++++---------
 .../drivers/RowSimilarityDriverSuite.scala      | 138 ++++
 21 files changed, 1275 insertions(+), 776 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 47518b4..dfccd95 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1604: Spark version of rowsimilarity driver and associated additions to SimilarityAnalysis.scala (pferrel)
+
   MAHOUT-1500: H2O Integration (Anand Avati via apalumbo)
 
   MAHOUT-1606 - Add rowSums, rowMeans and diagonal extraction operations to distributed matrices (dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/bin/mahout
----------------------------------------------------------------------
diff --git a/bin/mahout b/bin/mahout
index 27acd9f..c22118b 100755
--- a/bin/mahout
+++ b/bin/mahout
@@ -88,6 +88,10 @@ if [ "$1" == "spark-itemsimilarity" ]; then
   SPARK=1
 fi
 
+if [ "$1" == "spark-rowsimilarity" ]; then
+  SPARK=1
+fi
+
 if [ "$MAHOUT_CORE" != "" ]; then
   IS_CORE=1
 fi
@@ -179,7 +183,7 @@ then
     done
   fi
 
-  # add spark-shell -- if we requested shell or other spark CLI driver
+  # add jars for running from the command line if we requested shell or spark CLI driver
   if [ "$SPARK" == "1" ]; then
 
     for f in $MAHOUT_HOME/mrlegacy/target/mahout-mrlegacy-*.jar ; do
@@ -254,6 +258,10 @@ case "$1" in
     shift
     "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.drivers.ItemSimilarityDriver" "$@"
     ;;
+  (spark-rowsimilarity)
+    shift
+    "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.drivers.RowSimilarityDriver" "$@"
+    ;;
   (h2o-node)
     shift
     "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "water.H2O" -md5skip "$@" -name mah2out

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
deleted file mode 100644
index 181b729..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.cf
-
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import drm._
-import RLikeDrmOps._
-import scala.collection.JavaConversions._
-import org.apache.mahout.math.stats.LogLikelihood
-import collection._
-import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.math.function.{VectorFunction, Functions}
-
-
-/**
- * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
- * available at http://www.mapr.com/practical-machine-learning
- *
- * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
- * Scalable Similarity-Based Neighborhood Methods with MapReduce
- * ACM Conference on Recommender Systems 2012"
- */
-object CooccurrenceAnalysis extends Serializable {
-
-  /** Compares (Int,Double) pairs by the second value */
-  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
-
-  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
-                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
-
-    implicit val distributedContext = drmARaw.context
-
-    // Apply selective downsampling, pin resulting matrix
-    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
-
-    // num users, which equals the maximum number of interactions per item
-    val numUsers = drmA.nrow.toInt
-
-    // Compute & broadcast the number of interactions per thing in A
-    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
-
-    // Compute co-occurrence matrix A'A
-    val drmAtA = drmA.t %*% drmA
-
-    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
-    val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
-      bcastInteractionsPerItemA, crossCooccurrence = false)
-
-    var indicatorMatrices = List(drmIndicatorsAtA)
-
-    // Now look at cross-co-occurrences
-    for (drmBRaw <- drmBs) {
-      // Down-sample and pin other interaction matrix
-      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
-
-      // Compute & broadcast the number of interactions per thing in B
-      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
-
-      // Compute cross-co-occurrence matrix B'A
-      // pferrel: yikes, this is the wrong order, a big change! so you know who to blame
-      // used to be val drmBtA = drmB.t %*% drmA, which is the wrong order
-      val drmAtB = drmA.t %*% drmB
-
-      val drmIndicatorsAtB = computeIndicators(drmAtB, numUsers, maxInterestingItemsPerThing,
-        bcastInteractionsPerItemA, bcastInteractionsPerThingB)
-
-      indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
-
-      drmB.uncache()
-    }
-
-    // Unpin downsampled interaction matrix
-    drmA.uncache()
-
-    // Return list of indicator matrices
-    indicatorMatrices
-  }
-
-  /**
-   * Compute loglikelihood ratio
-   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
-   **/
-  def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
-                         numInteractionsWithAandB: Long, numInteractions: Long) = {
-
-    val k11 = numInteractionsWithAandB
-    val k12 = numInteractionsWithA - numInteractionsWithAandB
-    val k21 = numInteractionsWithB - numInteractionsWithAandB
-    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
-
-    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
-
-  }
-
-  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
-                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
-                        crossCooccurrence: Boolean = true) = {
-    drmBtA.mapBlock() {
-      case (keys, block) =>
-
-        val llrBlock = block.like()
-        val numInteractionsB: Vector = bcastNumInteractionsB
-        val numInteractionsA: Vector = bcastNumInteractionsA
-
-        for (index <- 0 until keys.size) {
-
-          val thingB = keys(index)
-
-          // PriorityQueue to select the top-k items
-          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
-
-          block(index, ::).nonZeroes().foreach { elem =>
-            val thingA = elem.index
-            val cooccurrences = elem.get
-
-            // exclude co-occurrences of the item with itself
-            if (crossCooccurrence || thingB != thingA) {
-              // Compute loglikelihood ratio
-              val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
-                cooccurrences.toLong, numUsers)
-
-              val candidate = thingA -> llr
-
-              // matches legacy hadoop code and maps values to range (0..1)
-              // val tLLR = 1.0 - (1.0 / (1.0 + llr))
-              //val candidate = thingA -> tLLR
-
-              // Enqueue item with score, if belonging to the top-k
-              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
-                topItemsPerThing.enqueue(candidate)
-              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
-                topItemsPerThing.dequeue()
-                topItemsPerThing.enqueue(candidate)
-              }
-            }
-          }
-
-          // Add top-k interesting items to the output matrix
-          topItemsPerThing.dequeueAll.foreach {
-            case (otherThing, llrScore) =>
-              llrBlock(index, otherThing) = llrScore
-          }
-        }
-
-        keys -> llrBlock
-    }
-  }
-
-  /**
-   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
-   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
-   *
-   * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not
-   */
-  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
-
-    implicit val distributedContext = drmM.context
-
-    // Pin raw interaction matrix
-    val drmI = drmM.checkpoint()
-
-    // Broadcast vector containing the number of interactions with each thing
-    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
-
-    val downSampledDrmI = drmI.mapBlock() {
-      case (keys, block) =>
-        val numInteractions: Vector = bcastNumInteractions
-
-        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
-        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
-
-        val downsampledBlock = block.like()
-
-        // Downsample the interaction vector of each user
-        for (userIndex <- 0 until keys.size) {
-
-          val interactionsOfUser = block(userIndex, ::)
-
-          val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
-
-          val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
-
-          interactionsOfUser.nonZeroes().foreach { elem =>
-            val numInteractionsWithThing = numInteractions(elem.index)
-            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
-
-            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
-              // We ignore the original interaction value and create a binary 0-1 matrix
-              // as we only consider whether interactions happened or did not happen
-              downsampledBlock(userIndex, elem.index) = 1
-            }
-          }
-        }
-
-        keys -> downsampledBlock
-    }
-
-    // Unpin raw interaction matrix
-    drmI.uncache()
-
-    downSampledDrmI
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
new file mode 100644
index 0000000..90d7559
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.cf
+
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+
+/**
+ * based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object SimilarityAnalysis extends Serializable {
+
+  /** Compares (Int,Double) pairs by the second value */
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
+
+  /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+    * and returns a list of indicator and cross-indicator matrices
+    * @param drmARaw Primary interaction matrix
+    * @param randomSeed when kept to a constant will make repeatable downsampling
+    * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
+    * @param maxNumInteractions max number of interactions after downsampling, default: 500
+    * @return list of indicator matrices: the A'A indicator first, then one cross-indicator per matrix in drmBs
+    * */
+  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Apply selective downsampling, pin resulting matrix
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+    // num users, which equals the maximum number of interactions per item
+    val numUsers = drmA.nrow.toInt
+
+    // Compute & broadcast the number of interactions per thing in A
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
+
+    // Compute co-occurrence matrix A'A
+    val drmAtA = drmA.t %*% drmA
+
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
+    val drmIndicatorsAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
+      bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    var indicatorMatrices = List(drmIndicatorsAtA)
+
+    // Now look at cross-co-occurrences
+    for (drmBRaw <- drmBs) {
+      // Down-sample and pin other interaction matrix
+      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+      // Compute & broadcast the number of interactions per thing in B
+      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
+
+      // Compute cross-co-occurrence matrix A'B
+      val drmAtB = drmA.t %*% drmB
+
+      val drmIndicatorsAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
+        bcastInteractionsPerItemA, bcastInteractionsPerThingB)
+
+      indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
+
+      drmB.uncache()
+    }
+
+    // Unpin downsampled interaction matrix
+    drmA.uncache()
+
+    // Return list of indicator matrices
+    indicatorMatrices
+  }
+
+  /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows
+    * @param drmARaw Primary interaction matrix
+    * @param randomSeed when kept to a constant will make repeatable downsampling
+    * @param maxInterestingSimilaritiesPerRow number of similar rows to return per row, default: 50
+    * @param maxNumInteractions max number of interactions after downsampling, default: 500
+    * @return drm of LLR similarity scores between rows of the downsampled input
+    * */
+  def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
+                    maxNumInteractions: Int = 500): DrmLike[Int] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Apply selective downsampling, pin resulting matrix
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+    // num columns, which equals the maximum number of interactions per row
+    val numCols = drmA.ncol
+
+    // Compute & broadcast the number of interactions per row in A
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerRow)
+
+    // Compute row similarity cooccurrence matrix AA'
+    val drmAAt = drmA %*% drmA.t
+
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
+    val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow, bcastInteractionsPerItemA,
+      bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    drmSimilaritiesAAt
+  }
+
+  /**
+   * Compute loglikelihood ratio
+   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+   **/
+  def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+    numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+    val k11 = numInteractionsWithAandB
+    val k12 = numInteractionsWithA - numInteractionsWithAandB
+    val k21 = numInteractionsWithB - numInteractionsWithAandB
+    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+
+  }
+
+  def computeSimilarities(drm: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+                        crossCooccurrence: Boolean = true) = {
+    drm.mapBlock() {
+      case (keys, block) =>
+
+        val llrBlock = block.like()
+        val numInteractionsB: Vector = bcastNumInteractionsB
+        val numInteractionsA: Vector = bcastNumInteractionsA
+
+        for (index <- 0 until keys.size) {
+
+          val thingB = keys(index)
+
+          // PriorityQueue to select the top-k items
+          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+          block(index, ::).nonZeroes().foreach { elem =>
+            val thingA = elem.index
+            val cooccurrences = elem.get
+
+            // exclude co-occurrences of the item with itself
+            if (crossCooccurrence || thingB != thingA) {
+              // Compute loglikelihood ratio
+              val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+                cooccurrences.toLong, numUsers)
+
+              val candidate = thingA -> llr
+
+              // matches legacy hadoop code and maps values to range (0..1)
+              // val normalizedLLR = 1.0 - (1.0 / (1.0 + llr))
+              // val candidate = thingA -> normalizedLLR
+
+              // Enqueue item with score, if belonging to the top-k
+              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+                topItemsPerThing.enqueue(candidate)
+              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+                topItemsPerThing.dequeue()
+                topItemsPerThing.enqueue(candidate)
+              }
+            }
+          }
+
+          // Add top-k interesting items to the output matrix
+          topItemsPerThing.dequeueAll.foreach {
+            case (otherThing, llrScore) =>
+              llrBlock(index, otherThing) = llrScore
+          }
+        }
+
+        keys -> llrBlock
+    }
+  }
+
+  /**
+   * Selectively downsample rows and items with an anomalous amount of interactions, inspired by
+   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+   *
+   * additionally binarizes input matrix, as we're only interested in knowing whether interactions happened or not
+   * @param drmM matrix to downsample
+   * @param seed random number generator seed, keep to a constant if repeatability is necessary
+   * @param maxNumInteractions interaction count above which a row or column is downsampled
+   * @return the downsampled, binarized matrix
+   */
+  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+    implicit val distributedContext = drmM.context
+
+    // Pin raw interaction matrix
+    val drmI = drmM.checkpoint()
+
+    // Broadcast vector containing the number of interactions with each thing
+    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
+
+    val downSampledDrmI = drmI.mapBlock() {
+      case (keys, block) =>
+        val numInteractions: Vector = bcastNumInteractions
+
+        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
+        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+        val downsampledBlock = block.like()
+
+        // Downsample the interaction vector of each row
+        for (rowIndex <- 0 until keys.size) {
+
+          val interactionsInRow = block(rowIndex, ::)
+
+          val numInteractionsPerRow = interactionsInRow.getNumNonZeroElements()
+
+          val perRowSampleRate = math.min(maxNumInteractions, numInteractionsPerRow) / numInteractionsPerRow
+
+          interactionsInRow.nonZeroes().foreach { elem =>
+            val numInteractionsWithThing = numInteractions(elem.index)
+            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+            if (random.nextDouble() <= math.min(perRowSampleRate, perThingSampleRate)) {
+              // We ignore the original interaction value and create a binary 0-1 matrix
+              // as we only consider whether interactions happened or did not happen
+              downsampledBlock(rowIndex, elem.index) = 1
+            }
+          }
+        }
+
+        keys -> downsampledBlock
+    }
+
+    // Unpin raw interaction matrix
+    drmI.uncache()
+
+    downSampledDrmI
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index 026ab75..d8d04e2 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -90,7 +90,16 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
       // Collect block-wise rowsums and output them as one-column matrix.
       keys -> dense(block.rowSums).t
     }
-        .collect(::, 0)
+      .collect(::, 0)
+  }
+
+  /** Counts the non-zero elements in each row, returning a vector of the counts */
+  def numNonZeroElementsPerRow(): Vector = {
+    drm.mapBlock(ncol = 1) { case (keys, block) =>
+      // Collect block-wise row non-zero counts and output them as a one-column matrix.
+      keys -> dense(block.numNonZeroElementsPerRow).t
+    }
+      .collect(::, 0)
   }
 
   /** Row means */

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index d5ac026..910035f 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -191,7 +191,10 @@ class MatrixOps(val m: Matrix) {
   /* Diagonal assignment */
   def diagv_=(that: Double) = diagv := that
 
+  /* Row and Column non-zero element counts */
   def numNonZeroElementsPerColumn() = m.aggregateColumns(vectorCountNonZeroElementsFunc)
+
+  def numNonZeroElementsPerRow() = m.aggregateRows(vectorCountNonZeroElementsFunc)
 }
 
 object MatrixOps {

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
index 5be6ca8..d7b22d9 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
@@ -120,10 +120,11 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
     a.colMeans() should equal(dvec(2.5, 3.5, 4.5))
     a.rowMeans() should equal(dvec(3, 4))
     a.numNonZeroElementsPerColumn() should equal(dvec(2,2,2))
+    a.numNonZeroElementsPerRow() should equal(dvec(3,3))
 
   }
 
-  test("numNonZeroElementsPerColumn") {
+  test("numNonZeroElementsPerColumn and Row") {
     val a = dense(
       (2, 3, 4),
       (3, 4, 5),
@@ -132,6 +133,7 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
     )
 
     a.numNonZeroElementsPerColumn() should equal(dvec(3,2,4))
+    a.numNonZeroElementsPerRow() should equal(dvec(3,3,2,1))
   }
 
   test("Vector Assignment performance") {

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 71d3944..2f79377 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -157,6 +157,27 @@
         </executions>
       </plugin>
 
+      <!-- create job jar to include CLI driver deps-->
+      <!-- leave this in even though there are no hadoop mapreduce jobs in this module -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>job</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/job.xml</descriptor>
+              </descriptors>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
     </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
index 654f116..f48e9ed 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
@@ -21,25 +21,24 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
 
 /**
-  * Returns a [[java.lang.String]]comma delimited list of URIs discovered based on parameters in the constructor.
-  * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
-  *
-  * @param pathURI Where to start looking for inFiles, only HDFS is currently
-  *                supported. The pathURI may be a list of comma delimited URIs like those supported
-  *                by Spark
-  * @param filePattern regex that must match the entire filename to have the file returned
-  * @param recursive true traverses the filesystem recursively
-  */
+ * Returns a [[java.lang.String]] comma delimited list of URIs discovered based on parameters in the constructor.
+ * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
+ *
+ * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
+ * @param filePattern regex that must match the entire filename to have the file returned
+ * @param recursive true traverses the filesystem recursively, default = false
+ */
 
-case class FileSysUtils(pathURI: String, filePattern: String = ".*", recursive: Boolean = false) {
+case class FileSysUtils(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
 
   val conf = new Configuration()
   val fs = FileSystem.get(conf)
 
-  /** returns a string of comma delimited URIs matching the filePattern */
+/** Returns a string of comma delimited URIs matching the filePattern.
+  * When pattern matching, directories are never returned, only traversed. */
   def uris :String = {
-    if(recursive){
-      val pathURIs = pathURI.split(",")
+    if (!filePattern.isEmpty){ // have file pattern so
+    val pathURIs = pathURI.split(",")
       var files = ""
       for ( uri <- pathURIs ){
         files = findFiles(uri, filePattern, files)
@@ -51,21 +50,27 @@ case class FileSysUtils(pathURI: String, filePattern: String = ".*", recursive:
     }
   }
 
-  /** find matching files in the dir, recursively call self when another directory is found */
-  def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = {
-    val fileStatuses: Array[FileStatus] = fs.listStatus (new Path(dir))
+/** Find matching files in the dir, recursively call self when another directory is found and recursive is true.
+  * Only files are matched; directories are traversed but never returned as a match. */
+  private def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = {
+    val seed = fs.getFileStatus(new Path(dir))
     var f :String = files
-    for (fileStatus <- fileStatuses ){
-      if (fileStatus.getPath().getName().matches(filePattern)
-        && !fileStatus.isDir){// found a file
-        if (fileStatus.getLen() != 0) {
-          // file is not empty
-          f = f + fileStatus.getPath.toUri.toString + ","
+
+    if (seed.isDir) {
+      val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
+      for (fileStatus <- fileStatuses) {
+        if (fileStatus.getPath().getName().matches(filePattern)
+          && !fileStatus.isDir) {
+          // found a file
+          if (fileStatus.getLen() != 0) {
+            // file is not empty
+            f = f + fileStatus.getPath.toUri.toString + ","
+          }
+        } else if (fileStatus.isDir && recursive) {
+          f = findFiles(fileStatus.getPath.toString, filePattern, f)
         }
-      }else if (fileStatus.isDir){
-        f = findFiles(fileStatus.getPath.toString, filePattern, f)
       }
-    }
+    }else{ f = dir }// was a filename not dir
     f
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
index 41622a8..99f98f5 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.drivers
 
 import com.google.common.collect.BiMap
-import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.math.drm.{DrmLike, CheckpointedDrm}
 import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
 import org.apache.mahout.sparkbindings._
 
@@ -61,13 +61,16 @@ case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String
     val newMatrix = drmWrap[Int](drmRdd, n, ncol)
     new IndexedDataset(newMatrix, rowIDs, columnIDs)
   }
+
 }
 
 /**
-  * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary constructor for
-  * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like [[org.apache.mahout.drivers.IndexedDatasetStore]]
+  * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary
+  * constructor for
+  * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like
+  * [[org.apache.mahout.drivers.Reader]]
   * {{{
-  *   val indexedDataset = IndexedDataset(indexedDatasetReader.readTuplesFrom(source))
+  *   val indexedDataset = IndexedDataset(indexedDatasetReader.readElementsFrom(source))
   * }}}
   */
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 460106f..b05b55d 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -17,7 +17,7 @@
 
 package org.apache.mahout.drivers
 
-import org.apache.mahout.math.cf.CooccurrenceAnalysis
+import org.apache.mahout.math.cf.SimilarityAnalysis
 import scala.collection.immutable.HashMap
 
 /**
@@ -25,7 +25,7 @@ import scala.collection.immutable.HashMap
  * Reads text lines
  * that contain (row id, column id, ...). The IDs are user specified strings which will be
  * preserved in the
- * output. The individual tuples will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]
+ * output. The individual elements will be accumulated into a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis.cooccurrences( )]]
  * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
  * matrices and calculate both the self similarity of the primary matrix and the row-wise
  * similarity of the primary
@@ -34,7 +34,7 @@ import scala.collection.immutable.HashMap
  * The options allow flexible control of the input schema, file discovery, output schema, and control of
  * algorithm parameters.
  * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple
- * tuples of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space,
+ * elements of text delimited values (userID,itemID) with or without a strength and with a separator of tab, comma, or space,
  * you can specify only the input and output file and directory--all else will default to the correct values.
  * Each output line will contain the Item ID and similar items sorted by LLR strength descending.
  * @note To use with a Spark cluster see the --master option, if you run out of heap space check
@@ -47,10 +47,6 @@ object ItemSimilarityDriver extends MahoutDriver {
     "maxSimilaritiesPerItem" -> 100,
     "appName" -> "ItemSimilarityDriver")
 
-  // build options from some stardard CLI param groups
-  // Note: always put the driver specific options at the last so the can override and previous options!
-  private var options: Map[String, Any] = null
-
   private var reader1: TextDelimitedIndexedDatasetReader = _
   private var reader2: TextDelimitedIndexedDatasetReader = _
   private var writer: TextDelimitedIndexedDatasetWriter = _
@@ -60,17 +56,15 @@ object ItemSimilarityDriver extends MahoutDriver {
    * @param args  Command line args, if empty a help message is printed.
    */
   override def main(args: Array[String]): Unit = {
-    options = MahoutOptionParser.GenericOptions ++ MahoutOptionParser.SparkOptions ++
-      MahoutOptionParser.FileIOOptions ++ MahoutOptionParser.TextDelimitedTuplesOptions ++
-      MahoutOptionParser.TextDelimitedDRMOptions ++ ItemSimilarityOptions
 
-    val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
-      head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
+    parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
+      head("spark-itemsimilarity", "Mahout 1.0")
 
       //Input output options, non-driver specific
-      parseIOOptions
+      parseIOOptions(numInputs = 2)
 
       //Algorithm control options--driver specific
+      opts = opts ++ ItemSimilarityOptions
       note("\nAlgorithm control options:")
       opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) =>
         options + ("maxPrefs" -> x)
@@ -79,14 +73,11 @@ object ItemSimilarityDriver extends MahoutDriver {
         if (x > 0) success else failure("Option --maxPrefs must be > 0")
       }
 
-/** not implemented in CooccurrenceAnalysis.cooccurrence
-      opt[Int]("minPrefs") abbr ("mp") action { (x, options) =>
-        options.put("minPrefs", x)
-        options
-      } text ("Ignore users with less preferences than this (optional). Default: 1") validate { x =>
-        if (x > 0) success else failure("Option --minPrefs must be > 0")
-      }
-*/
+      /** not implemented in SimilarityAnalysis.cooccurrences
+        * threshold, and minPrefs
+        * todo: replacing the threshold with some % of the best values and/or a
+        * confidence measure expressed in standard deviations would be nice.
+        */
 
       opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
         options + ("maxSimilaritiesPerItem" -> x)
@@ -99,7 +90,7 @@ object ItemSimilarityDriver extends MahoutDriver {
       note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
 
       //Input text format
-      parseInputSchemaOptions
+      parseElementInputSchemaOptions
 
       //How to search for input
       parseFileDiscoveryOptions
@@ -116,14 +107,14 @@ object ItemSimilarityDriver extends MahoutDriver {
       help("help") abbr ("h") text ("prints this usage text\n")
 
     }
-    parser.parse(args, options) map { opts =>
-      options = opts
+    parser.parse(args, parser.opts) map { opts =>
+      parser.opts = opts
       process
     }
   }
 
-  override def start(masterUrl: String = options("master").asInstanceOf[String],
-      appName: String = options("appName").asInstanceOf[String]):
+  override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
+      appName: String = parser.opts("appName").asInstanceOf[String]):
     Unit = {
 
     // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
@@ -131,31 +122,31 @@ object ItemSimilarityDriver extends MahoutDriver {
     // will be only spcific to this job.
     sparkConf.set("spark.kryo.referenceTracking", "false")
       .set("spark.kryoserializer.buffer.mb", "200")
-      .set("spark.executor.memory", options("sparkExecutorMem").asInstanceOf[String])
+      .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
 
     super.start(masterUrl, appName)
 
-    val readSchema1 = new Schema("delim" -> options("inDelim").asInstanceOf[String],
-        "filter" -> options("filter1").asInstanceOf[String],
-        "rowIDPosition" -> options("rowIDPosition").asInstanceOf[Int],
-        "columnIDPosition" -> options("itemIDPosition").asInstanceOf[Int],
-        "filterPosition" -> options("filterPosition").asInstanceOf[Int])
+    val readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
+      "filter" -> parser.opts("filter1").asInstanceOf[String],
+      "rowIDPosition" -> parser.opts("rowIDPosition").asInstanceOf[Int],
+      "columnIDPosition" -> parser.opts("itemIDPosition").asInstanceOf[Int],
+      "filterPosition" -> parser.opts("filterPosition").asInstanceOf[Int])
 
     reader1 = new TextDelimitedIndexedDatasetReader(readSchema1)
 
-    if ((options("filterPosition").asInstanceOf[Int] != -1 && options("filter2").asInstanceOf[String] != null)
-        || (options("input2").asInstanceOf[String] != null && !options("input2").asInstanceOf[String].isEmpty )){
+    if ((parser.opts("filterPosition").asInstanceOf[Int] != -1 && parser.opts("filter2").asInstanceOf[String] != null)
+      || (parser.opts("input2").asInstanceOf[String] != null && !parser.opts("input2").asInstanceOf[String].isEmpty )){
       // only need to change the filter used compared to readSchema1
-      val readSchema2 = new Schema(readSchema1) += ("filter" -> options("filter2").asInstanceOf[String])
+      val readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String])
 
       reader2 = new TextDelimitedIndexedDatasetReader(readSchema2)
     }
 
     writeSchema = new Schema(
-        "rowKeyDelim" -> options("rowKeyDelim").asInstanceOf[String],
-        "columnIdStrengthDelim" -> options("columnIdStrengthDelim").asInstanceOf[String],
-        "omitScore" -> options("omitStrength").asInstanceOf[Boolean],
-        "tupleDelim" -> options("tupleDelim").asInstanceOf[String])
+      "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String],
+      "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
+      "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
+      "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
 
     writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
 
@@ -163,19 +154,19 @@ object ItemSimilarityDriver extends MahoutDriver {
 
   private def readIndexedDatasets: Array[IndexedDataset] = {
 
-    val inFiles = FileSysUtils(options("input").asInstanceOf[String], options("filenamePattern").asInstanceOf[String],
-        options("recursive").asInstanceOf[Boolean]).uris
-    val inFiles2 = if (options("input2") == null || options("input2").asInstanceOf[String].isEmpty) ""
-      else FileSysUtils(options("input2").asInstanceOf[String], options("filenamePattern").asInstanceOf[String],
-          options("recursive").asInstanceOf[Boolean]).uris
+    val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+      parser.opts("recursive").asInstanceOf[Boolean]).uris
+    val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) ""
+    else FileSysUtils(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+      parser.opts("recursive").asInstanceOf[Boolean]).uris
 
     if (inFiles.isEmpty) {
       Array()
     } else {
 
-      val datasetA = IndexedDataset(reader1.readTuplesFrom(inFiles))
-      if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetA,
-          options("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
+      val datasetA = IndexedDataset(reader1.readElementsFrom(inFiles))
+      if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetA,
+        parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
 
       // The case of readng B can be a bit tricky when the exact same row IDs don't exist for A and B
       // Here we assume there is one row ID space for all interactions. To do this we calculate the
@@ -185,15 +176,15 @@ object ItemSimilarityDriver extends MahoutDriver {
       // be supported (and are at least on Spark) or the row cardinality fix will not work.
       val datasetB = if (!inFiles2.isEmpty) {
         // get cross-cooccurrence interactions from separate files
-        val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
+        val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
 
         datasetB
 
-      } else if (options("filterPosition").asInstanceOf[Int] != -1
-          && options("filter2").asInstanceOf[String] != null) {
+      } else if (parser.opts("filterPosition").asInstanceOf[Int] != -1
+        && parser.opts("filter2").asInstanceOf[String] != null) {
 
         // get cross-cooccurrences interactions by using two filters on a single set of files
-        val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles, existingRowIDs = datasetA.rowIDs))
+        val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles, existingRowIDs = datasetA.rowIDs))
 
         datasetB
 
@@ -201,18 +192,18 @@ object ItemSimilarityDriver extends MahoutDriver {
         null.asInstanceOf[IndexedDataset]
       }
       if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc
-        // true row cardinality is the size of the row id index, which was calculated from all rows of A and B
-        val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality
+      // true row cardinality is the size of the row id index, which was calculated from all rows of A and B
+      val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality
 
         // todo: how expensive is nrow? We could make assumptions about .rowIds that don't rely on
         // its calculation
         val returnedA = if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality)
-          else datasetA // this guarantees matching cardinality
+        else datasetA // this guarantees matching cardinality
 
         val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality)
-          else datasetB // this guarantees matching cardinality
+        else datasetB // this guarantees matching cardinality
 
-        if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetB, options("output") + "../input-datasets/secondary-interactions")
+        if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetB, parser.opts("output") + "../input-datasets/secondary-interactions")
 
         Array(returnedA, returnedB)
       } else Array(datasetA)
@@ -227,25 +218,25 @@ object ItemSimilarityDriver extends MahoutDriver {
     // todo: allow more than one cross-similarity matrix?
     val indicatorMatrices = {
       if (indexedDatasets.length > 1) {
-        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int],
-            options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int],
-            Array(indexedDatasets(1).matrix))
+        SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
+          parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int],
+          Array(indexedDatasets(1).matrix))
       } else {
-        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int],
-          options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int])
+        SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
+          parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
       }
     }
 
     // an alternative is to create a version of IndexedDataset that knows how to write itself
     val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
       indexedDatasets(0).columnIDs, writeSchema)
-    selfIndicatorDataset.writeTo(options("output").asInstanceOf[String] + "indicator-matrix")
+    selfIndicatorDataset.writeTo(dest = parser.opts("output").asInstanceOf[String] + "indicator-matrix")
 
     // todo: would be nice to support more than one cross-similarity indicator
     if (indexedDatasets.length > 1) {
 
       val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
-      writer.writeDRMTo(crossIndicatorDataset, options("output").asInstanceOf[String] + "cross-indicator-matrix")
+      writer.writeTo(crossIndicatorDataset, parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix")
 
     }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index e92ed37..6ea7c8b 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -26,43 +26,55 @@ import scala.collection.immutable
 /** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
   * Also define a Map of options for the command line parser. The following template may help:
   * {{{
-  *   object SomeDriver extends MahoutDriver {
-  *     // build options from some stardard CLI param groups
-  *     // Note: always put the driver specific options at the last so the can override and previous options!
-  *     private var options = GenericOptions ++ SparkOptions ++ FileIOOptions ++ TextDelimitedTuplesOptions ++
-  *       TextDelimitedDRMOptions ++ ItemSimilarityOptions
+  * object SomeDriver extends MahoutDriver {
   *
-  *     override def main(args: Array[String]): Unit = {
-  *       val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
-  *         head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
+  *   // define only the options specific to this driver, inherit the generic ones
+  *   private final val SomeOptions = HashMap[String, Any](
+  *       "maxThings" -> 500,
+  *       "minThings" -> 100,
+  *       "appName" -> "SomeDriver")
   *
-  *         //Several standard option groups are usually non-driver specific so use the MahoutOptionParser methods
-  *         parseGenericOptions
-  *         ...
-  *       }
-  *       parser.parse(args, options) map { opts =>
-  *         options = opts
-  *         process
-  *       }
-  *     }
+  *   override def main(args: Array[String]): Unit = {
+  *
+  *
+  *     val parser = new MahoutOptionParser(programName = "shortname") {
+  *       head("somedriver", "Mahout 1.0-SNAPSHOT")
   *
-  *     override def process: Unit = {
-  *       start()
-  *       //don't just stand there do something
-  *       stop
+  *       // Input output options, non-driver specific
+  *       parseIOOptions
+  *
+  *       // Algorithm specific options
+  *       // Add in the new options
+  *       opts = opts ++ SomeOptions
+  *       note("\nAlgorithm control options:")
+  *       opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
+  *         options + ("maxThings" -> x) ...
+  *     }
+  *     parser.parse(args, parser.opts) map { opts =>
+  *       parser.opts = opts
+  *       process
   *     }
   *   }
+  *
+  *   override def process: Unit = {
+  *     start()
+  *     // do the work here
+  *     stop
+  *   }
+  *
   * }}}
   */
 abstract class MahoutDriver {
 
 
-  implicit var mc: DistributedContext = _
-  implicit var sparkConf = new SparkConf()
-  var _useExistingContext: Boolean = false
+  implicit protected var mc: DistributedContext = _
+  implicit protected var sparkConf = new SparkConf()
+  protected var parser: MahoutOptionParser = _
+
+  var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
 
   /** Creates a Spark context to run the job inside.
-    * Creates a Spark context to run the job inside. Override to set the SparkConf values specific to the job,
+    * Override to set the SparkConf values specific to the job,
     * these must be set before the context is created.
     * @param masterUrl Spark master URL
     * @param appName  Name to display in Spark UI

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 3aada78..6908bd2 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -32,13 +32,15 @@ object MahoutOptionParser {
     "appName" -> "Generic Spark App, Change this.")
 
   final val FileIOOptions = immutable.HashMap[String, Any](
-    "recursive" -> false,
     "input" -> null.asInstanceOf[String],
     "input2" -> null.asInstanceOf[String],
-    "output" -> null.asInstanceOf[String],
+    "output" -> null.asInstanceOf[String])
+
+  final val FileDiscoveryOptions = immutable.HashMap[String, Any](
+    "recursive" -> false,
     "filenamePattern" -> "^part-.*")
 
-  final val TextDelimitedTuplesOptions = immutable.HashMap[String, Any](
+  final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
     "rowIDPosition" -> 0,
     "itemIDPosition" -> 1,
     "filterPosition" -> -1,
@@ -49,7 +51,7 @@ object MahoutOptionParser {
   final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
     "rowKeyDelim" -> "\t",
     "columnIdStrengthDelim" -> ":",
-    "tupleDelim" -> " ",
+    "elementDelim" -> " ",
     "omitStrength" -> false)
 }
 /** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
@@ -57,17 +59,25 @@ object MahoutOptionParser {
   * @param programName Name displayed in help message, the name by which the driver is invoked.
   * */
 class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
+
+  // build options from some standard CLI param groups
+  // Note: always put the driver specific options last so they can override any previous options!
+  var opts = Map.empty[String, Any]
+
   override def showUsageOnError = true
 
-  def parseIOOptions = {
+  def parseIOOptions(numInputs: Int = 1) = {
+    opts = opts ++ MahoutOptionParser.FileIOOptions
     note("Input, output options")
     opt[String]('i', "input") required() action { (x, options) =>
       options + ("input" -> x)
     } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
 
-    opt[String]("input2") abbr ("i2")  action { (x, options) =>
-      options + ("input2" -> x)
-    } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+    if (numInputs == 2) {
+      opt[String]("input2") abbr ("i2") action { (x, options) =>
+        options + ("input2" -> x)
+      } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+    }
 
     opt[String]('o', "output") required() action { (x, options) =>
       // todo: check to see if HDFS allows MS-Windows backslashes locally?
@@ -81,6 +91,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
   }
 
   def parseSparkOptions = {
+    opts = opts ++ MahoutOptionParser.SparkOptions
     note("\nSpark config options:")
 
     opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
@@ -94,7 +105,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
   }
 
   def parseGenericOptions = {
-    note("\nGeneral config options:")
+    opts = opts ++ MahoutOptionParser.GenericOptions
     opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
       options + ("randomSeed" -> x)
     } validate { x =>
@@ -107,9 +118,10 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
     }//Hidden option, though a user might want this.
   }
 
-  def parseInputSchemaOptions{
-    //Input text file schema--not driver specific but input data specific, tuples input,
+  def parseElementInputSchemaOptions{
+    //Input text file schema--not driver specific but input data specific, elements input,
     // not drms
+    opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
     note("\nInput text file schema options:")
     opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
       options + ("inDelim" -> x)
@@ -162,6 +174,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
 
   def parseFileDiscoveryOptions = {
     //File finding strategy--not driver specific
+    opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
     note("\nFile discovery options:")
     opt[Unit]('r', "recursive") action { (_, options) =>
       options + ("recursive" -> true)
@@ -174,6 +187,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
   }
 
   def parseDrmFormatOptions = {
+    opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
     note("\nOutput text file schema options:")
     opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
       options + ("rowKeyDelim" -> x)
@@ -183,9 +197,9 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
       options + ("columnIdStrengthDelim" -> x)
     } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
 
-    opt[String]("tupleDelim") abbr ("td") action { (x, options) =>
-      options + ("tupleDelim" -> x)
-    } text ("Separates vector tuple values in the values list (optional). Default: \" \"")
+    opt[String]("elementDelim") abbr ("td") action { (x, options) =>
+      options + ("elementDelim" -> x)
+    } text ("Separates vector element values in the values list (optional). Default: \" \"")
 
     opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
       options + ("omitStrength" -> true)

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
index e2bb49c..6351e45 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
@@ -20,7 +20,7 @@ package org.apache.mahout.drivers
 import com.google.common.collect.{HashBiMap, BiMap}
 import org.apache.mahout.math.drm.DistributedContext
 
-/** Reader trait is abstract in the sense that the tupleReader function must be defined by an extending trait, which also defines the type to be read.
+/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait, which also defines the type to be read.
   * @tparam T type of object read.
   */
 trait Reader[T]{
@@ -28,16 +28,27 @@ trait Reader[T]{
   val mc: DistributedContext
   val readSchema: Schema
 
-  protected def tupleReader(
+  protected def elementReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
       existingRowIDs: BiMap[String, Int]): T
 
-  def readTuplesFrom(
+  protected def drmReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int]): T
+
+  def readElementsFrom(
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+    elementReader(mc, readSchema, source, existingRowIDs)
+
+  def readDRMFrom(
       source: String,
       existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
-    tupleReader(mc, readSchema, source, existingRowIDs)
+    drmReader(mc, readSchema, source, existingRowIDs)
 }
 
 /** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
@@ -51,5 +62,5 @@ trait Writer[T]{
 
   protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
 
-  def writeDRMTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
+  def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
new file mode 100644
index 0000000..920c32b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -0,0 +1,159 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.cf.SimilarityAnalysis
+import scala.collection.immutable.HashMap
+
+/**
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis.rowSimilarity( )]].
+ * Reads a text delimited file containing a Mahout DRM of the form
+ * (row id, column id: strength, ...). The IDs are user specified strings which will be
+ * preserved in the
+ * output. The rows define a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis.rowSimilarity( )]]
+ * will be used to calculate row-wise similarity using log-likelihood
+ * The options allow control of the input schema, file discovery, output schema, and control of
+ * algorithm parameters.
+ * To get help run {{{mahout spark-rowsimilarity}}} for a full explanation of options. The default
+ * values for formatting will read (rowID<tab>columnID1:strength1<space>columnID2:strength2....)
+ * and write (rowID<tab>rowID1:strength1<space>rowID2:strength2....)
+ * Each output line will contain a row ID and similar columns sorted by LLR strength descending.
+ * @note To use with a Spark cluster see the --master option, if you run out of heap space check
+ *       the --sparkExecutorMemory option.
+ */
+object RowSimilarityDriver extends MahoutDriver {
+  // define only the options specific to RowSimilarity
+  private final val RowSimilarityOptions = HashMap[String, Any](
+    "maxObservations" -> 500,
+    "maxSimilaritiesPerRow" -> 100,
+    "appName" -> "RowSimilarityDriver")
+
+  private var readerWriter: TextDelimitedIndexedDatasetReaderWriter = _
+  private var readWriteSchema: Schema = _
+
+  /**
+   * @param args  Command line args, if empty a help message is printed.
+   */
+  override def main(args: Array[String]): Unit = {
+
+    parser = new MahoutOptionParser(programName = "spark-rowsimilarity") {
+      head("spark-rowsimilarity", "Mahout 1.0")
+
+      //Input output options, non-driver specific
+      parseIOOptions()
+
+      //Algorithm control options--driver specific
+      opts = opts ++ RowSimilarityOptions
+
+      note("\nAlgorithm control options:")
+      opt[Int]("maxObservations") abbr ("mo") action { (x, options) =>
+        options + ("maxObservations" -> x)
+      } text ("Max number of observations to consider per row (optional). Default: " +
+        RowSimilarityOptions("maxObservations")) validate { x =>
+        if (x > 0) success else failure("Option --maxObservations must be > 0")
+      }
+
+      opt[Int]('m', "maxSimilaritiesPerRow") action { (x, options) =>
+        options + ("maxSimilaritiesPerRow" -> x)
+      } text ("Limit the number of similarities per item to this number (optional). Default: " +
+        RowSimilarityOptions("maxSimilaritiesPerRow")) validate { x =>
+        if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0")
+      }
+
+      /** --threshold not implemented in SimilarityAnalysis.rowSimilarity
+        * todo: replacing the threshold with some % of the best values and/or a
+        * confidence measure expressed in standard deviations would be nice.
+        */
+
+      //Driver notes--driver specific
+      note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
+
+      //Drm output schema--not driver specific, drm specific
+      parseDrmFormatOptions
+
+      //How to search for input
+      parseFileDiscoveryOptions
+
+      //Spark config options--not driver specific
+      parseSparkOptions
+
+      //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
+      parseGenericOptions
+
+      help("help") abbr ("h") text ("prints this usage text\n")
+
+    }
+    parser.parse(args, parser.opts) map { opts =>
+      parser.opts = opts
+      process
+    }
+  }
+
+  override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
+                     appName: String = parser.opts("appName").asInstanceOf[String]):
+  Unit = {
+
+    // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
+    // MahoutKryoRegistrator, it should be added to the register list here so it
+    // will be only specific to this job.
+    sparkConf.set("spark.kryo.referenceTracking", "false")
+      .set("spark.kryoserializer.buffer.mb", "200")
+      .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+
+    super.start(masterUrl, appName)
+
+    readWriteSchema = new Schema(
+      "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String],
+      "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
+      "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
+      "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
+
+    readerWriter = new TextDelimitedIndexedDatasetReaderWriter(readWriteSchema, readWriteSchema)
+
+  }
+
+  private def readIndexedDataset: IndexedDataset = {
+
+    val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+      parser.opts("recursive").asInstanceOf[Boolean]).uris
+
+    if (inFiles.isEmpty) {
+      null.asInstanceOf[IndexedDataset]
+    } else {
+
+      val datasetA = IndexedDataset(readerWriter.readDRMFrom(inFiles))
+      datasetA
+    }
+  }
+
+  override def process: Unit = {
+    start()
+
+    val indexedDataset = readIndexedDataset
+
+    val rowSimilarityDrm = SimilarityAnalysis.rowSimilarity(indexedDataset.matrix, parser.opts("randomSeed").asInstanceOf[Int],
+      parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int], parser.opts("maxObservations").asInstanceOf[Int])
+
+    val rowSimilarityDataset = new IndexedDatasetTextDelimitedWriteable(rowSimilarityDrm,
+      indexedDataset.rowIDs, indexedDataset.rowIDs, readWriteSchema)
+    rowSimilarityDataset.writeTo(dest = parser.opts("output").asInstanceOf[String])
+
+    stop
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
index edff92d..42b2658 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
@@ -41,26 +41,26 @@ class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
 // These can be used to keep the text in and out fairly standard to Mahout, where an application specific
 // format is not required.
 
-/** Simple default Schema for typical text delimited tuple file input
-  * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID
+/** Simple default Schema for typical text delimited element file input
+  * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
   * <comma, tab, or space>here may be other ignored text...)
   */
-class DefaultTupleReadSchema extends Schema(
-    "delim" -> "[,\t ]", //comma, tab or space
-    "filter" -> "",
-    "rowIDPosition" -> 0,
-    "columnIDPosition" -> 1,
-    "filterPosition" -> -1)
+class DefaultElementReadSchema extends Schema(
+  "delim" -> "[,\t ]", //comma, tab or space
+  "filter" -> "",
+  "rowIDPosition" -> 0,
+  "columnIDPosition" -> 1,
+  "filterPosition" -> -1)
 
 /** Default Schema for text delimited drm file output
   * This tells the writer to write a DRM of the default form:
   * (rowID<tab>columnID1:score1<space>columnID2:score2...)
   */
 class DefaultDRMWriteSchema extends Schema(
-    "rowKeyDelim" -> "\t",
-    "columnIdStrengthDelim" -> ":",
-    "tupleDelim" -> " ",
-    "omitScore" -> false)
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "elementDelim" -> " ",
+  "omitScore" -> false)
 
 /** Default Schema for typical text delimited drm file input
   * This tells the reader to input text lines of the form:
@@ -69,9 +69,9 @@ class DefaultDRMWriteSchema extends Schema(
 class DefaultDRMReadSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
-  "tupleDelim" -> " ")
+  "elementDelim" -> " ")
 
-/** Default Schema for reading a text delimited drm file  where the score of any tuple is ignored,
+/** Default Schema for reading a text delimited drm file  where the score of any element is ignored,
   * all non-zeros are replaced with 1.
   * This tells the reader to input DRM lines of the form
   * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
@@ -82,17 +82,17 @@ class DefaultDRMReadSchema extends Schema(
 class DRMReadBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
-  "tupleDelim" -> " ",
+  "elementDelim" -> " ",
   "omitScore" -> true)
 
-/** Default Schema for typical text delimited drm file write where the score of a tuple is omitted.
-  * The presence of a tuple means the score = 1, the absence means a score of 0.
+/** Default Schema for typical text delimited drm file write where the score of an element is omitted.
+  * The presence of an element means the score = 1, the absence means a score of 0.
   * This tells the writer to output DRM lines of the form
   * (rowID<tab>columnID1<space>columnID2...)
   */
 class DRMWriteBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
-  "tupleDelim" -> " ",
+  "elementDelim" -> " ",
   "omitScore" -> true)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index 11d647b..53a36a5 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -27,14 +27,16 @@ import scala.collection.JavaConversions._
 /** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
   */
 trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
-  /** Read in text delimited tuples from all URIs in this comma delimited source String.
+  /** Read in text delimited elements from all URIs in the comma delimited source String and return
+    * the DRM of all elements, updating the row and column dictionaries. If there is
+    * no strength value in the element, assume its presence means a strength of 1.
     *
     * @param mc context for the Spark job
     * @param readSchema describes the delimiters and positions of values in the text delimited file.
     * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
     * @return
     */
-  protected def tupleReader(
+  protected def elementReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
@@ -53,7 +55,6 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       })
 
       var columns = mc.textFile(source).map { line => line.split(delimiter) }
-      //val m = columns.collect
 
       // -1 means no filter in the input text, take them all
       if(filterPosition != -1) {
@@ -113,9 +114,100 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
     }
   }
 
-  // this creates a BiMap from an ID collection. The ID points to an ordinal int
+  /** Read in text delimited rows from all URIs in this comma delimited source String and return
+    * the DRM of all elements, updating the row and column dictionaries. If there is
+    * no strength value in the element, assume its presence means a strength of 1.
+    *
+    * @param mc context for the Spark job
+    * @param readSchema describes the delimiters and positions of values in the text delimited file.
+    * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
+    * @return
+    */
+  protected def drmReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = {
+    try {
+      val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String]
+      val columnIdStrengthDelim = readSchema("columnIdStrengthDelim").asInstanceOf[String]
+      val elementDelim = readSchema("elementDelim").asInstanceOf[String]
+      // no need for omitScore since we can tell if there is a score and assume it is 1.0d if not specified
+      //val omitScore = readSchema("omitScore").asInstanceOf[Boolean]
+
+      assert(!source.isEmpty, {
+        println(this.getClass.toString + ": has no files to read")
+        throw new IllegalArgumentException
+      })
+
+      var rows = mc.textFile(source).map { line => line.split(rowKeyDelim) }
+
+      // get row and column IDs
+      val interactions = rows.map { row =>
+        row(0) -> row(1)// rowID token -> string of column IDs+strengths
+      }
+
+      interactions.cache()
+      interactions.collect()
+
+      // create separate collections of rowID and columnID tokens
+      val rowIDs = interactions.map { case (rowID, _) => rowID }.distinct().collect()
+
+      // the columns are in a TD string so separate them and get unique ones
+      val columnIDs = interactions.flatMap { case (_, columns) => columns
+        val elements = columns.split(elementDelim)
+        val colIDs = elements.map( elem => elem.split(columnIdStrengthDelim)(0) )
+        colIDs
+      }.distinct().collect()
+
+      val numRows = rowIDs.size
+      val numColumns = columnIDs.size
+
+      // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
+      // broadcast them for access in distributed processes, so they are not recalculated in every task.
+      val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
+      val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
+
+      val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
+      val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
+
+      val indexedInteractions =
+        interactions.map { case (rowID, columns) =>
+          val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
+
+          val elements = columns.split(elementDelim)
+          val row = new RandomAccessSparseVector(numColumns)
+          for (element <- elements) {
+            val id = element.split(columnIdStrengthDelim)(0)
+            val columnID = columnIDDictionary_bcast.value.get(id).get
+            val pair = element.split(columnIdStrengthDelim)
+            if (pair.size == 2)// there was a strength
+              row.setQuick(columnID,pair(1).toDouble)
+            else // no strength so set DRM value to 1.0d, this ignores 'omitScore', which is a write param
+              row.setQuick(columnID,1.0d)
+          }
+          rowIndex -> row
+        }
+        .asInstanceOf[DrmRdd[Int]]
+
+      // wrap the DrmRdd in a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
+      val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
+
+      IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+
+    } catch {
+      case cce: ClassCastException => {
+        println(this.getClass.toString + ": Schema has illegal values")
+        throw cce
+      }
+    }
+  }
+
+  // this creates a BiMap from an ID collection. The ID points to an ordinal int
   // which is used internal to Mahout as the row or column ID
-  // todo: this is a non-distributed process and the BiMap is a non-rdd based object--might be a scaling problem
+  // todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a
+  // non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit
+  // in-memory, the option is to put the dictionaries in rdds and do joins to translate IDs
   private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
     var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index'
     for (entry <- entries) {
@@ -130,7 +222,7 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
 
   private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
 
-  /** Read in text delimited tuples from all URIs in this comma delimited source String.
+  /** Read in text delimited elements from all URIs in this comma delimited source String.
     *
     * @param mc context for the Spark job
     * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
@@ -145,7 +237,7 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
     try {
       val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
       val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String]
-      val tupleDelim = writeSchema("tupleDelim").asInstanceOf[String]
+      val elementDelim = writeSchema("elementDelim").asInstanceOf[String]
       val omitScore = writeSchema("omitScore").asInstanceOf[Boolean]
       //instance vars must be put into locally scoped vals when put into closures that are
       //executed but Spark
@@ -177,11 +269,11 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
         // first get the external rowID token
         if (!vector.isEmpty){
           var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
-          // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
+          // for the rest of the row, construct the vector contents of elements (external column ID, strength value)
           for (item <- vector) {
             line += columnIDDictionary.inverse.get(item.getFirst)
             if (!omitScore) line += columnIdStrengthDelim + item.getSecond
-            line += tupleDelim
+            line += elementDelim
           }
           // drop the last delimiter, not needed to end the line
           line.dropRight(1)
@@ -203,7 +295,7 @@ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexed
 /** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
   * @param mc Spark context for reading files
-  * @note The source is supplied by Reader#readTuplesFrom .
+  * @note The source is supplied by Reader#readElementsFrom .
   * */
 class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
     (implicit val mc: DistributedContext) extends TDIndexedDatasetReader
@@ -211,7 +303,7 @@ class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
 /** Writes  text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
   * @param mc Spark context for reading files
-  * @note the destination is supplied by Writer#writeDRMTo trait method
+  * @note the destination is supplied by Writer#writeTo trait method
   * */
 class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
 
@@ -224,7 +316,7 @@ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeS
     (implicit val mc: DistributedContext)
   extends TDIndexedDatasetReaderWriter
 
-/** A version of IndexedDataset that has it's own writeDRMTo method from a Writer trait. This is an alternative to creating
+/** A version of IndexedDataset that has its own writeTo method from a Writer trait. This is an alternative to creating
   * a Writer based stand-alone class for writing. Consider it experimental allowing similar semantics to drm.writeDrm().
   * Experimental because it's not clear that it is simpler or more intuitive and since IndexedDatasetTextDelimitedWriteables
   * are probably short lived in terms of lines of code so complexity may be moot.
@@ -243,18 +335,20 @@ class IndexedDatasetTextDelimitedWriteable(
     (implicit val mc: DistributedContext)
   extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter {
 
-  def writeTo(dest: String): Unit = {
-    writeDRMTo(this, dest)
+  override def writeTo(collection: IndexedDataset = this, dest: String): Unit = {
+    super.writeTo(this, dest)
   }
 }
 
 /**
- * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily used to get a secondary constructor for
- * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
- * {{{
- *   val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readTuplesFrom(source))
- * }}}
- */
+  * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily
+  * used to get a secondary constructor for
+  * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a
+  * factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
+  * {{{
+  *   val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readElementsFrom(source))
+  * }}}
+  */
 
 object IndexedDatasetTextDelimitedWriteable {
   /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */

http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index dedb279..54f33ef 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -62,7 +62,7 @@ object SparkEngine extends DistributedEngine {
       // Fold() doesn't work with kryo still. So work around it.
       .mapPartitions(iter => {
       val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) =>
-        v.nonZeroes().foreach { elem => acc(elem.index) += 1}
+        v.nonZeroes().foreach { elem => acc(elem.index) += 1 }
         acc
       }
       Iterator(acc)