Posted to commits@mahout.apache.org by dl...@apache.org on 2014/08/08 20:54:40 UTC

[1/6] git commit: (NOJIRA) style, spacing

Repository: mahout
Updated Branches:
  refs/heads/spark-1.0.x 66f164057 -> ee6359f62


(NOJIRA) style, spacing


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e5bc885f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e5bc885f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e5bc885f

Branch: refs/heads/spark-1.0.x
Commit: e5bc885fdb4492a6d54232cea6c7e8e0abd4080d
Parents: 66f1640
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Mon Aug 4 16:39:29 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Mon Aug 4 16:39:29 2014 -0700

----------------------------------------------------------------------
 .../mahout/math/scalabindings/package.scala      | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/e5bc885f/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index 8e0c07f..43db95e 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -17,9 +17,7 @@
 
 package org.apache.mahout.math
 
-import org.apache.mahout.math._
 import org.apache.mahout.math.solver.EigenDecomposition
-import org.apache.mahout.math.decompositions.SSVD
 
 /**
  * Mahout matrices and vectors' scala syntactic sugar
@@ -145,10 +143,10 @@ package object scalabindings {
         else
           throw new IllegalArgumentException(
             "double[][] data parameter can be the only argument for dense()")
-        case t:Array[Vector] =>
-          val m = new DenseMatrix(t.size,t.head.length)
-          t.view.zipWithIndex.foreach{
-            case(v,idx) => m(idx,::) := v
+        case t: Array[Vector] =>
+          val m = new DenseMatrix(t.size, t.head.length)
+          t.view.zipWithIndex.foreach {
+            case (v, idx) => m(idx, ::) := v
           }
           return m
         case _ => throw new IllegalArgumentException("unsupported type in the inline Matrix initializer")
@@ -160,11 +158,14 @@ package object scalabindings {
   /**
    * Default initializes are always row-wise.
    * create a sparse,
-   * e.g.
+   * e.g. {{{
+   *
    * m = sparse(
-   * (0,5)::(9,3)::Nil,
-   * (2,3.5)::(7,8)::Nil
+   *   (0,5)::(9,3)::Nil,
+   *   (2,3.5)::(7,8)::Nil
    * )
+   * 
+   * }}}
    *
    * @param rows
    * @return
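
For context, the inline initializers touched by this commit are typically used as below; a minimal sketch assuming the org.apache.mahout.math.scalabindings package object is in scope (values are illustrative):

    import org.apache.mahout.math.scalabindings._

    // dense() takes row tuples (or an Array[Vector]); each argument becomes a matrix row
    val a = dense((1.0, 2.0), (3.0, 4.0))

    // sparse() takes per-row lists of (columnIndex, value) pairs, row-wise by default,
    // exactly as in the scaladoc example above
    val m = sparse(
      (0, 5) :: (9, 3) :: Nil,
      (2, 3.5) :: (7, 8) :: Nil
    )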


[4/6] MAHOUT-1541, MAHOUT-1568, MAHOUT-1569: refactoring the options parser and option defaults to DRY up individual driver code, moving more into base classes; tightened up the test suite with a better way of comparing actual with expected output

Posted by dl...@apache.org.
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index ca92fcf..f1981bb 100644
--- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -19,11 +19,30 @@ package org.apache.mahout.drivers
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
 import org.scalatest.FunSuite
 import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 import org.apache.mahout.test.MahoutSuite
 
+
+
+//todo: take out, only for temp tests
+import org.apache.mahout.math._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+
+
 class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with DistributedSparkSuite  {
 
 /*
@@ -37,26 +56,27 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
 
   // correct cross-cooccurrence with LLR
   final val matrixLLRCoocBtAControl = dense(
-    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
-    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
-    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.40461878191490940),
-    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
-    (0.0,                0.0,                0.0,                0.0,                0.8181382096075936))
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+      (0.0,                0.0,                0.6795961471815897, 0.0,                4.498681156950466))
 */
 
-  final val SelfSimilairtyTSV = Set(
-    "galaxy\tnexus:1.7260924347106847",
-    "ipad\tiphone:1.7260924347106847",
-    "nexus\tgalaxy:1.7260924347106847",
-    "iphone\tipad:1.7260924347106847",
-    "surface")
 
-  final val CrossSimilarityTSV = Set("" +
-    "nexus\tnexus:0.6795961471815897,iphone:1.7260924347106847,ipad:0.6795961471815897,surface:0.6795961471815897,galaxy:1.7260924347106847",
-    "ipad\tnexus:0.6795961471815897,iphone:1.7260924347106847,ipad:0.6795961471815897,galaxy:1.7260924347106847",
-    "surface\tsurface:4.498681156950466",
-    "iphone\tnexus:0.6795961471815897,iphone:1.7260924347106847,ipad:0.6795961471815897,galaxy:1.7260924347106847",
-    "galaxy\tnexus:0.6795961471815897,iphone:1.7260924347106847,ipad:0.6795961471815897,galaxy:1.7260924347106847")
+  final val SelfSimilairtyLines = Iterable(
+      "galaxy\tnexus:1.7260924347106847",
+      "ipad\tiphone:1.7260924347106847",
+      "nexus\tgalaxy:1.7260924347106847",
+      "iphone\tipad:1.7260924347106847",
+      "surface")
+
+  val CrossIndicatorLines = Iterable(
+      "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+      "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+      "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+      "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+      "surface\tsurface:4.498681156950466 nexus:0.6795961471815897")
 
   final val TmpDir = "tmp/" // all IO going to whatever the default HDFS config is pointing to
 
@@ -91,7 +111,7 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
 
   test ("ItemSimilarityDriver, non-full-spec CSV"){
 
-    val InFile = TmpDir + "in-file.csv/" //using part files, not singel file
+    val InFile = TmpDir + "in-file.csv/" //using part files, not single file
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
@@ -133,13 +153,18 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
       "--itemIDPosition", "2",
       "--rowIDPosition", "0",
       "--filterPosition", "1",
+      "--writeAllDatasets",
       "--dontAddMahoutJars"))
+    
 
     beforeEach // restart the test context to read the output of the driver
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String]
-    assert(indicatorLines == SelfSimilairtyTSV)
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toSet[String]
-    assert (crossIndicatorLines == CrossSimilarityTSV)
+
+    // todo: these comparisons rely on a sort producing the same lines, which could possibly
+    // fail since the sort is on value and these can be the same for all items in a vector
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
   }
 
 
@@ -191,10 +216,12 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
       "--dontAddMahoutJars"))
 
     beforeEach // restart the test context to read the output of the driver
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String]
-    assert(indicatorLines == SelfSimilairtyTSV)
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toSet[String]
-    assert (crossIndicatorLines == CrossSimilarityTSV)
+    // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
+    // some error cases
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
 
   }
 
@@ -245,10 +272,11 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
       "--dontAddMahoutJars"))
 
     beforeEach // restart the test context to read the output of the driver
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String]
-    assert(indicatorLines == SelfSimilairtyTSV)
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toSet[String]
-    assert (crossIndicatorLines == CrossSimilarityTSV)
+
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
 
   }
 
@@ -269,7 +297,7 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
       "3,0,1",
       "3,3,1")
 
-    val Answer = Set(
+    val Answer = Iterable(
       "0\t1:1.7260924347106847",
       "3\t2:1.7260924347106847",
       "1\t0:1.7260924347106847",
@@ -294,8 +322,10 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
       "--dontAddMahoutJars"))
 
     beforeEach // restart the test context to read the output of the driver
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String]
-    assert(indicatorLines == Answer)
+    // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
+    // some error cases
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    indicatorLines should contain theSameElementsAs Answer
 
   }
 
@@ -316,7 +346,7 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
       "3,0,1",
       "3,3,1")
 
-    val Answer = Set(
+    val Answer = Iterable(
       "0\t1",
       "3\t2",
       "1\t0",
@@ -342,8 +372,10 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
       "--omitStrength"))
 
     beforeEach // restart the test context to read the output of the driver
-    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String]
-    assert(indicatorLines == Answer)
+    // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
+    // some error cases
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    indicatorLines should contain theSameElementsAs Answer
 
   }
 
@@ -419,18 +451,326 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
       "--dontAddMahoutJars"))
 
     beforeEach()// restart the test context to read the output of the driver
-    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toSet[String]
-    assert(indicatorLines == SelfSimilairtyTSV)
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toSet[String]
-    assert (crossIndicatorLines == CrossSimilarityTSV)
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+
+  }
+
+  test ("ItemSimilarityDriver, two input paths"){
+
+    val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
+    val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
+    val OutPath = TmpDir + "indicator-matrices/"
+
+    val lines = Array(
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,iphone",
+      "u1,view,ipad",
+      "u1,view,nexus",
+      "u1,view,galaxy",
+      "u2,view,iphone",
+      "u2,view,ipad",
+      "u2,view,nexus",
+      "u2,view,galaxy",
+      "u3,view,surface",
+      "u3,view,nexus",
+      "u4,view,iphone",
+      "u4,view,ipad",
+      "u4,view,galaxy")
+
+    // this will create multiple part-xxxxx files in the InFile dir but other tests will
+    // take account of one actual file
+    val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
+    val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
+
+    afterEach // clean up before running the driver, it should handle the Spark conf and context
+
+    // local multi-threaded Spark with default HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", InFile1,
+      "--input2", InFile2,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1",
+      "--dontAddMahoutJars"))
+
+    beforeEach // restart the test context to read the output of the driver
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+
+  }
+
+  test ("ItemSimilarityDriver, two inputs of different dimensions"){
+
+    val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
+    val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
+    val OutPath = TmpDir + "indicator-matrices/"
+
+    val lines = Array(
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      // remove one user so A'B will be of different dimensions
+      // ItemSimilarityDriver should create one unified user dictionary and so account for this
+      // discrepancy as a blank row: "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,iphone",
+      "u1,view,ipad",
+      "u1,view,nexus",
+      "u1,view,galaxy",
+      "u2,view,iphone",
+      "u2,view,ipad",
+      "u2,view,nexus",
+      "u2,view,galaxy",
+      "u3,view,surface",
+      "u3,view,nexus",
+      "u4,view,iphone",
+      "u4,view,ipad",
+      "u4,view,galaxy")
+
+    val UnequalDimensionsSelfSimilarity = Iterable(
+      "ipad\tiphone:1.7260924347106847",
+      "iphone\tipad:1.7260924347106847",
+      "nexus\tgalaxy:1.7260924347106847",
+      "galaxy\tnexus:1.7260924347106847")
+
+    val UnequalDimensionsCrossSimilarity = Iterable(
+      "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+      "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+      "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+      "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847")
+
+    // this will create multiple part-xxxxx files in the InFile dir but other tests will
+    // take account of one actual file
+    val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
+    val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
+
+    afterEach // clean up before running the driver, it should handle the Spark conf and context
+
+    // local multi-threaded Spark with default HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", InFile1,
+      "--input2", InFile2,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1",
+      "--dontAddMahoutJars"))
+
+    beforeEach // restart the test context to read the output of the driver
+
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    indicatorLines should contain theSameElementsAs UnequalDimensionsSelfSimilarity
+    crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarity
+
+  }
+
+  test("ItemSimilarityDriver cross similarity two separate items spaces"){
+    /* cross-similarity with category views, same user space
+            	phones	tablets	mobile_acc	soap
+          u1	0	      1	      1	          0
+          u2	1	      1	      1	          0
+          u3	0	      0	      1	          0
+          u4	1	      1	      0	          1
+    */
+    val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
+    val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
+    val OutPath = TmpDir + "indicator-matrices/"
+
+    val lines = Array(
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,phones",
+      "u1,view,mobile_acc",
+      "u2,view,phones",
+      "u2,view,tablets",
+      "u2,view,mobile_acc",
+      "u3,view,mobile_acc",
+      "u4,view,phones",
+      "u4,view,tablets",
+      "u4,view,soap")
+
+    val UnequalDimensionsCrossSimilarityLines = Iterable(
+        "iphone\tmobile_acc:1.7260924347106847 soap:1.7260924347106847 phones:1.7260924347106847",
+        "surface\tmobile_acc:0.6795961471815897",
+        "nexus\ttablets:1.7260924347106847 mobile_acc:0.6795961471815897 phones:0.6795961471815897",
+        "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 mobile_acc:1.7260924347106847",
+        "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897")
+
+    // this will create multiple part-xxxxx files in the InFile dir but other tests will
+    // take account of one actual file
+    val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
+    val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
+
+    afterEach // clean up before running the driver, it should handle the Spark conf and context
+
+    // local multi-threaded Spark with default HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", InFile1,
+      "--input2", InFile2,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1",
+      "--dontAddMahoutJars",
+      "--writeAllDatasets"))
+
+    beforeEach // restart the test context to read the output of the driver
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+
+  }
+
+  // convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable'
+  def tokenize(a: Iterable[String]): Iterable[String] = {
+    var r: Iterable[String] = Iterable()
+    a.foreach { l =>
+      l.split("\t").foreach{ s =>
+        r = r ++ s.split(",")
+      }
+    }
+    r.asInstanceOf[Iterable[String]]
   }
 
   override def afterAll = {
+    removeTmpDir
+    super.afterAll
+  }
+
+  def removeTmpDir = {
     // remove TmpDir
     val fs = FileSystem.get(new Configuration())
     fs.delete(new Path(TmpDir), true) // delete recursively
+  }
 
-    super.afterAll
+  test("A.t %*% B after changing row cardinality of A"){
+    // todo: move to math tests but this is Spark specific
+
+    val a = dense(
+        (1.0, 1.0))
+
+    val b = dense(
+        (1.0, 1.0),
+        (1.0, 1.0),
+        (1.0, 1.0))
+
+    val inCoreABiggertBAnswer = dense(
+        (1.0, 1.0),
+        (1.0, 1.0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    // modified to return a new CheckpointedDrm so it maintains immutability, but still only increases the row cardinality
+    // by returning new CheckpointedDrmSpark[K](rdd, nrow + n, ncol, _cacheStorageLevel). Hack for now.
+    val drmABigger = drmWrap[Int](drmA.rdd, 3, 2)
+
+
+    val ABiggertB = drmABigger.t %*% drmB
+    val inCoreABiggertB = ABiggertB.collect
+
+    assert(inCoreABiggertB === inCoreABiggertBAnswer)
+
+    val bp = 0
+  }
+
+  test("ItemSimilarityDriver cross similarity two separate items spaces, missing rows in B"){
+    /* cross-similarity with category views, same user space
+            	phones	tablets	mobile_acc	soap
+            u1	0	      1	      1	          0
+            u2	1	      1	      1	          0
+removed ==> u3	0	      0	      1	          0
+            u4	1	      1	      0	          1
+    */
+    val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
+    val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
+    val OutPath = TmpDir + "indicator-matrices/"
+
+    val lines = Array(
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,phones",
+      "u1,view,mobile_acc",
+      "u2,view,phones",
+      "u2,view,tablets",
+      "u2,view,mobile_acc",
+      //"u3,view,mobile_acc",// if this line is removed the cross-cooccurrence should work
+      "u4,view,phones",
+      "u4,view,tablets",
+      "u4,view,soap")
+
+    val UnequalDimensionsCrossSimilarityLines = Iterable(
+        "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847",
+        "ipad\tmobile_acc:1.7260924347106847 phones:0.6795961471815897",
+        "surface",
+        "nexus\tmobile_acc:1.7260924347106847 tablets:1.7260924347106847 phones:0.6795961471815897",
+        "iphone\tsoap:1.7260924347106847 phones:1.7260924347106847")
+
+    // this will create multiple part-xxxxx files in the InFile dir but other tests will
+    // take account of one actual file
+    val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
+    val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
+
+    afterEach // clean up before running the driver, it should handle the Spark conf and context
+
+    // local multi-threaded Spark with default HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", InFile1,
+      "--input2", InFile2,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1",
+      "--dontAddMahoutJars",
+      "--writeAllDatasets"))
+
+    beforeEach // restart the test context to read the output of the driver
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
+    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
   }
 
 }
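
The hunks above replace exact Set equality against the driver output with ScalaTest's order-insensitive matcher. A minimal standalone sketch of that comparison style, assuming ScalaTest 2.x Matchers (the literal lines here are illustrative, not the driver's real output):

    import org.scalatest.{FunSuite, Matchers}

    class ComparisonSketch extends FunSuite with Matchers {
      test("order-insensitive comparison of output lines") {
        val expected = Iterable("galaxy\tnexus:1.73", "nexus\tgalaxy:1.73")
        // lines collected from part files may come back in any order
        val actual = Seq("nexus\tgalaxy:1.73", "galaxy\tnexus:1.73")
        // passes when both sides hold the same elements, regardless of ordering or container type
        actual should contain theSameElementsAs expected
      }
    }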


[5/6] git commit: MAHOUT-1541, MAHOUT-1568, MAHOUT-1569: refactoring the options parser and option defaults to DRY up individual driver code, moving more into base classes; tightened up the test suite with a better way of comparing actual with expected output

Posted by dl...@apache.org.
MAHOUT-1541, MAHOUT-1568, MAHOUT-1569: refactoring the options parser and option defaults to DRY up individual driver code, moving more into base classes; tightened up the test suite with a better way of comparing actual with expected output


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a8097403
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a8097403
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a8097403

Branch: refs/heads/spark-1.0.x
Commit: a80974037853c5227f9e5ef1c384a1fca134746e
Parents: 00c0149
Author: pferrel <pa...@occamsmachete.com>
Authored: Wed Aug 6 16:28:37 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Wed Aug 6 16:28:37 2014 -0700

----------------------------------------------------------------------
 .../mahout/math/cf/CooccurrenceAnalysis.scala   | 220 ++++++++++
 .../apache/mahout/cf/CooccurrenceAnalysis.scala | 218 ----------
 .../apache/mahout/drivers/IndexedDataset.scala  |  25 +-
 .../mahout/drivers/ItemSimilarityDriver.scala   | 293 +++++--------
 .../apache/mahout/drivers/MahoutDriver.scala    |  28 +-
 .../mahout/drivers/MahoutOptionParser.scala     | 185 +++++++-
 .../apache/mahout/drivers/ReaderWriter.scala    |  30 +-
 .../org/apache/mahout/drivers/Schema.scala      |  54 ++-
 .../drivers/TextDelimitedReaderWriter.scala     | 107 +++--
 .../drm/CheckpointedDrmSpark.scala              |   1 -
 .../io/MahoutKryoRegistrator.scala              |   6 +-
 .../mahout/cf/CooccurrenceAnalysisSuite.scala   |  49 ++-
 .../drivers/ItemSimilarityDriverSuite.scala     | 422 +++++++++++++++++--
 13 files changed, 1114 insertions(+), 524 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
new file mode 100644
index 0000000..181b729
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.cf
+
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+
+/**
+ * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object CooccurrenceAnalysis extends Serializable {
+
+  /** Compares (Int,Double) pairs by the second value */
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
+
+  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Apply selective downsampling, pin resulting matrix
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+    // num users, which equals the maximum number of interactions per item
+    val numUsers = drmA.nrow.toInt
+
+    // Compute & broadcast the number of interactions per thing in A
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
+
+    // Compute co-occurrence matrix A'A
+    val drmAtA = drmA.t %*% drmA
+
+    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
+    val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
+      bcastInteractionsPerItemA, crossCooccurrence = false)
+
+    var indicatorMatrices = List(drmIndicatorsAtA)
+
+    // Now look at cross-co-occurrences
+    for (drmBRaw <- drmBs) {
+      // Down-sample and pin other interaction matrix
+      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+      // Compute & broadcast the number of interactions per thing in B
+      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
+
+      // Compute cross-co-occurrence matrix B'A
+      // pferrel: yikes, this is the wrong order, a big change! so you know who to blame
+      // used to be val drmBtA = drmB.t %*% drmA, which is the wrong order
+      val drmAtB = drmA.t %*% drmB
+
+      val drmIndicatorsAtB = computeIndicators(drmAtB, numUsers, maxInterestingItemsPerThing,
+        bcastInteractionsPerItemA, bcastInteractionsPerThingB)
+
+      indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB
+
+      drmB.uncache()
+    }
+
+    // Unpin downsampled interaction matrix
+    drmA.uncache()
+
+    // Return list of indicator matrices
+    indicatorMatrices
+  }
+
+  /**
+   * Compute loglikelihood ratio
+   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+   **/
+  def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+                         numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+    val k11 = numInteractionsWithAandB
+    val k12 = numInteractionsWithA - numInteractionsWithAandB
+    val k21 = numInteractionsWithB - numInteractionsWithAandB
+    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+
+  }
+
+  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+                        crossCooccurrence: Boolean = true) = {
+    drmBtA.mapBlock() {
+      case (keys, block) =>
+
+        val llrBlock = block.like()
+        val numInteractionsB: Vector = bcastNumInteractionsB
+        val numInteractionsA: Vector = bcastNumInteractionsA
+
+        for (index <- 0 until keys.size) {
+
+          val thingB = keys(index)
+
+          // PriorityQueue to select the top-k items
+          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+          block(index, ::).nonZeroes().foreach { elem =>
+            val thingA = elem.index
+            val cooccurrences = elem.get
+
+            // exclude co-occurrences of the item with itself
+            if (crossCooccurrence || thingB != thingA) {
+              // Compute loglikelihood ratio
+              val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+                cooccurrences.toLong, numUsers)
+
+              val candidate = thingA -> llr
+
+              // matches legacy hadoop code and maps values to range (0..1)
+              // val tLLR = 1.0 - (1.0 / (1.0 + llr))
+              //val candidate = thingA -> tLLR
+
+              // Enqueue item with score, if belonging to the top-k
+              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+                topItemsPerThing.enqueue(candidate)
+              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+                topItemsPerThing.dequeue()
+                topItemsPerThing.enqueue(candidate)
+              }
+            }
+          }
+
+          // Add top-k interesting items to the output matrix
+          topItemsPerThing.dequeueAll.foreach {
+            case (otherThing, llrScore) =>
+              llrBlock(index, otherThing) = llrScore
+          }
+        }
+
+        keys -> llrBlock
+    }
+  }
+
+  /**
+   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
+   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+   *
+   * additionally binarizes input matrix, as we're only interested in knowing whether interactions happened or not
+   */
+  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+    implicit val distributedContext = drmM.context
+
+    // Pin raw interaction matrix
+    val drmI = drmM.checkpoint()
+
+    // Broadcast vector containing the number of interactions with each thing
+    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
+
+    val downSampledDrmI = drmI.mapBlock() {
+      case (keys, block) =>
+        val numInteractions: Vector = bcastNumInteractions
+
+        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
+        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+        val downsampledBlock = block.like()
+
+        // Downsample the interaction vector of each user
+        for (userIndex <- 0 until keys.size) {
+
+          val interactionsOfUser = block(userIndex, ::)
+
+          val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
+
+          val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
+
+          interactionsOfUser.nonZeroes().foreach { elem =>
+            val numInteractionsWithThing = numInteractions(elem.index)
+            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
+              // We ignore the original interaction value and create a binary 0-1 matrix
+              // as we only consider whether interactions happened or did not happen
+              downsampledBlock(userIndex, elem.index) = 1
+            }
+          }
+        }
+
+        keys -> downsampledBlock
+    }
+
+    // Unpin raw interaction matrix
+    drmI.uncache()
+
+    downSampledDrmI
+  }
+}
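
A minimal sketch of how the relocated object might be invoked, assuming Spark-backed DRMs keyed by Int that share the same row (user) space; the names drmA, drmB and indicators are illustrative:

    import org.apache.mahout.math.cf.CooccurrenceAnalysis
    import org.apache.mahout.math.drm.DrmLike

    // drmA: users x primary items (e.g. purchases); drmB: users x secondary items (e.g. views)
    def indicators(drmA: DrmLike[Int], drmB: DrmLike[Int]): List[DrmLike[Int]] =
      // defaults: randomSeed = 0xdeadbeef, maxInterestingItemsPerThing = 50, maxNumInteractions = 500
      // returns List(indicator matrix from A'A, cross-indicator matrix from A'B), each row sparsified
      // to the top-k LLR scores; LLR is computed from the 2x2 contingency counts
      // k11 (co-interactions), k12 (A only), k21 (B only), k22 (neither)
      CooccurrenceAnalysis.cooccurrences(drmA, drmBs = Array(drmB))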

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
deleted file mode 100644
index 14cc9d5..0000000
--- a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf
-
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import drm._
-import RLikeDrmOps._
-import scala.collection.JavaConversions._
-import org.apache.mahout.math.stats.LogLikelihood
-import collection._
-import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.math.function.{VectorFunction, Functions}
-
-
-/**
- * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
- * available at http://www.mapr.com/practical-machine-learning
- *
- * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
- * Scalable Similarity-Based Neighborhood Methods with MapReduce
- * ACM Conference on Recommender Systems 2012"
- */
-object CooccurrenceAnalysis extends Serializable {
-
-  /** Compares (Int,Double) pairs by the second value */
-  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
-
-  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
-                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
-
-    implicit val distributedContext = drmARaw.context
-
-    // Apply selective downsampling, pin resulting matrix
-    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
-
-    // num users, which equals the maximum number of interactions per item
-    val numUsers = drmA.nrow.toInt
-
-    // Compute & broadcast the number of interactions per thing in A
-    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
-
-    // Compute co-occurrence matrix A'A
-    val drmAtA = drmA.t %*% drmA
-
-    // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
-    val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
-      bcastInteractionsPerItemA, crossCooccurrence = false)
-
-    var indicatorMatrices = List(drmIndicatorsAtA)
-
-    // Now look at cross-co-occurrences
-    for (drmBRaw <- drmBs) {
-      // Down-sample and pin other interaction matrix
-      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
-
-      // Compute & broadcast the number of interactions per thing in B
-      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
-
-      // Compute cross-co-occurrence matrix B'A
-      val drmBtA = drmB.t %*% drmA
-
-      val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing,
-        bcastInteractionsPerThingB, bcastInteractionsPerItemA)
-
-      indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA
-
-      drmB.uncache()
-    }
-
-    // Unpin downsampled interaction matrix
-    drmA.uncache()
-
-    // Return list of indicator matrices
-    indicatorMatrices
-  }
-
-  /**
-   * Compute loglikelihood ratio
-   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
-   **/
-  def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
-                         numInteractionsWithAandB: Long, numInteractions: Long) = {
-
-    val k11 = numInteractionsWithAandB
-    val k12 = numInteractionsWithA - numInteractionsWithAandB
-    val k21 = numInteractionsWithB - numInteractionsWithAandB
-    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
-
-    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
-
-  }
-
-  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
-                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
-                        crossCooccurrence: Boolean = true) = {
-    drmBtA.mapBlock() {
-      case (keys, block) =>
-
-        val llrBlock = block.like()
-        val numInteractionsB: Vector = bcastNumInteractionsB
-        val numInteractionsA: Vector = bcastNumInteractionsA
-
-        for (index <- 0 until keys.size) {
-
-          val thingB = keys(index)
-
-          // PriorityQueue to select the top-k items
-          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
-
-          block(index, ::).nonZeroes().foreach { elem =>
-            val thingA = elem.index
-            val cooccurrences = elem.get
-
-            // exclude co-occurrences of the item with itself
-            if (crossCooccurrence || thingB != thingA) {
-              // Compute loglikelihood ratio
-              val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
-                cooccurrences.toLong, numUsers)
-
-              val candidate = thingA -> llr
-
-              // matches legacy hadoop code and maps values to range (0..1)
-              // val tLLR = 1.0 - (1.0 / (1.0 + llr))
-              //val candidate = thingA -> tLLR
-
-              // Enqueue item with score, if belonging to the top-k
-              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
-                topItemsPerThing.enqueue(candidate)
-              } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
-                topItemsPerThing.dequeue()
-                topItemsPerThing.enqueue(candidate)
-              }
-            }
-          }
-
-          // Add top-k interesting items to the output matrix
-          topItemsPerThing.dequeueAll.foreach {
-            case (otherThing, llrScore) =>
-              llrBlock(index, otherThing) = llrScore
-          }
-        }
-
-        keys -> llrBlock
-    }
-  }
-
-  /**
-   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
-   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
-   *
-   * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not
-   */
-  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
-
-    implicit val distributedContext = drmM.context
-
-    // Pin raw interaction matrix
-    val drmI = drmM.checkpoint()
-
-    // Broadcast vector containing the number of interactions with each thing
-    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
-
-    val downSampledDrmI = drmI.mapBlock() {
-      case (keys, block) =>
-        val numInteractions: Vector = bcastNumInteractions
-
-        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
-        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
-
-        val downsampledBlock = block.like()
-
-        // Downsample the interaction vector of each user
-        for (userIndex <- 0 until keys.size) {
-
-          val interactionsOfUser = block(userIndex, ::)
-
-          val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements()
-
-          val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
-
-          interactionsOfUser.nonZeroes().foreach { elem =>
-            val numInteractionsWithThing = numInteractions(elem.index)
-            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
-
-            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
-              // We ignore the original interaction value and create a binary 0-1 matrix
-              // as we only consider whether interactions happened or did not happen
-              downsampledBlock(userIndex, elem.index) = 1
-            }
-          }
-        }
-
-        keys -> downsampledBlock
-    }
-
-    // Unpin raw interaction matrix
-    drmI.uncache()
-
-    downSampledDrmI
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
index 0d8c160..41622a8 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
@@ -19,6 +19,8 @@ package org.apache.mahout.drivers
 
 import com.google.common.collect.BiMap
 import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
+import org.apache.mahout.sparkbindings._
 
 /**
   * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries.
@@ -39,14 +41,33 @@ import org.apache.mahout.math.drm.CheckpointedDrm
   *       to be not created when not needed.
   */
 
-case class IndexedDataset(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
+case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
+
+  // we must allow the row dimension to be adjusted in the case where the data read in is incomplete and we
+  // learn this afterwards
+
+  /**
+   * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
+   * No physical changes are made to the underlying drm.
+   * @param n number to use for row cardinality, should be larger than current
+   * @note should be done before any BLAS optimizer actions are performed on the matrix or you'll get unpredictable
+   *       results.
+   */
+  def newRowCardinality(n: Int): IndexedDataset = {
+    assert(n > -1)
+    assert( n >= matrix.nrow)
+    val drmRdd = matrix.asInstanceOf[CheckpointedDrmSpark[Int]].rdd
+    val ncol = matrix.ncol
+    val newMatrix = drmWrap[Int](drmRdd, n, ncol)
+    new IndexedDataset(newMatrix, rowIDs, columnIDs)
+  }
 }
 
 /**
   * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary constructor for
   * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like [[org.apache.mahout.drivers.IndexedDatasetStore]]
   * {{{
-  *   val indexedDataset = IndexedDataset(indexedDatasetReader.readFrom(source))
+  *   val indexedDataset = IndexedDataset(indexedDatasetReader.readTuplesFrom(source))
   * }}}
   */
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 71d36c9..e0eaabc 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -17,7 +17,8 @@
 
 package org.apache.mahout.drivers
 
-import org.apache.mahout.cf.CooccurrenceAnalysis
+import org.apache.mahout.math.cf.CooccurrenceAnalysis
+import scala.collection.immutable.HashMap
 
 /**
  * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]].
@@ -25,7 +26,7 @@ import org.apache.mahout.cf.CooccurrenceAnalysis
  * that contain (row id, column id, ...). The IDs are user specified strings which will be
  * preserved in the
  * output. The individual tuples will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]
- * will be used to calculate row-wise self-similarity, or when using filters, will generate two
+ * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
  * matrices and calculate both the self similarity of the primary matrix and the row-wise
  * similarity of the primary
  * to the secondary. Returns one or two directories of text files formatted as specified in
@@ -35,14 +36,21 @@ import org.apache.mahout.cf.CooccurrenceAnalysis
  * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple
  * tuples of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space,
  * you can specify only the input and output file and directory--all else will default to the correct values.
- * @note To use with a Spark cluster see the --masterUrl option, if you run out of heap space check
+ * Each output line will contain the Item ID and similar items sorted by LLR strength descending.
+ * @note To use with a Spark cluster see the --master option, if you run out of heap space check
  *       the --sparkExecutorMemory option.
  */
 object ItemSimilarityDriver extends MahoutDriver {
-  //todo: Should also take two input streams and do cross similarity with no filter required.
-  // required for examples
+  // define only the options specific to ItemSimilarity
+  private final val ItemSimilarityOptions = HashMap[String, Any](
+    "maxPrefs" -> 500,
+    "maxSimilaritiesPerItem" -> 100,
+    "appName" -> "ItemSimilarityDriver")
+
+  // build options from some standard CLI param groups
+  // Note: always put the driver-specific options last so they can override any previous options!
+  private var options: Map[String, Any] = null
 
-  private var options: Options = _
   private var reader1: TextDelimitedIndexedDatasetReader = _
   private var reader2: TextDelimitedIndexedDatasetReader = _
   private var writer: TextDelimitedIndexedDatasetWriter = _
@@ -52,190 +60,103 @@ object ItemSimilarityDriver extends MahoutDriver {
    * @param args  Command line args, if empty a help message is printed.
    */
   override def main(args: Array[String]): Unit = {
-    val parser = new MahoutOptionParser[Options]("spark-itemsimilarity") {
+    options = MahoutOptionParser.GenericOptions ++ MahoutOptionParser.SparkOptions ++
+      MahoutOptionParser.FileIOOptions ++ MahoutOptionParser.TextDelimitedTuplesOptions ++
+      MahoutOptionParser.TextDelimitedDRMOptions ++ ItemSimilarityOptions
+
+    val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
       head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
 
       //Input output options, non-driver specific
-      note("Input, output options")
-      opt[String]('i', "input") required() action { (x, options) =>
-        options.copy(input = x)
-      } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
-
-      opt[String]('o', "output") required() action { (x, options) =>
-        if (x.endsWith("/")) // todo: check to see if HDFS allows MS-Windows backslashes locally?
-          options.copy(output = x)
-        else
-          options.copy(output = x + "/")
-      } text ("Path for output, any local or HDFS supported URI (required).")
+      parseIOOptions
 
       //Algorithm control options--driver specific
       note("\nAlgorithm control options:")
-      opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
-        options.copy(master = x)
-      }
-
       opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) =>
-        options.copy(maxPrefs = x)
-      } text ("Max number of preferences to consider per user (optional). Default: 500") validate { x =>
+        options + ("maxPrefs" -> x)
+      } text ("Max number of preferences to consider per user (optional). Default: " +
+        ItemSimilarityOptions("maxPrefs")) validate { x =>
         if (x > 0) success else failure("Option --maxPrefs must be > 0")
       }
 
 /** not implemented in CooccurrenceAnalysis.cooccurrence
       opt[Int]("minPrefs") abbr ("mp") action { (x, options) =>
-        options.copy(minPrefs = x)
+        options.put("minPrefs", x)
+        options
       } text ("Ignore users with less preferences than this (optional). Default: 1") validate { x =>
         if (x > 0) success else failure("Option --minPrefs must be > 0")
       }
 */
 
       opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
-        options.copy(maxSimilaritiesPerItem = x)
-      } text ("Limit the number of similarities per item to this number (optional). Default: 100") validate { x =>
+        options + ("maxSimilaritiesPerItem" -> x)
+      } text ("Limit the number of similarities per item to this number (optional). Default: " +
+        ItemSimilarityOptions("maxSimilaritiesPerItem")) validate { x =>
         if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
       }
 
-      opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
-        options.copy(randomSeed = x)
-      } text ("Int to seed random number generator (optional). Default: Uses time to generate a seed") validate { x =>
-        if (x > 0) success else failure("Option --randomSeed must be > 0")
-      }
-
-      //Input text file schema--not driver specific but input data specific, tuples input,
-      // not drms
-      note("\nInput text file schema options:")
-      opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
-        options.copy(inDelim = x)
-      }
-
-      opt[String]("filter1") abbr ("f1") action { (x, options) =>
-        options.copy(filter1 = x)
-      } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
-
-      opt[String]("filter2") abbr ("f2") action { (x, options) =>
-        options.copy(filter2 = x)
-      } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected.")
-
-      opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) =>
-        options.copy(rowIDPosition = x)
-      } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
-        if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
-      }
-
-      opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) =>
-        options.copy(itemIDPosition = x)
-      } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
-        if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
-      }
-
-      opt[Int]("filterPosition") abbr ("fc") action { (x, options) =>
-        options.copy(filterPosition = x)
-      } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
-        if (x >= -1) success else failure("Option --filterColNum must be >= -1")
-      }
-
-      note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+      //Driver notes--driver specific
+      note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.")
 
-      //File finding strategy--not driver specific
-      note("\nFile discovery options:")
-      opt[Unit]('r', "recursive") action { (_, options) =>
-        options.copy(recursive = true)
-      } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+      //Input text format
+      parseInputSchemaOptions
 
-      opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
-        options.copy(filenamePattern = x)
-      } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+      //How to search for input
+      parseFileDiscoveryOptions
 
       //Drm output schema--not driver specific, drm specific
-      note("\nOutput text file schema options:")
-      opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
-        options.copy(rowKeyDelim = x)
-      } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
-
-      opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
-        options.copy(columnIdStrengthDelim = x)
-      } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
-
-      opt[String]("tupleDelim") abbr ("td") action { (x, options) =>
-        options.copy(tupleDelim = x)
-      } text ("Separates vector tuple values in the values list (optional). Default: \",\"")
-
-      opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
-        options.copy(omitStrength = true)
-      } text ("Do not write the strength to the output files (optional), Default: false.")
-      note("This option is used to output indexable data for creating a search engine recommender.")
+      parseDrmFormatOptions
 
       //Spark config options--not driver specific
-      note("\nSpark config options:")
-      opt[String]("sparkExecutorMem") abbr ("sem") action { (x, options) =>
-        options.copy(sparkExecutorMem = x)
-      } text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g")
+      parseSparkOptions
 
-      note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2,itemID10:value10...\"")
-
-      //Jar inclusion, this option can be set when executing the driver from compiled code
-      opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) =>
-        options.copy(dontAddMahoutJars = true) //set the value MahoutDriver so the context will be created correctly
-      }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly
-
-      //Driver notes--driver specific
-      note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.\n")
+      //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
+      parseGenericOptions
 
       help("help") abbr ("h") text ("prints this usage text\n")
 
-      checkConfig { c =>
-        if (c.filterPosition == c.itemIDPosition
-            || c.filterPosition == c.rowIDPosition
-            || c.rowIDPosition == c.itemIDPosition)
-          failure("The row, item, and filter positions must be unique.") else success
-      }
-
-      //check for option consistency, probably driver specific
-      checkConfig { c =>
-        if (c.filter1 != null && c.filter2 != null && c.filter1 == c.filter2) failure("If" +
-          " using filters they must be unique.") else success
-      }
-
     }
-
-    //repeated code, should this be put base MahoutDriver somehow?
-    parser.parse(args, Options()) map { opts =>
+    parser.parse(args, options) map { opts =>
       options = opts
       process
     }
-
   }
 
-  override def start(masterUrl: String = options.master,
-      appName: String = options.appName, dontAddMahoutJars: Boolean = options.dontAddMahoutJars):
+  override def start(masterUrl: String = options("master").asInstanceOf[String],
+      appName: String = options("appName").asInstanceOf[String],
+      dontAddMahoutJars: Boolean = options("dontAddMahoutJars").asInstanceOf[Boolean]):
     Unit = {
 
+    // todo: the HashBiMap used in the TextDelimited Reader is hard-coded into
+    // MahoutKryoRegistrator; it should be added to the register list here so it
+    // will only be specific to this job.
     sparkConf.set("spark.kryo.referenceTracking", "false")
       .set("spark.kryoserializer.buffer.mb", "200")
-      .set("spark.executor.memory", options.sparkExecutorMem)
+      .set("spark.executor.memory", options("sparkExecutorMem").asInstanceOf[String])
 
     super.start(masterUrl, appName, dontAddMahoutJars)
 
-    val readSchema1 = new Schema("delim" -> options.inDelim, "filter" -> options.filter1,
-        "rowIDPosition" -> options.rowIDPosition,
-        "columnIDPosition" -> options.itemIDPosition,
-        "filterPosition" -> options.filterPosition)
+    val readSchema1 = new Schema("delim" -> options("inDelim").asInstanceOf[String],
+        "filter" -> options("filter1").asInstanceOf[String],
+        "rowIDPosition" -> options("rowIDPosition").asInstanceOf[Int],
+        "columnIDPosition" -> options("itemIDPosition").asInstanceOf[Int],
+        "filterPosition" -> options("filterPosition").asInstanceOf[Int])
 
     reader1 = new TextDelimitedIndexedDatasetReader(readSchema1)
 
-    if (options.filterPosition != -1 && options.filter2 != null) {
-      val readSchema2 = new Schema("delim" -> options.inDelim, "filter" -> options.filter2,
-          "rowIDPosition" -> options.rowIDPosition,
-          "columnIDPosition" -> options.itemIDPosition,
-          "filterPosition" -> options.filterPosition)
+    if ((options("filterPosition").asInstanceOf[Int] != -1 && options("filter2").asInstanceOf[String] != null)
+        || (options("input2").asInstanceOf[String] != null && !options("input2").asInstanceOf[String].isEmpty )){
+      // only need to change the filter used compared to readSchema1
+      val readSchema2 = new Schema(readSchema1) += ("filter" -> options("filter2").asInstanceOf[String])
 
       reader2 = new TextDelimitedIndexedDatasetReader(readSchema2)
     }
 
     writeSchema = new Schema(
-        "rowKeyDelim" -> options.rowKeyDelim,
-        "columnIdStrengthDelim" -> options.columnIdStrengthDelim,
-        "omitScore" -> options.omitStrength,
-        "tupleDelim" -> options.tupleDelim)
+        "rowKeyDelim" -> options("rowKeyDelim").asInstanceOf[String],
+        "columnIdStrengthDelim" -> options("columnIdStrengthDelim").asInstanceOf[String],
+        "omitScore" -> options("omitStrength").asInstanceOf[Boolean],
+        "tupleDelim" -> options("tupleDelim").asInstanceOf[String])
 
     writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
 
@@ -243,24 +164,60 @@ object ItemSimilarityDriver extends MahoutDriver {
 
   private def readIndexedDatasets: Array[IndexedDataset] = {
 
-    val inFiles = FileSysUtils(options.input, options.filenamePattern, options.recursive).uris
+    val inFiles = FileSysUtils(options("input").asInstanceOf[String], options("filenamePattern").asInstanceOf[String],
+        options("recursive").asInstanceOf[Boolean]).uris
+    val inFiles2 = if (options("input2") == null || options("input2").asInstanceOf[String].isEmpty) ""
+      else FileSysUtils(options("input2").asInstanceOf[String], options("filenamePattern").asInstanceOf[String],
+          options("recursive").asInstanceOf[Boolean]).uris
 
     if (inFiles.isEmpty) {
       Array()
     } else {
 
-      val selfSimilarityDataset = IndexedDataset(reader1.readFrom(inFiles))
+      val datasetA = IndexedDataset(reader1.readTuplesFrom(inFiles))
+      if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetA,
+          options("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
+
+      // The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B.
+      // Here we assume there is one row ID space for all interactions. To do this we calculate the
+      // row cardinality only after reading in A and B (or potentially C...) We then adjust the
+      // cardinality so all match, which is required for the math to work.
+      // Note: this may leave blank rows with no representation in any DRM. Blank rows need to
+      // be supported (and are at least on Spark) or the row cardinality fix will not work.
+      val datasetB = if (!inFiles2.isEmpty) {
+        // get cross-cooccurrence interactions from separate files
+        val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
+
+        datasetB
+
+      } else if (options("filterPosition").asInstanceOf[Int] != -1
+          && options("filter2").asInstanceOf[String] != null) {
+
+        // get cross-cooccurrences interactions by using two filters on a single set of files
+        val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles, existingRowIDs = datasetA.rowIDs))
+
+        datasetB
 
-      if (options.filterPosition != -1 && options.filter2 != null) {
-        // todo: needs to support more than one cross-similarity indicator
-        val crossSimilarityDataset1 = IndexedDataset(reader2.readFrom(inFiles))
-        Array(selfSimilarityDataset, crossSimilarityDataset1)
       } else {
-        Array(selfSimilarityDataset)
+        null.asInstanceOf[IndexedDataset]
       }
+      if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc
+        // true row cardinality is the size of the row id index, which was calculated from all rows of A and B
+        val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality
 
-    }
+        // todo: how expensive is nrow? We could make assumptions about .rowIds that don't rely on
+        // its calculation
+        val returnedA = if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality)
+          else datasetA // this guarantees matching cardinality
 
+        val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality)
+          else datasetB // this guarantees matching cardinality
+
+        if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetB, options("output") + "../input-datasets/secondary-interactions")
+
+        Array(returnedA, returnedB)
+      } else Array(datasetA)
+    }
   }
 
   override def process: Unit = {
@@ -271,57 +228,29 @@ object ItemSimilarityDriver extends MahoutDriver {
     // todo: allow more than one cross-similarity matrix?
     val indicatorMatrices = {
       if (indexedDatasets.length > 1) {
-        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs, Array(indexedDatasets(1).matrix))
+        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int],
+            options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int],
+            Array(indexedDatasets(1).matrix))
       } else {
-        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs)
+        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int],
+          options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int])
       }
     }
 
-    // self similarity
-    // the next two lines write the drm using a Writer class
-    // val selfIndicatorDataset = new IndexedDataset(indicatorMatrices(0), indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs)
-    // writeStore.writeTo(selfIndicatorDataset, options.output + "indicator-matrix")
-
     // an alternative is to create a version of IndexedDataset that knows how to write itself
     val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
       indexedDatasets(0).columnIDs, writeSchema)
-    selfIndicatorDataset.writeTo(options.output + "indicator-matrix")
+    selfIndicatorDataset.writeTo(options("output").asInstanceOf[String] + "indicator-matrix")
 
-    // todo: needs to support more than one cross-similarity indicator
+    // todo: would be nice to support more than one cross-similarity indicator
     if (indexedDatasets.length > 1) {
 
       val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
-      writer.writeTo(crossIndicatorDataset, options.output + "cross-indicator-matrix")
+      writer.writeDRMTo(crossIndicatorDataset, options("output").asInstanceOf[String] + "cross-indicator-matrix")
 
     }
 
     stop
   }
 
-  // Default values go here, any "_" or null should be "required" in the Parser or flags an unused option
-  // todo: support two input streams for cross-similarity, maybe assume one schema for all inputs
-  case class Options(
-      master: String = "local",
-      sparkExecutorMem: String = "2g",
-      appName: String = "ItemSimilarityJob",
-      randomSeed: Int = System.currentTimeMillis().toInt,
-      recursive: Boolean = false,
-      input: String = null,
-      output: String = null,
-      filenamePattern: String = "^part-.*",
-      maxSimilaritiesPerItem: Int = 100,
-      maxPrefs: Int = 500,
-      minPrefs: Int = 1,
-      rowIDPosition: Int = 0,
-      itemIDPosition: Int = 1,
-      filterPosition: Int = -1,
-      filter1: String = null,
-      filter2: String = null,
-      inDelim: String = "[,\t ]",
-      rowKeyDelim: String = "\t",
-      columnIdStrengthDelim: String = ":",
-      tupleDelim: String = ",",
-      omitStrength: Boolean = false,
-      dontAddMahoutJars: Boolean = false)
-
 }
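
To illustrate the row-cardinality alignment described in the readIndexedDatasets comments above, the logic reduces to the sketch below. This is illustrative only: readAligned is a made-up helper, while TextDelimitedIndexedDatasetReader, IndexedDataset, readTuplesFrom, existingRowIDs, rowIDs and newRowCardinality are the names used in this patch.

    import org.apache.mahout.drivers.{IndexedDataset, Schema, TextDelimitedIndexedDatasetReader}
    import org.apache.mahout.math.drm.DistributedContext

    // hypothetical helper: read two interaction files into one shared row-ID space
    def readAligned(pathA: String, pathB: String, schemaA: Schema, schemaB: Schema)
        (implicit mc: DistributedContext): (IndexedDataset, IndexedDataset) = {

      val readerA = new TextDelimitedIndexedDatasetReader(schemaA)
      val readerB = new TextDelimitedIndexedDatasetReader(schemaB)

      // A seeds the row-ID dictionary; B appends any row IDs it has not seen yet
      val datasetA = IndexedDataset(readerA.readTuplesFrom(pathA))
      val datasetB = IndexedDataset(readerB.readTuplesFrom(pathB, existingRowIDs = datasetA.rowIDs))

      // the shared dictionary now holds the authoritative row cardinality
      val rowCardinality = datasetB.rowIDs.size()
      val alignedA =
        if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality) else datasetA
      val alignedB =
        if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality) else datasetB

      (alignedA, alignedB)
    }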

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 0c579d4..796a66a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -21,17 +21,26 @@ import org.apache.mahout.math.drm.DistributedContext
 import org.apache.spark.SparkConf
 import org.apache.mahout.sparkbindings._
 
+import scala.collection.immutable
+
 /** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
-  * Also define a command line parser and default options or fill in the following template:
+  * Also define a Map of options for the command line parser. The following template may help:
   * {{{
   *   object SomeDriver extends MahoutDriver {
+  *     // build options from some standard CLI param groups
+  *     // Note: always put the driver-specific options last so they can override any previous options!
+  *     private var options = GenericOptions ++ SparkOptions ++ FileIOOptions ++ TextDelimitedTuplesOptions ++
+  *       TextDelimitedDRMOptions ++ ItemSimilarityOptions
+  *
   *     override def main(args: Array[String]): Unit = {
-  *       val parser = new MahoutOptionParser[Options]("Job Name") {
-  *         head("Job Name", "Spark")
-  *         note("Various CLI options")
-  *         //see https://github.com/scopt/scopt for a good Scala option parser, which MahoutOptionParser extends
+  *       val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
+  *         head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
+  *
+  *         //Several standard option groups are usually non-driver specific so use the MahoutOptionParser methods
+  *         parseGenericOptions
+  *         ...
   *       }
-  *       parser.parse(args, Options()) map { opts =>
+  *       parser.parse(args, options) map { opts =>
   *         options = opts
   *         process
   *       }
@@ -42,15 +51,12 @@ import org.apache.mahout.sparkbindings._
   *       //don't just stand there do something
   *       stop
   *     }
-  *
-  *     //Default values go here, any '_' or null should be 'required' in the Parser or flags an unused option
-  *     case class Options(
-  *       appName: String = "Job Name", ...
-  *     )
   *   }
   * }}}
   */
 abstract class MahoutDriver {
+
+
   implicit var mc: DistributedContext = _
   implicit val sparkConf = new SparkConf()
 

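The scaladoc template above is abbreviated; a fuller, still hypothetical, skeleton built from the new option-group defaults might look like the following. DummyDriver, its program name, and the choice of option groups are placeholders; MahoutDriver, MahoutOptionParser, the default option maps, and the parse helper methods are the ones introduced in this patch, and a real driver would append its own defaults (such as ItemSimilarityOptions) last.

    import org.apache.mahout.drivers.{MahoutDriver, MahoutOptionParser}
    import org.apache.mahout.drivers.MahoutOptionParser._

    object DummyDriver extends MahoutDriver {

      // later groups override earlier ones, so driver-specific defaults go last
      private var options: Map[String, Any] =
        GenericOptions ++ SparkOptions ++ FileIOOptions ++
          TextDelimitedTuplesOptions ++ TextDelimitedDRMOptions

      override def main(args: Array[String]): Unit = {
        val parser = new MahoutOptionParser(programName = "dummy-driver") {
          head("dummy-driver", "Mahout 1.0-SNAPSHOT")
          parseIOOptions
          parseSparkOptions
          parseGenericOptions
        }
        parser.parse(args, options) map { opts =>
          options = opts
          process
        }
      }

      override def process: Unit = {
        // don't just stand there, do something -- see ItemSimilarityDriver.process for a real example
        stop
      }
    }
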
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 8a337f5..ba4ca1d 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -17,8 +17,189 @@
 package org.apache.mahout.drivers
 
 import scopt.OptionParser
+import scala.collection.immutable
 
-/** Modifies default [[scopt.OptionParser]] to output long help-like usage + error message */
-class MahoutOptionParser[C](programName: String) extends OptionParser[C](programName: String) {
+/** Companion object defines default option groups for reference in any driver that needs them */
+object MahoutOptionParser {
+  // set up the various default option groups
+  final val GenericOptions = immutable.HashMap[String, Any](
+    "randomSeed" -> System.currentTimeMillis().toInt,
+    "dontAddMahoutJars" -> false,
+    "writeAllDatasets" -> false)
+
+  final val SparkOptions = immutable.HashMap[String, Any](
+    "master" -> "local",
+    "sparkExecutorMem" -> "2g",
+    "appName" -> "Generic Spark App, Change this.")
+
+  final val FileIOOptions = immutable.HashMap[String, Any](
+    "recursive" -> false,
+    "input" -> null.asInstanceOf[String],
+    "input2" -> null.asInstanceOf[String],
+    "output" -> null.asInstanceOf[String],
+    "filenamePattern" -> "^part-.*")
+
+  final val TextDelimitedTuplesOptions = immutable.HashMap[String, Any](
+    "rowIDPosition" -> 0,
+    "itemIDPosition" -> 1,
+    "filterPosition" -> -1,
+    "filter1" -> null.asInstanceOf[String],
+    "filter2" -> null.asInstanceOf[String],
+    "inDelim" -> "[,\t ]")
+
+  final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+    "rowKeyDelim" -> "\t",
+    "columnIdStrengthDelim" -> ":",
+    "tupleDelim" -> " ",
+    "omitStrength" -> false)
+}
+/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+  * keep both standardized.
+  * @param programName Name displayed in help message, the name by which the driver is invoked.
+  * */
+class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
   override def showUsageOnError = true
+
+  def parseIOOptions = {
+    note("Input, output options")
+    opt[String]('i', "input") required() action { (x, options) =>
+      options + ("input" -> x)
+    } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+
+    opt[String]("input2") abbr ("i2")  action { (x, options) =>
+      options + ("input2" -> x)
+    } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+
+    opt[String]('o', "output") required() action { (x, options) =>
+      // todo: check to see if HDFS allows MS-Windows backslashes locally?
+      if (x.endsWith("/")) {
+        options + ("output" -> x)
+      } else {
+        options + ("output" -> (x + "/"))
+      }
+    } text ("Path for output, any local or HDFS supported URI (required)")
+
+  }
+
+  def parseSparkOptions = {
+    note("\nSpark config options:")
+
+    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+      options + ("master" -> x)
+    }
+
+    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g") action { (x, options) =>
+      options + ("sparkExecutorMem" -> x)
+    }
+
+  }
+
+  def parseGenericOptions = {
+    note("\nGeneral config options:")
+    opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
+      options + ("randomSeed" -> x)
+    } validate { x =>
+      if (x > 0) success else failure("Option --randomSeed must be > 0")
+    }
+
+    opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) =>
+      options + ("dontAddMahoutJars" -> true)
+    }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly
+
+    //output both input DRMs
+    opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
+      options + ("writeAllDatasets" -> true)
+    }//Hidden option, though a user might want this.
+  }
+
+  def parseInputSchemaOptions{
+    //Input text file schema--not driver specific but input data specific, tuples input,
+    // not drms
+    note("\nInput text file schema options:")
+    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
+      options + ("inDelim" -> x)
+    }
+
+    opt[String]("filter1") abbr ("f1") action { (x, options) =>
+      options + ("filter1" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+
+    opt[String]("filter2") abbr ("f2") action { (x, options) =>
+      options + ("filter2" -> x)
+    } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+
+    opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) =>
+      options + ("rowIDPosition" -> x)
+    } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
+      if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+    }
+
+    opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) =>
+      options + ("itemIDPosition" -> x)
+    } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
+      if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+    }
+
+    opt[Int]("filterPosition") abbr ("fc") action { (x, options) =>
+      options + ("filterPosition" -> x)
+    } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+      if (x >= -1) success else failure("Option --filterPosition must be >= -1")
+    }
+
+    note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+    checkConfig { options: Map[String, Any] =>
+      if (options("filterPosition").asInstanceOf[Int] == options("itemIDPosition").asInstanceOf[Int]
+        || options("filterPosition").asInstanceOf[Int] == options("rowIDPosition").asInstanceOf[Int]
+        || options("rowIDPosition").asInstanceOf[Int] == options("itemIDPosition").asInstanceOf[Int])
+        failure("The row, item, and filter positions must be unique.") else success
+    }
+
+    //check for option consistency, probably driver specific
+    checkConfig { options: Map[String, Any] =>
+      if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
+        && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
+        && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
+        failure ("If using filters they must be unique.") else success
+    }
+
+  }
+
+  def parseFileDiscoveryOptions = {
+    //File finding strategy--not driver specific
+    note("\nFile discovery options:")
+    opt[Unit]('r', "recursive") action { (_, options) =>
+      options + ("recursive" -> true)
+    } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+
+    opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
+      options + ("filenamePattern" -> x)
+    } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+
+  }
+
+  def parseDrmFormatOptions = {
+    note("\nOutput text file schema options:")
+    opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
+      options + ("rowKeyDelim" -> x)
+    } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+    opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
+      options + ("columnIdStrengthDelim" -> x)
+    } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+    opt[String]("tupleDelim") abbr ("td") action { (x, options) =>
+      options + ("tupleDelim" -> x)
+    } text ("Separates vector tuple values in the values list (optional). Default: \" \"")
+
+    opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
+      options + ("omitStrength" -> true)
+    } text ("Do not write the strength to the output files (optional), Default: false.")
+    note("This option is used to output indexable data for creating a search engine recommender.")
+
+    note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+  }
+
 }
+
+
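
To make the immutable-Map convention above concrete: defaults come from the companion-object groups, a parsed option simply replaces a key, and callers read values back with a cast. A small sketch; the "local[4]" value is just an example.

    import org.apache.mahout.drivers.MahoutOptionParser._

    // defaults for the Spark and file-IO groups defined above
    val defaults: Map[String, Any] = SparkOptions ++ FileIOOptions

    // a scopt action returns a new map with the parsed value replacing the default
    val parsed = defaults + ("master" -> "local[4]")

    // callers read values back with a key lookup and a cast, as the drivers do
    val master  = parsed("master").asInstanceOf[String]           // "local[4]"
    val pattern = parsed("filenamePattern").asInstanceOf[String]  // "^part-.*"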

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
index c5b7385..e2bb49c 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
@@ -17,25 +17,39 @@
 
 package org.apache.mahout.drivers
 
+import com.google.common.collect.{HashBiMap, BiMap}
 import org.apache.mahout.math.drm.DistributedContext
 
-/** Reader trait is abstract in the sense that the reader function must be defined by an extending trait, which also defines the type to be read.
-  * @tparam T type of object read, usually supplied by an extending trait.
-  * @todo the reader need not create both dictionaries but does at present. There are cases where one or the other dictionary is never used so saving the memory for a very large dictionary may be worth the optimization to specify which dictionaries are created.
+/** Reader trait is abstract in the sense that the tupleReader function must be defined by an extending trait, which also defines the type to be read.
+  * @tparam T type of object read.
   */
 trait Reader[T]{
+
   val mc: DistributedContext
   val readSchema: Schema
-  protected def reader(mc: DistributedContext, readSchema: Schema, source: String): T
-  def readFrom(source: String): T = reader(mc, readSchema, source)
+
+  protected def tupleReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int]): T
+
+  def readTuplesFrom(
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+    tupleReader(mc, readSchema, source, existingRowIDs)
 }
 
 /** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
-  * @tparam T
+  * @tparam T type of object to write.
   */
 trait Writer[T]{
+
   val mc: DistributedContext
+  val sort: Boolean
   val writeSchema: Schema
-  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T): Unit
-  def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection)
+
+  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
+
+  def writeDRMTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
 }
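
The Reader/Writer traits above only fix the plumbing; a concrete class supplies the schema and context and implements tupleReader. A toy implementation, purely for illustration (CommaListReader and its behavior are hypothetical; the trait signatures are the ones from this patch):

    import com.google.common.collect.BiMap
    import org.apache.mahout.drivers.{Reader, Schema}
    import org.apache.mahout.math.drm.DistributedContext

    class CommaListReader(val readSchema: Schema)(implicit val mc: DistributedContext)
        extends Reader[Array[String]] {

      // Reader.readTuplesFrom(source, existingRowIDs) routes here; existingRowIDs is how a
      // second dataset reuses, and extends, the row-ID dictionary built for a first one
      protected def tupleReader(
          mc: DistributedContext,
          readSchema: Schema,
          source: String,
          existingRowIDs: BiMap[String, Int]): Array[String] =
        source.split(readSchema("delim").asInstanceOf[String])
    }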

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
index 7735b83..edff92d 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
@@ -20,21 +20,30 @@ package org.apache.mahout.drivers
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
 
-/** Syntactic sugar for HashMap[String, Any]
+/** Syntactic sugar for mutable.HashMap[String, Any]
   *
   * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
   */
 class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
   // note: this require a mutable HashMap, do we care?
   this ++= params
-  if (!this.contains("omitScore")) this += ("omitScore" -> false)
+
+  /** Constructor for copying an existing Schema
+    *
+    * @param schemaToClone return a copy of this Schema
+    */
+  def this(schemaToClone: Schema){
+    this()
+    this ++= schemaToClone
+  }
 }
 
-// These can be used to keep the text in and out fairly standard to Mahout, where an application specific format is not
-// required.
+// These can be used to keep the text in and out fairly standard to Mahout, where an application specific
+// format is not required.
 
 /** Simple default Schema for typical text delimited tuple file input
-  * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID<comma, tab, or space>etc...)
+  * This tells the reader to input tuples of the default form (rowID<comma, tab, or space>columnID
+  * <comma, tab, or space>any further text on the line is ignored...)
   */
 class DefaultTupleReadSchema extends Schema(
     "delim" -> "[,\t ]", //comma, tab or space
@@ -43,44 +52,47 @@ class DefaultTupleReadSchema extends Schema(
     "columnIDPosition" -> 1,
     "filterPosition" -> -1)
 
-/** Simple default Schema for typical text delimited drm file output
-  * This tells the writer to write a DRM of the default
-  * (rowID<tab>columnID1:score1,columnID2:score2,...)
+/** Default Schema for text delimited drm file output
+  * This tells the writer to write a DRM of the default form:
+  * (rowID<tab>columnID1:score1<space>columnID2:score2...)
   */
 class DefaultDRMWriteSchema extends Schema(
     "rowKeyDelim" -> "\t",
     "columnIdStrengthDelim" -> ":",
-    "tupleDelim" -> ",")
+    "tupleDelim" -> " ",
+    "omitScore" -> false)
 
-/** Simple default Schema for typical text delimited drm file output
-  * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID<comma, tab, or space>etc...)
+/** Default Schema for typical text delimited drm file input
+  * This tells the reader to input text lines of the form:
+  * (rowID<tab>columnID1:score1,columnID2:score2,...)
   */
 class DefaultDRMReadSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
-  "tupleDelim" -> ",")
+  "tupleDelim" -> " ")
 
-/** Simple default Schema for reading a text delimited drm file  where the score of any tuple is ignored,
+/** Default Schema for reading a text delimited drm file  where the score of any tuple is ignored,
   * all non-zeros are replaced with 1.
   * This tells the reader to input DRM lines of the form
-  * (rowID<tab>columnID1:score1,columnID2:score2,...) remember the score is ignored. Alternatively the format can be
-  * (rowID<tab>columnID1,columnID2,...) where presence indicates a score of 1. This is the default output format for
-  * [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
+  * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
+  * Alternatively the format can be
+  * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+  * output format for [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
   */
 class DRMReadBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
-  "tupleDelim" -> ",",
+  "tupleDelim" -> " ",
   "omitScore" -> true)
 
-/** Simple default Schema for typical text delimited drm file write where the score of a tuple is omitted.
+/** Default Schema for typical text delimited drm file write where the score of a tuple is omitted.
   * The presence of a tuple means the score = 1, the absence means a score of 0.
-  * This tells the reader to input DRM lines of the form
-  * (rowID<tab>columnID1,columnID2,...)
+  * This tells the writer to output DRM lines of the form
+  * (rowID<tab>columnID1<space>columnID2...)
   */
 class DRMWriteBooleanSchema extends Schema(
   "rowKeyDelim" -> "\t",
   "columnIdStrengthDelim" -> ":",
-  "tupleDelim" -> ",",
+  "tupleDelim" -> " ",
   "omitScore" -> true)
 

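The new copy constructor is what lets ItemSimilarityDriver derive its second read schema by overriding a single key. A short sketch of that pattern; the filter values "purchase" and "view" are made-up examples.

    import org.apache.mahout.drivers.Schema

    val readSchema1 = new Schema("delim" -> "[,\t ]", "filter" -> "purchase",
      "rowIDPosition" -> 0, "columnIDPosition" -> 1, "filterPosition" -> 2)

    // clone and override only the filter, exactly as the driver does for filter2
    val readSchema2 = new Schema(readSchema1) += ("filter" -> "view")
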
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index ae78d59..11d647b 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -17,14 +17,12 @@
 
 package org.apache.mahout.drivers
 
-import scala.collection.JavaConversions._
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.RandomAccessSparseVector
 import com.google.common.collect.{BiMap, HashBiMap}
-import scala.collection.JavaConversions._
 import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
 import org.apache.mahout.sparkbindings._
-
+import scala.collection.JavaConversions._
 
 /** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
   */
@@ -36,7 +34,11 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
     * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
     * @return
     */
-  protected def reader(mc: DistributedContext, readSchema: Schema, source: String): IndexedDataset = {
+  protected def tupleReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = {
     try {
       val delimiter = readSchema("delim").asInstanceOf[String]
       val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int]
@@ -51,6 +53,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       })
 
       var columns = mc.textFile(source).map { line => line.split(delimiter) }
+      //val m = columns.collect
 
       // -1 means no filter in the input text, take them all
       if(filterPosition != -1) {
@@ -59,7 +62,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       }
 
       // get row and column IDs
-      val m = columns.collect
+      //val m = columns.collect
       val interactions = columns.map { tokens =>
         tokens(rowIDPosition) -> tokens(columnIDPosition)
       }
@@ -75,10 +78,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
 
       // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
       // broadcast them for access in distributed processes, so they are not recalculated in every task.
-      val rowIDDictionary = asOrderedDictionary(rowIDs)
+      val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
       val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
 
-      val columnIDDictionary = asOrderedDictionary(columnIDs)
+      val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
       val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
 
       val indexedInteractions =
@@ -113,11 +116,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
   // this creates a BiMap from an ID collection. The ID points to an ordinal int
   // which is used internal to Mahout as the row or column ID
   // todo: this is a non-distributed process and the BiMap is a non-rdd based object--might be a scaling problem
-  private def asOrderedDictionary(entries: Array[String]): BiMap[String, Int] = {
-    var dictionary: BiMap[String, Int] = HashBiMap.create()
-    var index = 0
+  private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
+    var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index'
     for (entry <- entries) {
-      dictionary.forcePut(entry, index)
+      if (!dictionary.contains(entry)) dictionary.put(entry, index)
       index += 1
     }
     dictionary
@@ -125,13 +127,21 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
 }
 
 trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
+
   /** Read in text delimited tuples from all URIs in this comma delimited source String.
     *
     * @param mc context for the Spark job
     * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
     * @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]]
     */
-  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, indexedDataset: IndexedDataset): Unit = {
+  protected def writer(
+      mc: DistributedContext,
+      writeSchema: Schema,
+      dest: String,
+      indexedDataset: IndexedDataset,
+      sort: Boolean = true): Unit = {
     try {
       val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
       val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String]
@@ -140,8 +150,14 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
       //instance vars must be put into locally scoped vals when put into closures that are
       //executed but Spark
 
-      assert (indexedDataset != null, {println(this.getClass.toString+": has no indexedDataset to write"); throw new IllegalArgumentException })
-      assert (!dest.isEmpty, {println(this.getClass.toString+": has no destination or indextedDataset to write"); throw new IllegalArgumentException})
+      assert(indexedDataset != null, {
+        println(this.getClass.toString + ": has no indexedDataset to write")
+        throw new IllegalArgumentException
+      })
+      assert(!dest.isEmpty, {
+        println(this.getClass.toString + ": has no destination or indextedDataset to write")
+        throw new IllegalArgumentException
+      })
 
       val matrix = indexedDataset.matrix
       val rowIDDictionary = indexedDataset.rowIDs
@@ -149,18 +165,29 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
 
       matrix.rdd.map { case (rowID, itemVector) =>
 
-        // each line is created of non-zero values with schema specified delimiters and original row and column ID tokens
-        // first get the external rowID token
-        var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
-
-        // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
-        for (item <- itemVector.nonZeroes()) {
-          line += columnIDDictionary.inverse.get(item.index)
-          if (!omitScore) line += columnIdStrengthDelim + item.get
-          line += tupleDelim
+        // turn non-zeros into list for sorting
+        val itemList: collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]] = new collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]]
+        for (ve <- itemVector.nonZeroes) {
+          val item: org.apache.mahout.common.Pair[Integer, Double] = new org.apache.mahout.common.Pair[Integer, Double](ve.index, ve.get)
+          itemList += item
         }
-        // drop the last delimiter, not needed to end the line
-        line.dropRight(1)
+        // sort by strength, descending (hence the minus sign)
+        val vector = if (sort) itemList.sortBy(-_.getSecond) else itemList
+
+        // first get the external rowID token
+        if (!vector.isEmpty){
+          var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
+          // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
+          for (item <- vector) {
+            line += columnIDDictionary.inverse.get(item.getFirst)
+            if (!omitScore) line += columnIdStrengthDelim + item.getSecond
+            line += tupleDelim
+          }
+          // drop the last delimiter, not needed to end the line
+          line.dropRight(1)
+        } else {//no items so write a line with id but no values, no delimiters
+          rowIDDictionary.inverse.get(rowID)
+        } // "if" returns a line of text so this must be last in the block
       }
       .saveAsTextFile(dest)
 
@@ -176,25 +203,28 @@ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexed
 /** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
   * @param mc Spark context for reading files
-  * @note The source is supplied by Reader#readFrom .
+  * @note The source is supplied by Reader#readTuplesFrom .
   * */
-class TextDelimitedIndexedDatasetReader(val readSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReader
+class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
+    (implicit val mc: DistributedContext) extends TDIndexedDatasetReader
 
 /** Writes  text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
   * @param mc Spark context for reading files
-  * @note the destination is supplied by Writer#writeTo trait method
+  * @note the destination is supplied by Writer#writeDRMTo trait method
   * */
-class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
 
 /** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
   * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
   * @param mc Spark context for reading the files, may be implicitly defined.
   * */
-class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReaderWriter
+class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema, val sort: Boolean = true)
+    (implicit val mc: DistributedContext)
+  extends TDIndexedDatasetReaderWriter
 
-/** A version of IndexedDataset that has it's own writeTo method from a Writer trait. This is an alternative to creating
+/** A version of IndexedDataset that has its own writeDRMTo method from a Writer trait. This is an alternative to creating
   * a Writer based stand-alone class for writing. Consider it experimental allowing similar semantics to drm.writeDrm().
   * Experimental because it's not clear that it is simpler or more intuitive and since IndexedDatasetTextDelimitedWriteables
   * are probably short lived in terms of lines of code so complexity may be moot.
@@ -204,12 +234,17 @@ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeS
   * @param writeSchema contains params for the schema/format or the written text delimited file.
   * @param mc mahout distributed context (DistributedContext) may be implicitly defined.
   * */
-class IndexedDatasetTextDelimitedWriteable(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int],
-                                           val writeSchema: Schema)(implicit val mc: DistributedContext)
+class IndexedDatasetTextDelimitedWriteable(
+    matrix: CheckpointedDrm[Int],
+    rowIDs: BiMap[String,Int],
+    columnIDs: BiMap[String,Int],
+    val writeSchema: Schema,
+    val sort: Boolean = true)
+    (implicit val mc: DistributedContext)
   extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter {
 
   def writeTo(dest: String): Unit = {
-    writeTo(this, dest)
+    writeDRMTo(this, dest)
   }
 }
 
@@ -217,11 +252,11 @@ class IndexedDatasetTextDelimitedWriteable(matrix: CheckpointedDrm[Int], rowIDs:
  * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily used to get a secondary constructor for
  * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
  * {{{
- *   val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readFrom(source))
+ *   val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readTuplesFrom(source))
  * }}}
  */
 
 object IndexedDatasetTextDelimitedWriteable {
   /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
-  def apply(id2: IndexedDatasetTextDelimitedWriteable) = new IndexedDatasetTextDelimitedWriteable(id2.matrix,  id2.rowIDs, id2.columnIDs, id2.writeSchema)(id2.mc)
+  def apply(id2: IndexedDatasetTextDelimitedWriteable, sort: Boolean = true) = new IndexedDatasetTextDelimitedWriteable(id2.matrix,  id2.rowIDs, id2.columnIDs, id2.writeSchema, id2.sort)(id2.mc)
 }
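
The writer's new sort step can be seen in isolation with the sketch below; the index/strength values are invented, but the Pair type and the sortBy call are the ones used in the writer above.

    import org.apache.mahout.common.Pair
    import scala.collection.mutable.MutableList

    val items = MutableList(
      new Pair[Integer, Double](0, 0.5),
      new Pair[Integer, Double](3, 2.0),
      new Pair[Integer, Double](7, 1.25))

    // highest strength first, the same descending sort the writer applies per row
    val sorted = items.sortBy(-_.getSecond)
    // with the default delimiters this row would be written as "3:2.0 7:1.25 0:0.5"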

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 1c5546b..cc5ebf2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -65,7 +65,6 @@ class CheckpointedDrmSpark[K: ClassTag](
   private var cached: Boolean = false
   override val context: DistributedContext = rdd.context
 
-
   /**
    * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
    * and writing down Spark graph lineage since last checkpointed DRM.

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
index 22e31cc..61f37e4 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
@@ -23,6 +23,10 @@ import com.google.common.collect.HashBiMap
 import org.apache.mahout.math._
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.mahout.sparkbindings._
+import org.apache.mahout.common.Pair
+import org.apache.mahout.math.Vector.Element
+
+import scala.collection.immutable.List
 
 /** Kryo serialization registrator for Mahout */
 class MahoutKryoRegistrator extends KryoRegistrator {
@@ -32,6 +36,6 @@ class MahoutKryoRegistrator extends KryoRegistrator {
     kryo.addDefaultSerializer(classOf[Vector], new WritableKryoSerializer[Vector, VectorWritable])
     kryo.addDefaultSerializer(classOf[DenseVector], new WritableKryoSerializer[Vector, VectorWritable])
     kryo.addDefaultSerializer(classOf[Matrix], new WritableKryoSerializer[Matrix, MatrixWritable])
-    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer());
+    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer())
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
index 938dc33..642e90a 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.mahout.cf
 
+import org.apache.mahout.math.cf.CooccurrenceAnalysis
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.scalabindings.{MatrixOps, _}
 import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
@@ -48,13 +49,19 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
     (0.0,                0.0,                0.0,                     0.0,                0.0))
 
   // correct cross-cooccurrence with LLR
-  final val matrixLLRCoocBtAControl = dense(
+  final val m = dense(
     (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
     (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
     (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
     (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
     (0.0,                0.0,                0.0,                0.0,                4.498681156950466))
 
+  final val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+      (0.0,                0.0,                0.6795961471815897, 0.0,                4.498681156950466))
 
 
   test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
@@ -150,6 +157,46 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
     n should be < 1E-10
   }
 
+  test("cooccurrence two matrices with different number of columns"){
+    val a = dense(
+      (1, 1, 0, 0, 0),
+      (0, 0, 1, 1, 0),
+      (0, 0, 0, 0, 1),
+      (1, 0, 0, 1, 0))
+
+    val b = dense(
+      (0, 1, 1, 0),
+      (1, 1, 1, 0),
+      (0, 0, 1, 0),
+      (1, 1, 0, 1))
+
+    val matrixLLRCoocBtANonSymmetric = dense(
+      (0.0,                1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+      (0.0,                0.6795961471815897, 0.6795961471815897, 0.0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (5.545177444479561,  1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+      (0.0,                0.0,                0.6795961471815897, 0.0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    //self similarity
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+
+    //cooccurrence without LLR is just a A'B
+    //val inCoreAtB = a.transpose().times(b)
+    //val bp = 0
+  }
+
   test("LLR calc") {
     val A = dense(
         (1, 1, 0, 0, 0),


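For readers new to these suites, the comparison idiom used throughout is to take the norm of the difference between the actual and the control matrix. A minimal sketch with made-up values, using the same scalabindings helpers as the tests:

    import org.apache.mahout.math.scalabindings.{MatrixOps, _}

    val control = dense((1.0, 0.0), (0.0, 1.0))
    val actual  = dense((1.0, 1e-12), (0.0, 1.0))

    // matrices are treated as equal when the norm of their difference is negligible
    val n = (new MatrixOps(m = actual.minus(control))).norm
    assert(n < 1E-10)
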
[2/6] git commit: MAHOUT-1597: A + 1.0 (fixes)

Posted by dl...@apache.org.
MAHOUT-1597: A + 1.0 (fixes)


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/7a50a291
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/7a50a291
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/7a50a291

Branch: refs/heads/spark-1.0.x
Commit: 7a50a291b4598e9809f9acf609b92175ce7f953b
Parents: e5bc885
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Wed Aug 6 12:30:51 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Wed Aug 6 12:33:03 2014 -0700

----------------------------------------------------------------------
 .../org/apache/mahout/math/drm/logical/OpAewScalar.scala     | 6 +++++-
 .../mahout/sparkbindings/drm/CheckpointedDrmSpark.scala      | 2 +-
 .../org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala   | 8 ++++++++
 3 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/7a50a291/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
index 3b651f6..19a910c 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
@@ -19,6 +19,7 @@ package org.apache.mahout.math.drm.logical
 
 import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
+import scala.util.Random
 
 /** Operator denoting expressions like 5.0 - A or A * 5.6 */
 case class OpAewScalar[K: ClassTag](
@@ -27,7 +28,10 @@ case class OpAewScalar[K: ClassTag](
     val op: String
     ) extends AbstractUnaryOp[K,K] {
 
-  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+  override protected[mahout] lazy val partitioningTag: Long =
+    if (A.canHaveMissingRows)
+      Random.nextLong()
+    else A.partitioningTag
 
   /** Expressions like `A + 1` are always supposed to fix missing rows, so the result never has them */
   override protected[mahout] lazy val canHaveMissingRows: Boolean = false

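The expressions this operator represents are ordinary scalar forms in the DRM algebra; a minimal sketch of the case MAHOUT-1597 is about, assuming the usual scalabindings/RLikeDrmOps imports and an implicit DistributedContext already in scope (as the test suites below have):

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.math.drm._
    import RLikeDrmOps._

    val inCoreA = dense((1, 2, 3), (3, 4, 5))
    val drmA = drmParallelize(inCoreA, numPartitions = 2)

    // elementwise scalar op; builds an OpAewScalar node in the logical plan, and with
    // the change above it no longer reuses A's partitioning tag when A may miss rows
    val drmB = drmA + 1.0
    val inCoreB = drmB.collect
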
http://git-wip-us.apache.org/repos/asf/mahout/blob/7a50a291/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 03050bb..1c5546b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -180,7 +180,7 @@ class CheckpointedDrmSpark[K: ClassTag](
       val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
       val rowCount = rdd.count()
       _canHaveMissingRows = maxPlus1 != rowCount ||
-        rdd.map(_._1).sum().toLong != ((rowCount -1.0 ) * (rowCount -2.0) /2.0).toLong
+          rdd.map(_._1).sum().toLong != (rowCount * (rowCount - 1.0) / 2.0).toLong
       intFixExtra = (maxPlus1 - rowCount) max 0L
       maxPlus1
     } else

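The corrected check relies on the fact that a complete int-keyed row set 0..rowCount-1 sums to rowCount*(rowCount-1)/2; the previous expression, (rowCount-1)*(rowCount-2)/2, was off by one row. A small plain-Scala illustration of the arithmetic (not the RDD code itself):

    // complete keys 0,1,2,3: max+1 == rowCount and 0+1+2+3 == 4*3/2
    val complete = Seq(0L, 1L, 2L, 3L)
    val n = complete.size
    assert(complete.max + 1 == n && complete.sum == n * (n - 1) / 2)

    // keys with a hole, 0,1,3: max+1 == 4 but rowCount == 3, so rows may be missing
    val withHole = Seq(0L, 1L, 3L)
    assert(withHole.max + 1 != withHole.size)
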
http://git-wip-us.apache.org/repos/asf/mahout/blob/7a50a291/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
index c47f7f1..a5cb7f8 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
@@ -28,6 +28,14 @@ import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 
 /** DRMLike tests -- just run common DRM tests in Spark. */
 class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuiteBase {
+
+  test("drmParallellize produces drm with no missing rows") {
+    val inCoreA = dense((1, 2, 3), (3, 4, 5))
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    drmA.canHaveMissingRows shouldBe false
+  }
+
   test("DRM blockify dense") {
 
     val inCoreA = dense((1, 2, 3), (3, 4, 5))


[6/6] git commit: MAHOUT-1603: Tweaks for Spark 1.0.x (dlyubimov & pferrel) this closes apache/mahout#40

Posted by dl...@apache.org.
MAHOUT-1603: Tweaks for Spark 1.0.x (dlyubimov & pferrel)
this closes apache/mahout#40

Squashed commit of the following:

commit 2e362dad82aef764bef163a64eb2bfc1b836a07e
Author: pferrel <pa...@occamsmachete.com>
Date:   Thu Aug 7 15:29:51 2014 -0700

    no need to delete tmp in afterAll since it is already deleted in afterEach

commit 29f73e514570073486b6d8084f4d8a958765fc95
Author: pferrel <pa...@occamsmachete.com>
Date:   Thu Aug 7 15:24:12 2014 -0700

    had to fall back to tokenized data comparison in tests since the order of values cannot be relied upon

commit ec42fe7a1ba6b26aa39bb99e977af7ebb15d9523
Author: pferrel <pa...@occamsmachete.com>
Date:   Thu Aug 7 13:14:26 2014 -0700

    added a check in the driver to see if the context should be closed; it is left open when running a test

commit 2e24ddc7dd612a91fd36312973b05d715def800d
Merge: a809740 1d07cf0
Author: pferrel <pa...@occamsmachete.com>
Date:   Thu Aug 7 12:19:16 2014 -0700

    changing tests for drivers to reuse the test context instead of creating new ones

commit 1d07cf0d1a4409bab9d29e824543c0dae7c0d903
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 15:07:58 2014 -0700

    license

commit f1a31a77f5a8b4d4961a871a88e1a4f5194df90b
Merge: d64146f 00c0149
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 13:29:38 2014 -0700

    Merge branch 'master' into spark-1.0.x

commit d64146ff12be5ffad1832075539397be94c999a3
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 12:58:07 2014 -0700

    "deleteOnExit" doesn't work in tests, rolling back. needs code added to test base for scala test that handles temporary directories, similarly to how it's been done for junit.

commit 69c393a373a6a80b1542a745cca685b2709696b6
Merge: 439e878 7a50a29
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 12:39:28 2014 -0700

    Merge branch 'master' into spark-1.0.x

commit 439e87850d552938d2d0e2c68507c200cabd8d1c
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 12:31:31 2014 -0700

    Temporarily disable ItemSimilarityDriverSuite, as it is failing under Spark 1.0.x for an as-yet-unknown reason

commit af099a2d53f673ce37a1d483a3424b78e6b9cb9c
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 12:30:51 2014 -0700

    MAHOUT-1597: A + 1.0 (fixes)

commit 194b77438f532cc7291f382710aa13d97c07a249
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 11:31:56 2014 -0700

    Single Blas suite to speed up, share session

commit 26a5824ca9bf64430f50f3c524ca14a1c68a04b5
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 10:58:48 2014 -0700

    Shared context, local[3], bumping up Xmx to 768m to run tests

commit e31e9b97724600f806af7e7f861ebdc7943e54bc
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Aug 4 17:28:35 2014 -0700

    bumping Scala to 2.10.4

commit 13e909b58eaa89e212415318655dbe82ef982323
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Aug 4 15:00:59 2014 -0700

    Initial migration.


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ee6359f6
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ee6359f6
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ee6359f6

Branch: refs/heads/spark-1.0.x
Commit: ee6359f621b508ab7f21df0316941e68c75eb3e5
Parents: a809740
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Fri Aug 8 11:52:10 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Fri Aug 8 11:52:10 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../mahout/test/LoggerConfiguration.scala       |   8 +-
 pom.xml                                         |   6 +-
 .../mahout/drivers/ItemSimilarityDriver.scala   |   5 +-
 .../apache/mahout/drivers/MahoutDriver.scala    |  25 +-
 .../mahout/drivers/MahoutOptionParser.scala     |   5 -
 .../mahout/sparkbindings/SparkEngine.scala      |   2 +-
 .../drivers/ItemSimilarityDriverSuite.scala     | 745 +++++++++----------
 .../mahout/sparkbindings/blas/ABtSuite.scala    |  54 --
 .../mahout/sparkbindings/blas/AewBSuite.scala   | 101 ---
 .../mahout/sparkbindings/blas/AtASuite.scala    |  48 --
 .../mahout/sparkbindings/blas/AtSuite.scala     |  44 --
 .../mahout/sparkbindings/blas/BlasSuite.scala   | 154 ++++
 .../test/DistributedSparkSuite.scala            |  25 +-
 .../test/LoggerConfiguration.scala              |   8 +-
 15 files changed, 557 insertions(+), 675 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 538b12b..aefb838 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1603: Tweaks for Spark 1.0.x (dlyubimov & pferrel)
+
   MAHOUT-1596: implement rbind() operator (Anand Avati and dlyubimov)
 
   MAHOUT-1597: A + 1.0 (element-wise scala operation) gives wrong result if rdd is missing rows, Spark side (dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala b/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala
index 95b92b7..7a34aa2 100644
--- a/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala
@@ -1,13 +1,13 @@
 package org.apache.mahout.test
 
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, BeforeAndAfter, Suite}
+import org.scalatest._
 import org.apache.log4j.{Level, Logger, BasicConfigurator}
 
-trait LoggerConfiguration extends BeforeAndAfterAll {
+trait LoggerConfiguration extends BeforeAndAfterAllConfigMap {
   this: Suite =>
 
-  override protected def beforeAll(): Unit = {
-    super.beforeAll()
+  override protected def beforeAll(configMap: ConfigMap): Unit = {
+    super.beforeAll(configMap)
     BasicConfigurator.resetConfiguration()
     BasicConfigurator.configure()
     Logger.getRootLogger.setLevel(Level.ERROR)

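With BeforeAndAfterAllConfigMap the hook receives the suite's ConfigMap, so suites mixing this trait in override beforeAll(configMap) rather than the no-argument beforeAll(). A minimal hedged sketch of such a suite (SomeSuite is illustrative; the same pattern is used by ItemSimilarityDriverSuite further down):

    import org.scalatest.{ConfigMap, FunSuite}

    // LoggerConfiguration is the trait defined above (org.apache.mahout.test)
    class SomeSuite extends FunSuite with LoggerConfiguration {
      override protected def beforeAll(configMap: ConfigMap): Unit = {
        super.beforeAll(configMap)   // resets log4j and sets the root level as shown above
        // suite-specific setup goes here
      }
    }
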
http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index aad4c9c..ef9ae03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,8 +108,8 @@
     <lucene.version>4.6.1</lucene.version>
     <slf4j.version>1.7.5</slf4j.version>
     <scala.major>2.10</scala.major>
-    <scala.version>2.10.3</scala.version>
-    <spark.version>0.9.1</spark.version>
+    <scala.version>2.10.4</scala.version>
+    <spark.version>1.0.1</spark.version>
   </properties>
   <issueManagement>
     <system>Jira</system>
@@ -557,7 +557,7 @@
         <configuration>
           <forkCount>2</forkCount>
           <reuseForks>false</reuseForks>
-          <argLine>-Xmx512m -Djava.security.manager -Djava.library.path=${env.HADOOP_HOME}\bin
+          <argLine>-Xmx768m -Djava.security.manager -Djava.library.path=${env.HADOOP_HOME}\bin
           -Djava.security.policy=${project.build.directory}/../../buildtools/src/test/resources/java.policy</argLine>
           <argLine>-Djava.security.auth.login.config=${project.build.directory}/../../buildtools/src/test/resources/jaas.config</argLine>
           <testFailureIgnore>false</testFailureIgnore>

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index e0eaabc..460106f 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -123,8 +123,7 @@ object ItemSimilarityDriver extends MahoutDriver {
   }
 
   override def start(masterUrl: String = options("master").asInstanceOf[String],
-      appName: String = options("appName").asInstanceOf[String],
-      dontAddMahoutJars: Boolean = options("dontAddMahoutJars").asInstanceOf[Boolean]):
+      appName: String = options("appName").asInstanceOf[String]):
     Unit = {
 
     // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
@@ -134,7 +133,7 @@ object ItemSimilarityDriver extends MahoutDriver {
       .set("spark.kryoserializer.buffer.mb", "200")
       .set("spark.executor.memory", options("sparkExecutorMem").asInstanceOf[String])
 
-    super.start(masterUrl, appName, dontAddMahoutJars)
+    super.start(masterUrl, appName)
 
     val readSchema1 = new Schema("delim" -> options("inDelim").asInstanceOf[String],
         "filter" -> options("filter1").asInstanceOf[String],

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 796a66a..e92ed37 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -58,32 +58,35 @@ abstract class MahoutDriver {
 
 
   implicit var mc: DistributedContext = _
-  implicit val sparkConf = new SparkConf()
+  implicit var sparkConf = new SparkConf()
+  var _useExistingContext: Boolean = false
 
   /** Creates a Spark context to run the job inside. Override to set the SparkConf values specific to the job;
     * these must be set before the context is created.
     * @param masterUrl Spark master URL
     * @param appName  Name to display in Spark UI
-    * @param customJars List of paths to custom jars
     * */
-  protected def start(masterUrl: String, appName: String, customJars:Traversable[String]) : Unit = {
-    mc = mahoutSparkContext(masterUrl, appName, customJars, sparkConf)
-  }
-
-  protected def start(masterUrl: String, appName: String, dontAddMahoutJars: Boolean = false) : Unit = {
-    val customJars = Traversable.empty[String]
-    mc = mahoutSparkContext(masterUrl, appName, customJars, sparkConf, !dontAddMahoutJars)
+  protected def start(masterUrl: String, appName: String) : Unit = {
+    if (!_useExistingContext) {
+      mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
+    }
   }
 
   /** Override (optionally) for special cleanup */
   protected def stop: Unit = {
-    mc.close
+    if (!_useExistingContext) mc.close
   }
 
-  /** This is wher you do the work, call start first, then before exiting call stop */
+  /** This is where you do the work, call start first, then before exiting call stop */
   protected def process: Unit
 
   /** Parse command line and call process */
   def main(args: Array[String]): Unit
+
+  def useContext(context: DistributedContext): Unit = {
+    _useExistingContext = true
+    mc = context
+    sparkConf = mc.getConf
+  }
 }

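The new useContext hook is what lets the driver tests below run against the shared test context instead of spinning one up per invocation; a hedged sketch of the calling pattern (the paths are illustrative, and mahoutCtx/masterUrl come from the test harness):

    // reuse an existing context; the driver will neither create nor close its own
    ItemSimilarityDriver.useContext(mahoutCtx)
    ItemSimilarityDriver.main(Array(
        "--input", "tmp/in-file.csv/",
        "--output", "tmp/indicator-matrices/",
        "--master", masterUrl))
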
http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index ba4ca1d..3aada78 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -24,7 +24,6 @@ object MahoutOptionParser {
   // set up the various default option groups
   final val GenericOptions = immutable.HashMap[String, Any](
     "randomSeed" -> System.currentTimeMillis().toInt,
-    "dontAddMahoutJars" -> false,
     "writeAllDatasets" -> false)
 
   final val SparkOptions = immutable.HashMap[String, Any](
@@ -102,10 +101,6 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
       if (x > 0) success else failure("Option --randomSeed must be > 0")
     }
 
-    opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) =>
-      options + ("dontAddMahoutJars" -> true)
-    }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly
-
     //output both input DRMs
     opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
       options + ("writeAllDatasets" -> true)

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 36223fc..dedb279 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -127,7 +127,7 @@ object SparkEngine extends DistributedEngine {
    */
   def drmFromHDFS (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
 
-    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minSplits = parMin)
+    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
         // Get rid of VectorWritable
         .map(t => (t._1, t._2.get()))
 

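The rename tracks the Spark 1.0 SparkContext.sequenceFile API, where minSplits became minPartitions. A hedged usage sketch of the function above (the path is hypothetical; the context would normally come from mahoutSparkContext or a test suite's mahoutCtx):

    // assumes a DistributedContext is available, e.g. from the test harness
    implicit val ctx: DistributedContext = mahoutCtx
    val drmA = SparkEngine.drmFromHDFS(path = "hdfs://namenode:8020/user/someone/drm-seqfile")
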
http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index f1981bb..cfabfdb 100644
--- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -19,42 +19,30 @@ package org.apache.mahout.drivers
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
-import org.scalatest.FunSuite
+import org.scalatest.{ConfigMap, FunSuite}
 import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.test.MahoutSuite
-
-
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings._
 
 //todo: take out, only for temp tests
-import org.apache.mahout.math._
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm._
 import RLikeDrmOps._
 import scala.collection.JavaConversions._
-import org.apache.mahout.math.stats.LogLikelihood
-import collection._
-import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.math.function.{VectorFunction, Functions}
 
 
-
-class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with DistributedSparkSuite  {
+class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
 
 /*
-  // correct self-cooccurrence with LLR
   final val matrixLLRCoocAtAControl = dense(
-    (0.0,                0.6331745808516107, 0.0,                     0.0,                0.0),
-    (0.6331745808516107, 0.0,                0.0,                     0.0,                0.0),
-    (0.0,                0.0,                0.0,                     0.6331745808516107, 0.0),
-    (0.0,                0.0,                0.6331745808516107,      0.0,                0.0),
-    (0.0,                0.0,                0.0,                     0.0,                0.0))
+      (0.0,                1.7260924347106847, 0.0,                     0.0,                0.0),
+      (1.7260924347106847, 0.0,                0.0,                     0.0,                0.0),
+      (0.0,                0.0,                0.0,                     1.7260924347106847, 0.0),
+      (0.0,                0.0,                1.7260924347106847,      0.0,                0.0),
+      (0.0,                0.0,                0.0,                     0.0,                0.0))
 
-  // correct cross-cooccurrence with LLR
   final val matrixLLRCoocBtAControl = dense(
       (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
       (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
@@ -78,20 +66,35 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
       "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
       "surface\tsurface:4.498681156950466 nexus:0.6795961471815897")
 
+  // todo: a better test would be to sort each vector by itemID and compare rows; comparing tokens misses some error cases
+  final val SelfSimilairtyTokens = tokenize(Iterable(
+      "galaxy\tnexus:1.7260924347106847",
+      "ipad\tiphone:1.7260924347106847",
+      "nexus\tgalaxy:1.7260924347106847",
+      "iphone\tipad:1.7260924347106847",
+      "surface"))
+
+  val CrossIndicatorTokens = tokenize(Iterable(
+      "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+      "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+      "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+      "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+      "surface\tsurface:4.498681156950466 nexus:0.6795961471815897"))
+
   final val TmpDir = "tmp/" // all IO going to whatever the default HDFS config is pointing to
 
   /*
     //Clustered Spark and HDFS, not a good everyday build test
     ItemSimilarityDriver.main(Array(
-      "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
-      "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
-      "--master", "spark://occam4:7077",
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1"
+        "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
+        "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
+        "--master", "spark://occam4:7077",
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1"
     ))
 */
   // local multi-threaded Spark with HDFS using large dataset
@@ -109,7 +112,8 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     ))
   */
 
-  test ("ItemSimilarityDriver, non-full-spec CSV"){
+  // TODO: failing, temporarily disabled
+  test("ItemSimilarityDriver, non-full-spec CSV") {
 
     val InFile = TmpDir + "in-file.csv/" //using part files, not single file
     val OutPath = TmpDir + "indicator-matrices/"
@@ -140,147 +144,133 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     // take account of one actual file
     val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--writeAllDatasets",
-      "--dontAddMahoutJars"))
-    
-
-    beforeEach // restart the test context to read the output of the driver
+        "--input", InFile,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1",
+        "--writeAllDatasets"))
 
     // todo: these comparisons rely on a sort producing the same lines, which could possibly
     // fail since the sort is on value and these can be the same for all items in a vector
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
   }
 
 
 
-  test ("ItemSimilarityDriver TSV "){
+  test("ItemSimilarityDriver TSV ") {
 
     val InFile = TmpDir + "in-file.tsv/"
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "u1\tpurchase\tiphone",
-      "u1\tpurchase\tipad",
-      "u2\tpurchase\tnexus",
-      "u2\tpurchase\tgalaxy",
-      "u3\tpurchase\tsurface",
-      "u4\tpurchase\tiphone",
-      "u4\tpurchase\tgalaxy",
-      "u1\tview\tiphone",
-      "u1\tview\tipad",
-      "u1\tview\tnexus",
-      "u1\tview\tgalaxy",
-      "u2\tview\tiphone",
-      "u2\tview\tipad",
-      "u2\tview\tnexus",
-      "u2\tview\tgalaxy",
-      "u3\tview\tsurface",
-      "u3\tview\tnexus",
-      "u4\tview\tiphone",
-      "u4\tview\tipad",
-      "u4\tview\tgalaxy")
+        "u1\tpurchase\tiphone",
+        "u1\tpurchase\tipad",
+        "u2\tpurchase\tnexus",
+        "u2\tpurchase\tgalaxy",
+        "u3\tpurchase\tsurface",
+        "u4\tpurchase\tiphone",
+        "u4\tpurchase\tgalaxy",
+        "u1\tview\tiphone",
+        "u1\tview\tipad",
+        "u1\tview\tnexus",
+        "u1\tview\tgalaxy",
+        "u2\tview\tiphone",
+        "u2\tview\tipad",
+        "u2\tview\tnexus",
+        "u2\tview\tgalaxy",
+        "u3\tview\tsurface",
+        "u3\tview\tnexus",
+        "u4\tview\tiphone",
+        "u4\tview\tipad",
+        "u4\tview\tgalaxy")
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", "[,\t]",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--dontAddMahoutJars"))
+        "--input", InFile,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", "[,\t]",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1"))
 
-    beforeEach // restart the test context to read the output of the driver
     // todo: a better test would be to get sorted vectors and compare rows instead of tokens; comparing tokens
     // might miss some error cases
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
 
-  test ("ItemSimilarityDriver log-ish files"){
+  test("ItemSimilarityDriver log-ish files") {
 
     val InFile = TmpDir + "in-file.log/"
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
-      "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tipad",
-      "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tnexus",
-      "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tgalaxy",
-      "2014-06-23 14:46:53.115\tu3\tpurchase\trandom text\tsurface",
-      "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tiphone",
-      "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tgalaxy",
-      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tiphone",
-      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tipad",
-      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tnexus",
-      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tgalaxy",
-      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tiphone",
-      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tipad",
-      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tnexus",
-      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tgalaxy",
-      "2014-06-23 14:46:53.115\tu3\tview\trandom text\tsurface",
-      "2014-06-23 14:46:53.115\tu3\tview\trandom text\tnexus",
-      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tiphone",
-      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tipad",
-      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tgalaxy")
+        "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
+        "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tipad",
+        "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tnexus",
+        "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tgalaxy",
+        "2014-06-23 14:46:53.115\tu3\tpurchase\trandom text\tsurface",
+        "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tiphone",
+        "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tgalaxy",
+        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tiphone",
+        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tipad",
+        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tnexus",
+        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tgalaxy",
+        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tiphone",
+        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tipad",
+        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tnexus",
+        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tgalaxy",
+        "2014-06-23 14:46:53.115\tu3\tview\trandom text\tsurface",
+        "2014-06-23 14:46:53.115\tu3\tview\trandom text\tnexus",
+        "2014-06-23 14:46:53.115\tu4\tview\trandom text\tiphone",
+        "2014-06-23 14:46:53.115\tu4\tview\trandom text\tipad",
+        "2014-06-23 14:46:53.115\tu4\tview\trandom text\tgalaxy")
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", "\t",
-      "--itemIDPosition", "4",
-      "--rowIDPosition", "1",
-      "--filterPosition", "2",
-      "--dontAddMahoutJars"))
+        "--input", InFile,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", "\t",
+        "--itemIDPosition", "4",
+        "--rowIDPosition", "1",
+        "--filterPosition", "2"))
 
-    beforeEach // restart the test context to read the output of the driver
 
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
 
-  test ("ItemSimilarityDriver legacy supported file format"){
+  test("ItemSimilarityDriver legacy supported file format") {
 
     val InDir = TmpDir + "in-dir/"
     val InFilename = "in-file.tsv"
@@ -289,20 +279,20 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     val OutPath = TmpDir + "indicator-matrices"
 
     val lines = Array(
-      "0,0,1",
-      "0,1,1",
-      "1,2,1",
-      "1,3,1",
-      "2,4,1",
-      "3,0,1",
-      "3,3,1")
-
-    val Answer = Iterable(
-      "0\t1:1.7260924347106847",
-      "3\t2:1.7260924347106847",
-      "1\t0:1.7260924347106847",
-      "4",
-      "2\t3:1.7260924347106847")
+        "0,0,1",
+        "0,1,1",
+        "1,2,1",
+        "1,3,1",
+        "2,4,1",
+        "3,0,1",
+        "3,3,1")
+
+    val Answer = tokenize(Iterable(
+        "0\t1:1.7260924347106847",
+        "3\t2:1.7260924347106847",
+        "1\t0:1.7260924347106847",
+        "4",
+        "2\t3:1.7260924347106847"))
 
     // this creates one part-0000 file in the directory
     mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir)
@@ -312,24 +302,18 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     //rename part-00000 to something.tsv
     fs.rename(new Path(InDir + "part-00000"), new Path(InPath))
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InPath,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--dontAddMahoutJars"))
+        "--input", InPath,
+        "--output", OutPath,
+        "--master", masterUrl))
 
-    beforeEach // restart the test context to read the output of the driver
-    // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
-    // some error cases
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs Answer
+    tokenize(indicatorLines) should contain theSameElementsAs Answer
 
   }
 
-  test ("ItemSimilarityDriver write search engine output"){
+  test("ItemSimilarityDriver write search engine output") {
 
     val InDir = TmpDir + "in-dir/"
     val InFilename = "in-file.tsv"
@@ -338,20 +322,20 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     val OutPath = TmpDir + "indicator-matrices"
 
     val lines = Array(
-      "0,0,1",
-      "0,1,1",
-      "1,2,1",
-      "1,3,1",
-      "2,4,1",
-      "3,0,1",
-      "3,3,1")
-
-    val Answer = Iterable(
-      "0\t1",
-      "3\t2",
-      "1\t0",
-      "4",
-      "2\t3")
+        "0,0,1",
+        "0,1,1",
+        "1,2,1",
+        "1,3,1",
+        "2,4,1",
+        "3,0,1",
+        "3,3,1")
+
+    val Answer = tokenize(Iterable(
+        "0\t1",
+        "3\t2",
+        "1\t0",
+        "4",
+        "2\t3"))
 
     // this creates one part-0000 file in the directory
     mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir)
@@ -361,51 +345,45 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     //rename part-00000 to something.tsv
     fs.rename(new Path(InDir + "part-00000"), new Path(InPath))
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InPath,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--dontAddMahoutJars",
-      "--omitStrength"))
+        "--input", InPath,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--omitStrength"))
 
-    beforeEach // restart the test context to read the output of the driver
-    // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
-    // some error cases
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs Answer
+    tokenize(indicatorLines) should contain theSameElementsAs Answer
 
   }
 
-  test("ItemSimilarityDriver recursive file discovery using filename patterns"){
+  test("ItemSimilarityDriver recursive file discovery using filename patterns") {
     //directory structure using the following
     // tmp/data/m1.tsv
     // tmp/data/more-data/another-dir/m2.tsv
     val M1Lines = Array(
-      "u1\tpurchase\tiphone",
-      "u1\tpurchase\tipad",
-      "u2\tpurchase\tnexus",
-      "u2\tpurchase\tgalaxy",
-      "u3\tpurchase\tsurface",
-      "u4\tpurchase\tiphone",
-      "u4\tpurchase\tgalaxy",
-      "u1\tview\tiphone")
+        "u1\tpurchase\tiphone",
+        "u1\tpurchase\tipad",
+        "u2\tpurchase\tnexus",
+        "u2\tpurchase\tgalaxy",
+        "u3\tpurchase\tsurface",
+        "u4\tpurchase\tiphone",
+        "u4\tpurchase\tgalaxy",
+        "u1\tview\tiphone")
 
     val M2Lines = Array(
-      "u1\tview\tipad",
-      "u1\tview\tnexus",
-      "u1\tview\tgalaxy",
-      "u2\tview\tiphone",
-      "u2\tview\tipad",
-      "u2\tview\tnexus",
-      "u2\tview\tgalaxy",
-      "u3\tview\tsurface",
-      "u3\tview\tnexus",
-      "u4\tview\tiphone",
-      "u4\tview\tipad",
-      "u4\tview\tgalaxy")
+        "u1\tview\tipad",
+        "u1\tview\tnexus",
+        "u1\tview\tgalaxy",
+        "u2\tview\tiphone",
+        "u2\tview\tipad",
+        "u2\tview\tnexus",
+        "u2\tview\tgalaxy",
+        "u3\tview\tsurface",
+        "u3\tview\tnexus",
+        "u4\tview\tiphone",
+        "u4\tview\tipad",
+        "u4\tview\tgalaxy")
 
     val InFilenameM1 = "m1.tsv"
     val InDirM1 = TmpDir + "data/"
@@ -434,27 +412,23 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
 
     // local multi-threaded Spark with default FS, suitable for build tests but need better location for data
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     ItemSimilarityDriver.main(Array(
-      "--input", InPathStart,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", "\t",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--filenamePattern", "m..tsv",
-      "--recursive",
-      "--dontAddMahoutJars"))
+        "--input", InPathStart,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", "\t",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1",
+        "--filenamePattern", "m..tsv",
+        "--recursive"))
 
-    beforeEach()// restart the test context to read the output of the driver
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
 
@@ -465,53 +439,49 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "u1,purchase,iphone",
-      "u1,purchase,ipad",
-      "u2,purchase,nexus",
-      "u2,purchase,galaxy",
-      "u3,purchase,surface",
-      "u4,purchase,iphone",
-      "u4,purchase,galaxy",
-      "u1,view,iphone",
-      "u1,view,ipad",
-      "u1,view,nexus",
-      "u1,view,galaxy",
-      "u2,view,iphone",
-      "u2,view,ipad",
-      "u2,view,nexus",
-      "u2,view,galaxy",
-      "u3,view,surface",
-      "u3,view,nexus",
-      "u4,view,iphone",
-      "u4,view,ipad",
-      "u4,view,galaxy")
+        "u1,purchase,iphone",
+        "u1,purchase,ipad",
+        "u2,purchase,nexus",
+        "u2,purchase,galaxy",
+        "u3,purchase,surface",
+        "u4,purchase,iphone",
+        "u4,purchase,galaxy",
+        "u1,view,iphone",
+        "u1,view,ipad",
+        "u1,view,nexus",
+        "u1,view,galaxy",
+        "u2,view,iphone",
+        "u2,view,ipad",
+        "u2,view,nexus",
+        "u2,view,galaxy",
+        "u3,view,surface",
+        "u3,view,nexus",
+        "u4,view,iphone",
+        "u4,view,ipad",
+        "u4,view,galaxy")
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
     val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile1,
-      "--input2", InFile2,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--dontAddMahoutJars"))
+        "--input", InFile1,
+        "--input2", InFile2,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1"))
 
-    beforeEach // restart the test context to read the output of the driver
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
 
@@ -522,68 +492,62 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "u1,purchase,iphone",
-      "u1,purchase,ipad",
-      "u2,purchase,nexus",
-      "u2,purchase,galaxy",
-      // remove one user so A'B will be of different dimensions
-      // ItemSimilarityDriver should create one unified user dictionary and so account for this
-      // discrepancy as a blank row: "u3,purchase,surface",
-      "u4,purchase,iphone",
-      "u4,purchase,galaxy",
-      "u1,view,iphone",
-      "u1,view,ipad",
-      "u1,view,nexus",
-      "u1,view,galaxy",
-      "u2,view,iphone",
-      "u2,view,ipad",
-      "u2,view,nexus",
-      "u2,view,galaxy",
-      "u3,view,surface",
-      "u3,view,nexus",
-      "u4,view,iphone",
-      "u4,view,ipad",
-      "u4,view,galaxy")
-
-    val UnequalDimensionsSelfSimilarity = Iterable(
-      "ipad\tiphone:1.7260924347106847",
-      "iphone\tipad:1.7260924347106847",
-      "nexus\tgalaxy:1.7260924347106847",
-      "galaxy\tnexus:1.7260924347106847")
-
-    val UnequalDimensionsCrossSimilarity = Iterable(
-      "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
-      "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
-      "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
-      "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847")
+        "u1,purchase,iphone",
+        "u1,purchase,ipad",
+        "u2,purchase,nexus",
+        "u2,purchase,galaxy",
+        // remove one user so A'B will be of different dimensions
+        // ItemSimilarityDriver should create one unified user dictionary and so account for this
+        // discrepancy as a blank row: "u3,purchase,surface",
+        "u4,purchase,iphone",
+        "u4,purchase,galaxy",
+        "u1,view,iphone",
+        "u1,view,ipad",
+        "u1,view,nexus",
+        "u1,view,galaxy",
+        "u2,view,iphone",
+        "u2,view,ipad",
+        "u2,view,nexus",
+        "u2,view,galaxy",
+        "u3,view,surface",
+        "u3,view,nexus",
+        "u4,view,iphone",
+        "u4,view,ipad",
+        "u4,view,galaxy")
 
+    val UnequalDimensionsSelfSimilarity = tokenize(Iterable(
+        "ipad\tiphone:1.7260924347106847",
+        "iphone\tipad:1.7260924347106847",
+        "nexus\tgalaxy:1.7260924347106847",
+        "galaxy\tnexus:1.7260924347106847"))
+
+    val UnequalDimensionsCrossSimilarity = tokenize(Iterable(
+        "galaxy\tgalaxy:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 nexus:1.7260924347106847",
+        "iphone\tgalaxy:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 surface:1.7260924347106847 nexus:1.7260924347106847",
+        "ipad\tgalaxy:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 nexus:0.6795961471815897",
+        "nexus\tiphone:0.6795961471815897 ipad:0.6795961471815897 nexus:0.6795961471815897 galaxy:0.6795961471815897"))
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
     val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile1,
-      "--input2", InFile2,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--dontAddMahoutJars"))
-
-    beforeEach // restart the test context to read the output of the driver
+        "--input", InFile1,
+        "--input2", InFile2,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1"))
 
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs UnequalDimensionsSelfSimilarity
-    crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarity
+    tokenize(indicatorLines) should contain theSameElementsAs UnequalDimensionsSelfSimilarity
+    tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarity
 
   }
 
@@ -600,82 +564,56 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "u1,purchase,iphone",
-      "u1,purchase,ipad",
-      "u2,purchase,nexus",
-      "u2,purchase,galaxy",
-      "u3,purchase,surface",
-      "u4,purchase,iphone",
-      "u4,purchase,galaxy",
-      "u1,view,phones",
-      "u1,view,mobile_acc",
-      "u2,view,phones",
-      "u2,view,tablets",
-      "u2,view,mobile_acc",
-      "u3,view,mobile_acc",
-      "u4,view,phones",
-      "u4,view,tablets",
-      "u4,view,soap")
-
-    val UnequalDimensionsCrossSimilarityLines = Iterable(
+        "u1,purchase,iphone",
+        "u1,purchase,ipad",
+        "u2,purchase,nexus",
+        "u2,purchase,galaxy",
+        "u3,purchase,surface",
+        "u4,purchase,iphone",
+        "u4,purchase,galaxy",
+        "u1,view,phones",
+        "u1,view,mobile_acc",
+        "u2,view,phones",
+        "u2,view,tablets",
+        "u2,view,mobile_acc",
+        "u3,view,mobile_acc",
+        "u4,view,phones",
+        "u4,view,tablets",
+        "u4,view,soap")
+
+    val UnequalDimensionsCrossSimilarityLines = tokenize(Iterable(
         "iphone\tmobile_acc:1.7260924347106847 soap:1.7260924347106847 phones:1.7260924347106847",
         "surface\tmobile_acc:0.6795961471815897",
         "nexus\ttablets:1.7260924347106847 mobile_acc:0.6795961471815897 phones:0.6795961471815897",
         "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 mobile_acc:1.7260924347106847",
-        "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897")
+        "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897"))
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
     val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile1,
-      "--input2", InFile2,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--dontAddMahoutJars",
-      "--writeAllDatasets"))
+        "--input", InFile1,
+        "--input2", InFile2,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1",
+        "--writeAllDatasets"))
 
-    beforeEach // restart the test context to read the output of the driver
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
-    crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
+    tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
 
   }
 
-  // convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable'
-  def tokenize(a: Iterable[String]): Iterable[String] = {
-    var r: Iterable[String] = Iterable()
-    a.foreach { l =>
-      l.split("\t").foreach{ s =>
-        r = r ++ s.split(",")
-      }
-    }
-    r.asInstanceOf[Iterable[String]]
-  }
-
-  override def afterAll = {
-    removeTmpDir
-    super.afterAll
-  }
-
-  def removeTmpDir = {
-    // remove TmpDir
-    val fs = FileSystem.get(new Configuration())
-    fs.delete(new Path(TmpDir), true) // delete recursively
-  }
-
   test("A.t %*% B after changing row cardinality of A"){
     // todo: move to math tests but this is Spark specific
 
@@ -720,57 +658,82 @@ removed ==> u3	0	      0	      1	          0
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "u1,purchase,iphone",
-      "u1,purchase,ipad",
-      "u2,purchase,nexus",
-      "u2,purchase,galaxy",
-      "u3,purchase,surface",
-      "u4,purchase,iphone",
-      "u4,purchase,galaxy",
-      "u1,view,phones",
-      "u1,view,mobile_acc",
-      "u2,view,phones",
-      "u2,view,tablets",
-      "u2,view,mobile_acc",
-      //"u3,view,mobile_acc",// if this line is removed the cross-cooccurrence should work
-      "u4,view,phones",
-      "u4,view,tablets",
-      "u4,view,soap")
-
-    val UnequalDimensionsCrossSimilarityLines = Iterable(
+        "u1,purchase,iphone",
+        "u1,purchase,ipad",
+        "u2,purchase,nexus",
+        "u2,purchase,galaxy",
+        "u3,purchase,surface",
+        "u4,purchase,iphone",
+        "u4,purchase,galaxy",
+        "u1,view,phones",
+        "u1,view,mobile_acc",
+        "u2,view,phones",
+        "u2,view,tablets",
+        "u2,view,mobile_acc",
+        //"u3,view,mobile_acc",// if this line is removed the cross-cooccurrence should work
+        "u4,view,phones",
+        "u4,view,tablets",
+        "u4,view,soap")
+
+    val UnequalDimensionsCrossSimilarityLines = tokenize(Iterable(
         "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847",
         "ipad\tmobile_acc:1.7260924347106847 phones:0.6795961471815897",
         "surface",
         "nexus\tmobile_acc:1.7260924347106847 tablets:1.7260924347106847 phones:0.6795961471815897",
-        "iphone\tsoap:1.7260924347106847 phones:1.7260924347106847")
+        "iphone\tsoap:1.7260924347106847 phones:1.7260924347106847"))
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
     val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile1,
-      "--input2", InFile2,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--dontAddMahoutJars",
-      "--writeAllDatasets"))
+        "--input", InFile1,
+        "--input2", InFile2,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1",
+        "--writeAllDatasets"))
 
-    beforeEach // restart the test context to read the output of the driver
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
-    crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
+    tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+  }
+
+  // convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable'
+  def tokenize(a: Iterable[String]): Iterable[String] = {
+    var r: Iterable[String] = Iterable()
+    a.foreach { l =>
+      l.split("\t").foreach{ s =>
+        r = r ++ s.split("[\t ]")
+      }
+    }
+    r
+  }
+
+  override protected def beforeAll(configMap: ConfigMap) {
+    super.beforeAll(configMap)
+
+    // just in case there is one left over
+    val fs = FileSystem.get(new Configuration())
+    fs.delete(new Path(TmpDir), true) // delete recursively
+
+    ItemSimilarityDriver.useContext(mahoutCtx) // for testing use the test context
+  }
+
+  override protected def afterEach() {
+
+    val fs = FileSystem.get(new Configuration())
+    fs.delete(new Path(TmpDir), true) // delete recursively
+
+    super.afterEach()
   }
 
 }

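The tokenize helper above flattens each output line into its tab- and space-separated tokens, which makes the theSameElementsAs comparisons insensitive to the ordering of values within a vector. A quick standalone illustration of the same behaviour in flatMap form (the sample line is taken from the control data above):

    def tokenize(a: Iterable[String]): Iterable[String] =
      a.flatMap(_.split("\t")).flatMap(_.split("[\t ]"))

    tokenize(Iterable("galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847"))
    // -> "galaxy", "nexus:1.7260924347106847", "iphone:1.7260924347106847"
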
http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
deleted file mode 100644
index 12c9034..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.blas
-
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.scalatest.FunSuite
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.sparkbindings._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeOps._
-import org.apache.spark.SparkContext._
-import org.apache.mahout.math.drm.logical.OpABt
-
-/** Tests for AB' operator algorithms */
-class ABtSuite extends FunSuite with DistributedSparkSuite {
-
-  test("ABt") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val inCoreB = dense((3, 4, 5), (5, 6, 7))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 3)
-    val drmB = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val op = new OpABt(drmA, drmB)
-
-    val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
-
-    printf("AB' num partitions = %d.\n", drm.rdd.partitions.size)
-
-    val inCoreMControl = inCoreA %*% inCoreB.t
-    val inCoreM = drm.collect
-
-    assert((inCoreM - inCoreMControl).norm < 1E-5)
-
-    println(inCoreM)
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
deleted file mode 100644
index be65e32..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.blas
-
-import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.spark.SparkContext._
-import org.apache.mahout.math.drm.logical.OpAewB
-import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
-
-/** Elementwise matrix operation tests */
-class AewBSuite extends FunSuite with DistributedSparkSuite {
-
-  test("A * B Hadamard") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
-    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
-    val drmB = drmParallelize(m = inCoreB)
-
-    val op = new OpAewB(drmA, drmB, "*")
-
-    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
-
-    val inCoreM = drmM.collect
-    val inCoreMControl = inCoreA * inCoreB
-
-    assert((inCoreM - inCoreMControl).norm < 1E-10)
-
-  }
-
-  test("A + B Elementwise") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
-    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
-    val drmB = drmParallelize(m = inCoreB)
-
-    val op = new OpAewB(drmA, drmB, "+")
-
-    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
-
-    val inCoreM = drmM.collect
-    val inCoreMControl = inCoreA + inCoreB
-
-    assert((inCoreM - inCoreMControl).norm < 1E-10)
-
-  }
-
-  test("A - B Elementwise") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
-    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
-    val drmB = drmParallelize(m = inCoreB)
-
-    val op = new OpAewB(drmA, drmB, "-")
-
-    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
-
-    val inCoreM = drmM.collect
-    val inCoreMControl = inCoreA - inCoreB
-
-    assert((inCoreM - inCoreMControl).norm < 1E-10)
-
-  }
-
-  test("A / B Elementwise") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 0), (7, 8, 9))
-    val inCoreB = dense((3, 4, 5), (5, 6, 7), (10, 20, 30), (9, 8, 7))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
-    val drmB = drmParallelize(m = inCoreB)
-
-    val op = new OpAewB(drmA, drmB, "/")
-
-    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
-
-    val inCoreM = drmM.collect
-    val inCoreMControl = inCoreA / inCoreB
-
-    assert((inCoreM - inCoreMControl).norm < 1E-10)
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
deleted file mode 100644
index c31f27c..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.blas
-
-import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.sparkbindings._
-import org.apache.spark.SparkContext._
-import org.apache.mahout.math.drm.logical.OpAtA
-
-/** Tests for {@link XtX} */
-class AtASuite extends FunSuite with DistributedSparkSuite {
-
-  test("AtA slim") {
-
-    val inCoreA = dense((1, 2), (2, 3))
-    val drmA = drmParallelize(inCoreA)
-
-    val operator = new OpAtA[Int](A = drmA)
-    val inCoreAtA = AtA.at_a_slim(operator = operator, srcRdd = drmA.rdd)
-    println(inCoreAtA)
-
-    val expectedAtA = inCoreA.t %*% inCoreA
-    println(expectedAtA)
-
-    assert(expectedAtA === inCoreAtA)
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
deleted file mode 100644
index 16632ec..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.blas
-
-import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.logical.OpAt
-import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
-
-/** Tests for A' algorithms */
-class AtSuite extends FunSuite with DistributedSparkSuite {
-
-  test("At") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val op = new OpAt(drmA)
-    val drmAt = new CheckpointedDrmSpark(rdd = At.at(op, srcA = drmA), _nrow = op.nrow, _ncol = op.ncol)
-    val inCoreAt = drmAt.collect
-    val inCoreControlAt = inCoreA.t
-
-    println(inCoreAt)
-    assert((inCoreAt - inCoreControlAt).norm < 1E-5)
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala
new file mode 100644
index 0000000..1521cb8
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.blas
+
+import collection._
+import JavaConversions._
+import org.scalatest.FunSuite
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
+import org.apache.mahout.math.drm.logical.{OpAt, OpAtA, OpAewB, OpABt}
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+
+/** Collection of physical blas operator tests. */
+class BlasSuite extends FunSuite with DistributedSparkSuite {
+
+  test("ABt") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val inCoreB = dense((3, 4, 5), (5, 6, 7))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 3)
+    val drmB = drmParallelize(m = inCoreB, numPartitions = 2)
+
+    val op = new OpABt(drmA, drmB)
+
+    val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
+
+    printf("AB' num partitions = %d.\n", drm.rdd.partitions.size)
+
+    val inCoreMControl = inCoreA %*% inCoreB.t
+    val inCoreM = drm.collect
+
+    assert((inCoreM - inCoreMControl).norm < 1E-5)
+
+    println(inCoreM)
+  }
+
+  test("A * B Hadamard") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
+    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
+    val drmB = drmParallelize(m = inCoreB)
+
+    val op = new OpAewB(drmA, drmB, "*")
+
+    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
+
+    val inCoreM = drmM.collect
+    val inCoreMControl = inCoreA * inCoreB
+
+    assert((inCoreM - inCoreMControl).norm < 1E-10)
+
+  }
+
+  test("A + B Elementwise") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
+    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
+    val drmB = drmParallelize(m = inCoreB)
+
+    val op = new OpAewB(drmA, drmB, "+")
+
+    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
+
+    val inCoreM = drmM.collect
+    val inCoreMControl = inCoreA + inCoreB
+
+    assert((inCoreM - inCoreMControl).norm < 1E-10)
+
+  }
+
+  test("A - B Elementwise") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
+    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
+    val drmB = drmParallelize(m = inCoreB)
+
+    val op = new OpAewB(drmA, drmB, "-")
+
+    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
+
+    val inCoreM = drmM.collect
+    val inCoreMControl = inCoreA - inCoreB
+
+    assert((inCoreM - inCoreMControl).norm < 1E-10)
+
+  }
+
+  test("A / B Elementwise") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 0), (7, 8, 9))
+    val inCoreB = dense((3, 4, 5), (5, 6, 7), (10, 20, 30), (9, 8, 7))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
+    val drmB = drmParallelize(m = inCoreB)
+
+    val op = new OpAewB(drmA, drmB, "/")
+
+    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
+
+    val inCoreM = drmM.collect
+    val inCoreMControl = inCoreA / inCoreB
+
+    assert((inCoreM - inCoreMControl).norm < 1E-10)
+
+  }
+
+  test("AtA slim") {
+
+    val inCoreA = dense((1, 2), (2, 3))
+    val drmA = drmParallelize(inCoreA)
+
+    val operator = new OpAtA[Int](A = drmA)
+    val inCoreAtA = AtA.at_a_slim(operator = operator, srcRdd = drmA.rdd)
+    println(inCoreAtA)
+
+    val expectedAtA = inCoreA.t %*% inCoreA
+    println(expectedAtA)
+
+    assert(expectedAtA === inCoreAtA)
+
+  }
+
+  test("At") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    val op = new OpAt(drmA)
+    val drmAt = new CheckpointedDrmSpark(rdd = At.at(op, srcA = drmA), _nrow = op.nrow, _ncol = op.ncol)
+    val inCoreAt = drmAt.collect
+    val inCoreControlAt = inCoreA.t
+
+    println(inCoreAt)
+    assert((inCoreAt - inCoreControlAt).norm < 1E-5)
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
index a0136e0..29c8bea 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.mahout.sparkbindings.test
 
-import org.scalatest.Suite
+import org.scalatest.{ConfigMap, BeforeAndAfterAllConfigMap, Suite}
 import org.apache.spark.SparkConf
 import org.apache.mahout.sparkbindings._
 import org.apache.mahout.test.{DistributedMahoutSuite, MahoutSuite}
@@ -29,10 +29,8 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat
   protected implicit var mahoutCtx: DistributedContext = _
   protected var masterUrl = null.asInstanceOf[String]
 
-  override protected def beforeEach() {
-    super.beforeEach()
-
-    masterUrl = "local[2]"
+  protected def initContext() {
+    masterUrl = "local[3]"
     mahoutCtx = mahoutSparkContext(masterUrl = this.masterUrl,
       appName = "MahoutLocalContext",
       // Do not run MAHOUT_HOME jars in unit tests.
@@ -44,7 +42,7 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat
     )
   }
 
-  override protected def afterEach() {
+  protected def resetContext() {
     if (mahoutCtx != null) {
       try {
         mahoutCtx.close()
@@ -52,6 +50,21 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat
         mahoutCtx = null
       }
     }
+  }
+
+  override protected def beforeEach() {
+    super.beforeEach()
+//    initContext()
+  }
+
+
+  override protected def beforeAll(configMap: ConfigMap): Unit = {
+    super.beforeAll(configMap)
+    initContext()
+  }
+
+  override protected def afterEach() {
+//    resetContext()
     super.afterEach()
   }
 }
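
Taken together with the ItemSimilarityDriverSuite changes above, the effect of this refactoring is that a single Spark context is created once per suite in beforeAll(configMap) and reused by every test, rather than being torn down and rebuilt around each one. A minimal sketch of a suite written against the refactored trait (the class name and test body are illustrative only, not part of the patch):

    import org.scalatest.FunSuite
    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.sparkbindings.test.DistributedSparkSuite

    class SharedContextSuite extends FunSuite with DistributedSparkSuite {
      test("reuses the per-suite context") {
        val inCoreA = dense((1, 2), (3, 4))
        // mahoutCtx (implicit in the trait) was created once in beforeAll(configMap)
        val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
        assert((drmA.collect - inCoreA).norm < 1e-10)
      }
    }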

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
index d5d16a8..e48e7c7 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
@@ -17,14 +17,14 @@
 
 package org.apache.mahout.sparkbindings.test
 
-import org.scalatest.Suite
+import org.scalatest.{ConfigMap, Suite}
 import org.apache.log4j.{Level, Logger, BasicConfigurator}
 
 trait LoggerConfiguration extends org.apache.mahout.test.LoggerConfiguration {
   this: Suite =>
 
-  override protected def beforeAll(): Unit = {
-    super.beforeAll()
-    Logger.getLogger("org.apache.mahout.sparkbindings").setLevel(Level.DEBUG)
+  override protected def beforeAll(configMap: ConfigMap) {
+    super.beforeAll(configMap)
+    Logger.getLogger("org.apache.mahout.sparkbindings").setLevel(Level.INFO)
   }
 }


[3/6] git commit: NOJIRA: fix artifact names to follow Scala convention (Anand Avati via dlyubimov) This closes apache/mahout#39.

Posted by dl...@apache.org.
NOJIRA: fix artifact names to follow Scala convention (Anand Avati via dlyubimov)
This closes apache/mahout#39.

Squashed commit of the following:

commit 44b1517f06a6e58c2126f47a463b207843fe02f3
Author: Anand Avati <av...@redhat.com>
Date:   Mon Aug 4 15:26:05 2014 -0700

    NOJIRA: fix artifact names to follow Scala convention

    Scala Maven artifacts follow the convention of appending the Scala
    compiler's major version to the artifact name. This is necessary
    because binaries built with different Scala major versions are not
    binary compatible.

    Signed-off-by: Anand Avati <av...@redhat.com>


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/00c0149b
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/00c0149b
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/00c0149b

Branch: refs/heads/spark-1.0.x
Commit: 00c0149b95d3621ed5991578ac534ba5fdf5ac02
Parents: 7a50a29
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Wed Aug 6 13:23:19 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Wed Aug 6 13:27:34 2014 -0700

----------------------------------------------------------------------
 math-scala/pom.xml  |  2 +-
 pom.xml             |  6 +++---
 spark-shell/pom.xml |  6 +++---
 spark/pom.xml       | 14 +++-----------
 4 files changed, 10 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
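
For downstream consumers the suffix matters at dependency-resolution time. A minimal, hypothetical sbt snippet, shown only to illustrate the convention (the version string is a placeholder, and Mahout itself builds with Maven):

    // Explicit artifact name, Scala binary version spelled out:
    libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "1.0-SNAPSHOT"

    // Or let sbt append its scalaBinaryVersion (e.g. "_2.10") via %%:
    libraryDependencies += "org.apache.mahout" %% "mahout-math-scala" % "1.0-SNAPSHOT"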


http://git-wip-us.apache.org/repos/asf/mahout/blob/00c0149b/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 95fe2c7..2a348eb 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -27,7 +27,7 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <artifactId>mahout-math-scala</artifactId>
+  <artifactId>mahout-math-scala_2.10</artifactId>
   <name>Mahout Math/Scala wrappers</name>
   <description>High performance scientific and technical computing data structures and methods,
     mostly based on CERN's

http://git-wip-us.apache.org/repos/asf/mahout/blob/00c0149b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 809421b..aad4c9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -183,19 +183,19 @@
       </dependency>
 
       <dependency>
-        <artifactId>mahout-math-scala</artifactId>
+        <artifactId>mahout-math-scala_${scala.major}</artifactId>
         <groupId>${project.groupId}</groupId>
         <version>${project.version}</version>
       </dependency>
       <dependency>
         <groupId>${project.groupId}</groupId>
         <version>${project.version}</version>
-        <artifactId>mahout-math-scala</artifactId>
+        <artifactId>mahout-math-scala_${scala.major}</artifactId>
         <classifier>tests</classifier>
       </dependency>
 
       <dependency>
-        <artifactId>mahout-spark</artifactId>
+        <artifactId>mahout-spark_${scala.major}</artifactId>
         <groupId>${project.groupId}</groupId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/mahout/blob/00c0149b/spark-shell/pom.xml
----------------------------------------------------------------------
diff --git a/spark-shell/pom.xml b/spark-shell/pom.xml
index 059270f..65df401 100644
--- a/spark-shell/pom.xml
+++ b/spark-shell/pom.xml
@@ -28,7 +28,7 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <artifactId>mahout-spark-shell</artifactId>
+  <artifactId>mahout-spark-shell_2.10</artifactId>
   <name>Mahout Spark bindings shell</name>
   <description>
     Mahout Bindings for Apache Spark
@@ -165,12 +165,12 @@
 
     <dependency>
       <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-spark</artifactId>
+      <artifactId>mahout-spark_${scala.major}</artifactId>
     </dependency>
 
     <dependency>
       <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-math-scala</artifactId>
+      <artifactId>mahout-math-scala_${scala.major}</artifactId>
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/mahout/blob/00c0149b/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 03ea2a0..0946cee 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -28,7 +28,7 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <artifactId>mahout-spark</artifactId>
+  <artifactId>mahout-spark_2.10</artifactId>
   <name>Mahout Spark bindings</name>
   <description>
     Mahout Bindings for Apache Spark
@@ -294,7 +294,7 @@
 
     <dependency>
       <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-math-scala</artifactId>
+      <artifactId>mahout-math-scala_${scala.major}</artifactId>
     </dependency>
 
     <dependency>
@@ -310,7 +310,7 @@
 
     <dependency>
       <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-math-scala</artifactId>
+      <artifactId>mahout-math-scala_${scala.major}</artifactId>
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
@@ -324,14 +324,6 @@
       <version>3.2.0</version>
     </dependency>
 
-    <!-- spark stuff -->
-    
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.major}</artifactId>
-      <version>${spark.version}</version>
-    </dependency>
-
     <!-- scala stuff -->
 
     <dependency>