Posted to commits@mahout.apache.org by dl...@apache.org on 2014/08/08 20:54:45 UTC

[6/6] git commit: MAHOUT-1603: Tweaks for Spark 1.0.x (dlyubimov & pferrel) this closes apache/mahout#40

MAHOUT-1603: Tweaks for Spark 1.0.x (dlyubimov & pferrel)
this closes apache/mahout#40

Squashed commit of the following:

commit 2e362dad82aef764bef163a64eb2bfc1b836a07e
Author: pferrel <pa...@occamsmachete.com>
Date:   Thu Aug 7 15:29:51 2014 -0700

    no need to delete tmp in afterAll since it's already deleted in afterEach

commit 29f73e514570073486b6d8084f4d8a958765fc95
Author: pferrel <pa...@occamsmachete.com>
Date:   Thu Aug 7 15:24:12 2014 -0700

    had to fall back to tokenized data comparison in tests since the order of values cannot be relied upon

commit ec42fe7a1ba6b26aa39bb99e977af7ebb15d9523
Author: pferrel <pa...@occamsmachete.com>
Date:   Thu Aug 7 13:14:26 2014 -0700

    added a check in the driver to see if the context should be closed; it is left open when running a test

commit 2e24ddc7dd612a91fd36312973b05d715def800d
Merge: a809740 1d07cf0
Author: pferrel <pa...@occamsmachete.com>
Date:   Thu Aug 7 12:19:16 2014 -0700

    changing tests for drivers to reuse the test context instead of creating new ones

commit 1d07cf0d1a4409bab9d29e824543c0dae7c0d903
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 15:07:58 2014 -0700

    license

commit f1a31a77f5a8b4d4961a871a88e1a4f5194df90b
Merge: d64146f 00c0149
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 13:29:38 2014 -0700

    Merge branch 'master' into spark-1.0.x

commit d64146ff12be5ffad1832075539397be94c999a3
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 12:58:07 2014 -0700

    "deleteOnExit" doesn't work in tests, rolling back. Needs code added to the ScalaTest test base that handles temporary directories, similar to how it's done for JUnit.

commit 69c393a373a6a80b1542a745cca685b2709696b6
Merge: 439e878 7a50a29
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 12:39:28 2014 -0700

    Merge branch 'master' into spark-1.0.x

commit 439e87850d552938d2d0e2c68507c200cabd8d1c
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 12:31:31 2014 -0700

    Temporarily disable ItemSimilarityDriverSuite as it fails under Spark 1.0.x for an as-yet-unknown reason

commit af099a2d53f673ce37a1d483a3424b78e6b9cb9c
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 12:30:51 2014 -0700

    MAHOUT-1597: A + 1.0 (fixes)

commit 194b77438f532cc7291f382710aa13d97c07a249
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 11:31:56 2014 -0700

    Single Blas suite to speed up, share session

commit 26a5824ca9bf64430f50f3c524ca14a1c68a04b5
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Wed Aug 6 10:58:48 2014 -0700

    Shared context, local[3], bumping up Xmx to 768m to run tests

commit e31e9b97724600f806af7e7f861ebdc7943e54bc
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Aug 4 17:28:35 2014 -0700

    bumping Scala to 2.10.4

commit 13e909b58eaa89e212415318655dbe82ef982323
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Aug 4 15:00:59 2014 -0700

    Initial migration.


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ee6359f6
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ee6359f6
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ee6359f6

Branch: refs/heads/spark-1.0.x
Commit: ee6359f621b508ab7f21df0316941e68c75eb3e5
Parents: a809740
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Fri Aug 8 11:52:10 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Fri Aug 8 11:52:10 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../mahout/test/LoggerConfiguration.scala       |   8 +-
 pom.xml                                         |   6 +-
 .../mahout/drivers/ItemSimilarityDriver.scala   |   5 +-
 .../apache/mahout/drivers/MahoutDriver.scala    |  25 +-
 .../mahout/drivers/MahoutOptionParser.scala     |   5 -
 .../mahout/sparkbindings/SparkEngine.scala      |   2 +-
 .../drivers/ItemSimilarityDriverSuite.scala     | 745 +++++++++----------
 .../mahout/sparkbindings/blas/ABtSuite.scala    |  54 --
 .../mahout/sparkbindings/blas/AewBSuite.scala   | 101 ---
 .../mahout/sparkbindings/blas/AtASuite.scala    |  48 --
 .../mahout/sparkbindings/blas/AtSuite.scala     |  44 --
 .../mahout/sparkbindings/blas/BlasSuite.scala   | 154 ++++
 .../test/DistributedSparkSuite.scala            |  25 +-
 .../test/LoggerConfiguration.scala              |   8 +-
 15 files changed, 557 insertions(+), 675 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 538b12b..aefb838 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1603: Tweaks for Spark 1.0.x (dlyubimov & pferrel)
+
   MAHOUT-1596: implement rbind() operator (Anand Avati and dlyubimov)
 
   MAHOUT-1597: A + 1.0 (element-wise scala operation) gives wrong result if rdd is missing rows, Spark side (dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala b/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala
index 95b92b7..7a34aa2 100644
--- a/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/test/LoggerConfiguration.scala
@@ -1,13 +1,13 @@
 package org.apache.mahout.test
 
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, BeforeAndAfter, Suite}
+import org.scalatest._
 import org.apache.log4j.{Level, Logger, BasicConfigurator}
 
-trait LoggerConfiguration extends BeforeAndAfterAll {
+trait LoggerConfiguration extends BeforeAndAfterAllConfigMap {
   this: Suite =>
 
-  override protected def beforeAll(): Unit = {
-    super.beforeAll()
+  override protected def beforeAll(configMap: ConfigMap): Unit = {
+    super.beforeAll(configMap)
     BasicConfigurator.resetConfiguration()
     BasicConfigurator.configure()
     Logger.getRootLogger.setLevel(Level.ERROR)

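The switch from BeforeAndAfterAll to BeforeAndAfterAllConfigMap means ScalaTest's ConfigMap now threads through every beforeAll call in the trait stack. A minimal sketch of a suite layering on the new trait (the suite name and test body are hypothetical):

    import org.scalatest.FunSuite
    import org.apache.mahout.test.LoggerConfiguration

    class QuietLoggingSuite extends FunSuite with LoggerConfiguration {
      // LoggerConfiguration.beforeAll(configMap) has already reset log4j and
      // set the root logger to ERROR by the time any test here runs
      test("runs with quiet logging") {
        assert(Seq(1, 2, 3).sum == 6)
      }
    }
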
http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index aad4c9c..ef9ae03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,8 +108,8 @@
     <lucene.version>4.6.1</lucene.version>
     <slf4j.version>1.7.5</slf4j.version>
     <scala.major>2.10</scala.major>
-    <scala.version>2.10.3</scala.version>
-    <spark.version>0.9.1</spark.version>
+    <scala.version>2.10.4</scala.version>
+    <spark.version>1.0.1</spark.version>
   </properties>
   <issueManagement>
     <system>Jira</system>
@@ -557,7 +557,7 @@
         <configuration>
           <forkCount>2</forkCount>
           <reuseForks>false</reuseForks>
-          <argLine>-Xmx512m -Djava.security.manager -Djava.library.path=${env.HADOOP_HOME}\bin
+          <argLine>-Xmx768m -Djava.security.manager -Djava.library.path=${env.HADOOP_HOME}\bin
           -Djava.security.policy=${project.build.directory}/../../buildtools/src/test/resources/java.policy</argLine>
           <argLine>-Djava.security.auth.login.config=${project.build.directory}/../../buildtools/src/test/resources/jaas.config</argLine>
           <testFailureIgnore>false</testFailureIgnore>

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index e0eaabc..460106f 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -123,8 +123,7 @@ object ItemSimilarityDriver extends MahoutDriver {
   }
 
   override def start(masterUrl: String = options("master").asInstanceOf[String],
-      appName: String = options("appName").asInstanceOf[String],
-      dontAddMahoutJars: Boolean = options("dontAddMahoutJars").asInstanceOf[Boolean]):
+      appName: String = options("appName").asInstanceOf[String]):
     Unit = {
 
     // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
@@ -134,7 +133,7 @@ object ItemSimilarityDriver extends MahoutDriver {
       .set("spark.kryoserializer.buffer.mb", "200")
       .set("spark.executor.memory", options("sparkExecutorMem").asInstanceOf[String])
 
-    super.start(masterUrl, appName, dontAddMahoutJars)
+    super.start(masterUrl, appName)
 
     val readSchema1 = new Schema("delim" -> options("inDelim").asInstanceOf[String],
         "filter" -> options("filter1").asInstanceOf[String],

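The hunk above shows the pattern drivers follow after this change: override start with the narrowed two-argument signature, set job-specific SparkConf values, then delegate to super.start. A hedged sketch of another driver doing the same (MyDriver and its conf values are hypothetical):

    object MyDriver extends MahoutDriver {
      override def start(masterUrl: String = "local", appName: String = "MyDriver"): Unit = {
        // conf values must be set before super.start creates the context
        sparkConf.set("spark.executor.memory", "1g")
        super.start(masterUrl, appName)
      }
      override def process: Unit = { /* read input, compute, write output */ }
      override def main(args: Array[String]): Unit = {
        start()
        process
        stop
      }
    }
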
http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
index 796a66a..e92ed37 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -58,32 +58,35 @@ abstract class MahoutDriver {
 
 
   implicit var mc: DistributedContext = _
-  implicit val sparkConf = new SparkConf()
+  implicit var sparkConf = new SparkConf()
+  var _useExistingContext: Boolean = false
 
   /** Creates a Spark context to run the job inside.
     * Creates a Spark context to run the job inside. Override to set the SparkConf values specific to the job,
     * these must be set before the context is created.
     * @param masterUrl Spark master URL
     * @param appName  Name to display in Spark UI
-    * @param customJars List of paths to custom jars
     * */
-  protected def start(masterUrl: String, appName: String, customJars:Traversable[String]) : Unit = {
-    mc = mahoutSparkContext(masterUrl, appName, customJars, sparkConf)
-  }
-
-  protected def start(masterUrl: String, appName: String, dontAddMahoutJars: Boolean = false) : Unit = {
-    val customJars = Traversable.empty[String]
-    mc = mahoutSparkContext(masterUrl, appName, customJars, sparkConf, !dontAddMahoutJars)
+  protected def start(masterUrl: String, appName: String) : Unit = {
+    if (!_useExistingContext) {
+      mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
+    }
   }
 
   /** Override (optionally) for special cleanup */
   protected def stop: Unit = {
-    mc.close
+    if (!_useExistingContext) mc.close
   }
 
-  /** This is wher you do the work, call start first, then before exiting call stop */
+  /** This is where you do the work, call start first, then before exiting call stop */
   protected def process: Unit
 
   /** Parse command line and call process */
   def main(args: Array[String]): Unit
+
+  def useContext(context: DistributedContext): Unit = {
+    _useExistingContext = true
+    mc = context
+    sparkConf = mc.getConf
+  }
 }

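The new useContext hook is what lets a test suite hand its shared context to a driver: with _useExistingContext set, start skips context creation and stop leaves the context open. A sketch of the intended call pattern, assuming a test that already holds mahoutCtx and masterUrl from DistributedSparkSuite:

    // hand the shared test context to the driver before invoking it
    ItemSimilarityDriver.useContext(mahoutCtx)
    ItemSimilarityDriver.main(Array(
        "--input", "tmp/in-file.csv/",
        "--output", "tmp/indicator-matrices/",
        "--master", masterUrl))
    // the driver's stop leaves mahoutCtx open for the next test
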
http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index ba4ca1d..3aada78 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -24,7 +24,6 @@ object MahoutOptionParser {
   // set up the various default option groups
   final val GenericOptions = immutable.HashMap[String, Any](
     "randomSeed" -> System.currentTimeMillis().toInt,
-    "dontAddMahoutJars" -> false,
     "writeAllDatasets" -> false)
 
   final val SparkOptions = immutable.HashMap[String, Any](
@@ -102,10 +101,6 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
       if (x > 0) success else failure("Option --randomSeed must be > 0")
     }
 
-    opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) =>
-      options + ("dontAddMahoutJars" -> true)
-    }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly
-
     //output both input DRMs
     opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
       options + ("writeAllDatasets" -> true)

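Dropping an option means removing it both from the parser and from the default-value maps, since parsed flags are overlaid onto those HashMap defaults. A minimal standalone sketch of the hidden-flag idiom the parser uses, assuming scopt 3.x (the "quiet" flag is hypothetical):

    import scopt.OptionParser

    val parser = new OptionParser[Map[String, Any]]("example") {
      // hidden flags are omitted from --help but still fold into the options map
      opt[Unit]("quiet") hidden() action { (_, options) =>
        options + ("quiet" -> true)
      }
    }
    val opts = parser.parse(Array("--quiet"), Map[String, Any]("quiet" -> false))
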
http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 36223fc..dedb279 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -127,7 +127,7 @@ object SparkEngine extends DistributedEngine {
    */
   def drmFromHDFS (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
 
-    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minSplits = parMin)
+    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
         // Get rid of VectorWritable
         .map(t => (t._1, t._2.get()))
 

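This one-word change tracks Spark 1.0, which renamed the minSplits parameter to minPartitions on SparkContext's Hadoop read methods. A minimal sketch of the call against a plain SparkContext (the helper name and partition count are hypothetical):

    import org.apache.hadoop.io.Writable
    import org.apache.mahout.math.VectorWritable
    import org.apache.spark.SparkContext

    def readRows(sc: SparkContext, path: String) =
      sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = 4)
          // unwrap VectorWritable into a Mahout Vector, as drmFromHDFS does
          .map(t => (t._1, t._2.get()))
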
http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index f1981bb..cfabfdb 100644
--- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -19,42 +19,30 @@ package org.apache.mahout.drivers
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
-import org.scalatest.FunSuite
+import org.scalatest.{ConfigMap, FunSuite}
 import org.apache.mahout.sparkbindings._
 import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.test.MahoutSuite
-
-
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings._
 
 //todo: take out, only for temp tests
-import org.apache.mahout.math._
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm._
 import RLikeDrmOps._
 import scala.collection.JavaConversions._
-import org.apache.mahout.math.stats.LogLikelihood
-import collection._
-import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.math.function.{VectorFunction, Functions}
 
 
-
-class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with DistributedSparkSuite  {
+class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite  {
 
 /*
-  // correct self-cooccurrence with LLR
   final val matrixLLRCoocAtAControl = dense(
-    (0.0,                0.6331745808516107, 0.0,                     0.0,                0.0),
-    (0.6331745808516107, 0.0,                0.0,                     0.0,                0.0),
-    (0.0,                0.0,                0.0,                     0.6331745808516107, 0.0),
-    (0.0,                0.0,                0.6331745808516107,      0.0,                0.0),
-    (0.0,                0.0,                0.0,                     0.0,                0.0))
+      (0.0,                1.7260924347106847, 0.0,                     0.0,                0.0),
+      (1.7260924347106847, 0.0,                0.0,                     0.0,                0.0),
+      (0.0,                0.0,                0.0,                     1.7260924347106847, 0.0),
+      (0.0,                0.0,                1.7260924347106847,      0.0,                0.0),
+      (0.0,                0.0,                0.0,                     0.0,                0.0))
 
-  // correct cross-cooccurrence with LLR
   final val matrixLLRCoocBtAControl = dense(
       (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
       (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
@@ -78,20 +66,35 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
       "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
       "surface\tsurface:4.498681156950466 nexus:0.6795961471815897")
 
+  // todo: a better test would be to sort each vector by itemID and compare rows; token comparison misses some error cases
+  final val SelfSimilairtyTokens = tokenize(Iterable(
+      "galaxy\tnexus:1.7260924347106847",
+      "ipad\tiphone:1.7260924347106847",
+      "nexus\tgalaxy:1.7260924347106847",
+      "iphone\tipad:1.7260924347106847",
+      "surface"))
+
+  val CrossIndicatorTokens = tokenize(Iterable(
+      "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+      "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+      "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
+      "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
+      "surface\tsurface:4.498681156950466 nexus:0.6795961471815897"))
+
   final val TmpDir = "tmp/" // all IO going to whatever the default HDFS config is pointing to
 
   /*
     //Clustered Spark and HDFS, not a good everyday build test
     ItemSimilarityDriver.main(Array(
-      "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
-      "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
-      "--master", "spark://occam4:7077",
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1"
+        "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
+        "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
+        "--master", "spark://occam4:7077",
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1"
     ))
 */
   // local multi-threaded Spark with HDFS using large dataset
@@ -109,7 +112,8 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     ))
   */
 
-  test ("ItemSimilarityDriver, non-full-spec CSV"){
+  // TODO: failing, temporarily disabled
+  test("ItemSimilarityDriver, non-full-spec CSV") {
 
     val InFile = TmpDir + "in-file.csv/" //using part files, not single file
     val OutPath = TmpDir + "indicator-matrices/"
@@ -140,147 +144,133 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     // take account of one actual file
     val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--writeAllDatasets",
-      "--dontAddMahoutJars"))
-    
-
-    beforeEach // restart the test context to read the output of the driver
+        "--input", InFile,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1",
+        "--writeAllDatasets"))
 
     // todo: these comparisons rely on a sort producing the same lines, which could possibly
     // fail since the sort is on value and these can be the same for all items in a vector
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
   }
 
 
 
-  test ("ItemSimilarityDriver TSV "){
+  test("ItemSimilarityDriver TSV ") {
 
     val InFile = TmpDir + "in-file.tsv/"
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "u1\tpurchase\tiphone",
-      "u1\tpurchase\tipad",
-      "u2\tpurchase\tnexus",
-      "u2\tpurchase\tgalaxy",
-      "u3\tpurchase\tsurface",
-      "u4\tpurchase\tiphone",
-      "u4\tpurchase\tgalaxy",
-      "u1\tview\tiphone",
-      "u1\tview\tipad",
-      "u1\tview\tnexus",
-      "u1\tview\tgalaxy",
-      "u2\tview\tiphone",
-      "u2\tview\tipad",
-      "u2\tview\tnexus",
-      "u2\tview\tgalaxy",
-      "u3\tview\tsurface",
-      "u3\tview\tnexus",
-      "u4\tview\tiphone",
-      "u4\tview\tipad",
-      "u4\tview\tgalaxy")
+        "u1\tpurchase\tiphone",
+        "u1\tpurchase\tipad",
+        "u2\tpurchase\tnexus",
+        "u2\tpurchase\tgalaxy",
+        "u3\tpurchase\tsurface",
+        "u4\tpurchase\tiphone",
+        "u4\tpurchase\tgalaxy",
+        "u1\tview\tiphone",
+        "u1\tview\tipad",
+        "u1\tview\tnexus",
+        "u1\tview\tgalaxy",
+        "u2\tview\tiphone",
+        "u2\tview\tipad",
+        "u2\tview\tnexus",
+        "u2\tview\tgalaxy",
+        "u3\tview\tsurface",
+        "u3\tview\tnexus",
+        "u4\tview\tiphone",
+        "u4\tview\tipad",
+        "u4\tview\tgalaxy")
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", "[,\t]",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--dontAddMahoutJars"))
+        "--input", InFile,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", "[,\t]",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1"))
 
-    beforeEach // restart the test context to read the output of the driver
     // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
     // some error cases
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
 
-  test ("ItemSimilarityDriver log-ish files"){
+  test("ItemSimilarityDriver log-ish files") {
 
     val InFile = TmpDir + "in-file.log/"
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
-      "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tipad",
-      "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tnexus",
-      "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tgalaxy",
-      "2014-06-23 14:46:53.115\tu3\tpurchase\trandom text\tsurface",
-      "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tiphone",
-      "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tgalaxy",
-      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tiphone",
-      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tipad",
-      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tnexus",
-      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tgalaxy",
-      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tiphone",
-      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tipad",
-      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tnexus",
-      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tgalaxy",
-      "2014-06-23 14:46:53.115\tu3\tview\trandom text\tsurface",
-      "2014-06-23 14:46:53.115\tu3\tview\trandom text\tnexus",
-      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tiphone",
-      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tipad",
-      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tgalaxy")
+        "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
+        "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tipad",
+        "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tnexus",
+        "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tgalaxy",
+        "2014-06-23 14:46:53.115\tu3\tpurchase\trandom text\tsurface",
+        "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tiphone",
+        "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tgalaxy",
+        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tiphone",
+        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tipad",
+        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tnexus",
+        "2014-06-23 14:46:53.115\tu1\tview\trandom text\tgalaxy",
+        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tiphone",
+        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tipad",
+        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tnexus",
+        "2014-06-23 14:46:53.115\tu2\tview\trandom text\tgalaxy",
+        "2014-06-23 14:46:53.115\tu3\tview\trandom text\tsurface",
+        "2014-06-23 14:46:53.115\tu3\tview\trandom text\tnexus",
+        "2014-06-23 14:46:53.115\tu4\tview\trandom text\tiphone",
+        "2014-06-23 14:46:53.115\tu4\tview\trandom text\tipad",
+        "2014-06-23 14:46:53.115\tu4\tview\trandom text\tgalaxy")
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", "\t",
-      "--itemIDPosition", "4",
-      "--rowIDPosition", "1",
-      "--filterPosition", "2",
-      "--dontAddMahoutJars"))
+        "--input", InFile,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", "\t",
+        "--itemIDPosition", "4",
+        "--rowIDPosition", "1",
+        "--filterPosition", "2"))
 
-    beforeEach // restart the test context to read the output of the driver
 
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
 
-  test ("ItemSimilarityDriver legacy supported file format"){
+  test("ItemSimilarityDriver legacy supported file format") {
 
     val InDir = TmpDir + "in-dir/"
     val InFilename = "in-file.tsv"
@@ -289,20 +279,20 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     val OutPath = TmpDir + "indicator-matrices"
 
     val lines = Array(
-      "0,0,1",
-      "0,1,1",
-      "1,2,1",
-      "1,3,1",
-      "2,4,1",
-      "3,0,1",
-      "3,3,1")
-
-    val Answer = Iterable(
-      "0\t1:1.7260924347106847",
-      "3\t2:1.7260924347106847",
-      "1\t0:1.7260924347106847",
-      "4",
-      "2\t3:1.7260924347106847")
+        "0,0,1",
+        "0,1,1",
+        "1,2,1",
+        "1,3,1",
+        "2,4,1",
+        "3,0,1",
+        "3,3,1")
+
+    val Answer = tokenize(Iterable(
+        "0\t1:1.7260924347106847",
+        "3\t2:1.7260924347106847",
+        "1\t0:1.7260924347106847",
+        "4",
+        "2\t3:1.7260924347106847"))
 
     // this creates one part-0000 file in the directory
     mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir)
@@ -312,24 +302,18 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     //rename part-00000 to something.tsv
     fs.rename(new Path(InDir + "part-00000"), new Path(InPath))
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InPath,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--dontAddMahoutJars"))
+        "--input", InPath,
+        "--output", OutPath,
+        "--master", masterUrl))
 
-    beforeEach // restart the test context to read the output of the driver
-    // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
-    // some error cases
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs Answer
+    tokenize(indicatorLines) should contain theSameElementsAs Answer
 
   }
 
-  test ("ItemSimilarityDriver write search engine output"){
+  test("ItemSimilarityDriver write search engine output") {
 
     val InDir = TmpDir + "in-dir/"
     val InFilename = "in-file.tsv"
@@ -338,20 +322,20 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     val OutPath = TmpDir + "indicator-matrices"
 
     val lines = Array(
-      "0,0,1",
-      "0,1,1",
-      "1,2,1",
-      "1,3,1",
-      "2,4,1",
-      "3,0,1",
-      "3,3,1")
-
-    val Answer = Iterable(
-      "0\t1",
-      "3\t2",
-      "1\t0",
-      "4",
-      "2\t3")
+        "0,0,1",
+        "0,1,1",
+        "1,2,1",
+        "1,3,1",
+        "2,4,1",
+        "3,0,1",
+        "3,3,1")
+
+    val Answer = tokenize(Iterable(
+        "0\t1",
+        "3\t2",
+        "1\t0",
+        "4",
+        "2\t3"))
 
     // this creates one part-0000 file in the directory
     mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir)
@@ -361,51 +345,45 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     //rename part-00000 to something.tsv
     fs.rename(new Path(InDir + "part-00000"), new Path(InPath))
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InPath,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--dontAddMahoutJars",
-      "--omitStrength"))
+        "--input", InPath,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--omitStrength"))
 
-    beforeEach // restart the test context to read the output of the driver
-    // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss
-    // some error cases
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs Answer
+    tokenize(indicatorLines) should contain theSameElementsAs Answer
 
   }
 
-  test("ItemSimilarityDriver recursive file discovery using filename patterns"){
+  test("ItemSimilarityDriver recursive file discovery using filename patterns") {
     //directory structure using the following
     // tmp/data/m1.tsv
     // tmp/data/more-data/another-dir/m2.tsv
     val M1Lines = Array(
-      "u1\tpurchase\tiphone",
-      "u1\tpurchase\tipad",
-      "u2\tpurchase\tnexus",
-      "u2\tpurchase\tgalaxy",
-      "u3\tpurchase\tsurface",
-      "u4\tpurchase\tiphone",
-      "u4\tpurchase\tgalaxy",
-      "u1\tview\tiphone")
+        "u1\tpurchase\tiphone",
+        "u1\tpurchase\tipad",
+        "u2\tpurchase\tnexus",
+        "u2\tpurchase\tgalaxy",
+        "u3\tpurchase\tsurface",
+        "u4\tpurchase\tiphone",
+        "u4\tpurchase\tgalaxy",
+        "u1\tview\tiphone")
 
     val M2Lines = Array(
-      "u1\tview\tipad",
-      "u1\tview\tnexus",
-      "u1\tview\tgalaxy",
-      "u2\tview\tiphone",
-      "u2\tview\tipad",
-      "u2\tview\tnexus",
-      "u2\tview\tgalaxy",
-      "u3\tview\tsurface",
-      "u3\tview\tnexus",
-      "u4\tview\tiphone",
-      "u4\tview\tipad",
-      "u4\tview\tgalaxy")
+        "u1\tview\tipad",
+        "u1\tview\tnexus",
+        "u1\tview\tgalaxy",
+        "u2\tview\tiphone",
+        "u2\tview\tipad",
+        "u2\tview\tnexus",
+        "u2\tview\tgalaxy",
+        "u3\tview\tsurface",
+        "u3\tview\tnexus",
+        "u4\tview\tiphone",
+        "u4\tview\tipad",
+        "u4\tview\tgalaxy")
 
     val InFilenameM1 = "m1.tsv"
     val InDirM1 = TmpDir + "data/"
@@ -434,27 +412,23 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
 
     // local multi-threaded Spark with default FS, suitable for build tests but need better location for data
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     ItemSimilarityDriver.main(Array(
-      "--input", InPathStart,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", "\t",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--filenamePattern", "m..tsv",
-      "--recursive",
-      "--dontAddMahoutJars"))
+        "--input", InPathStart,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", "\t",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1",
+        "--filenamePattern", "m..tsv",
+        "--recursive"))
 
-    beforeEach()// restart the test context to read the output of the driver
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
 
@@ -465,53 +439,49 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "u1,purchase,iphone",
-      "u1,purchase,ipad",
-      "u2,purchase,nexus",
-      "u2,purchase,galaxy",
-      "u3,purchase,surface",
-      "u4,purchase,iphone",
-      "u4,purchase,galaxy",
-      "u1,view,iphone",
-      "u1,view,ipad",
-      "u1,view,nexus",
-      "u1,view,galaxy",
-      "u2,view,iphone",
-      "u2,view,ipad",
-      "u2,view,nexus",
-      "u2,view,galaxy",
-      "u3,view,surface",
-      "u3,view,nexus",
-      "u4,view,iphone",
-      "u4,view,ipad",
-      "u4,view,galaxy")
+        "u1,purchase,iphone",
+        "u1,purchase,ipad",
+        "u2,purchase,nexus",
+        "u2,purchase,galaxy",
+        "u3,purchase,surface",
+        "u4,purchase,iphone",
+        "u4,purchase,galaxy",
+        "u1,view,iphone",
+        "u1,view,ipad",
+        "u1,view,nexus",
+        "u1,view,galaxy",
+        "u2,view,iphone",
+        "u2,view,ipad",
+        "u2,view,nexus",
+        "u2,view,galaxy",
+        "u3,view,surface",
+        "u3,view,nexus",
+        "u4,view,iphone",
+        "u4,view,ipad",
+        "u4,view,galaxy")
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
     val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile1,
-      "--input2", InFile2,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--dontAddMahoutJars"))
+        "--input", InFile1,
+        "--input2", InFile2,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1"))
 
-    beforeEach // restart the test context to read the output of the driver
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    crossIndicatorLines should contain theSameElementsAs CrossIndicatorLines
+    tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens
 
   }
 
@@ -522,68 +492,62 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "u1,purchase,iphone",
-      "u1,purchase,ipad",
-      "u2,purchase,nexus",
-      "u2,purchase,galaxy",
-      // remove one user so A'B will be of different dimensions
-      // ItemSimilarityDriver should create one unified user dictionary and so account for this
-      // discrepancy as a blank row: "u3,purchase,surface",
-      "u4,purchase,iphone",
-      "u4,purchase,galaxy",
-      "u1,view,iphone",
-      "u1,view,ipad",
-      "u1,view,nexus",
-      "u1,view,galaxy",
-      "u2,view,iphone",
-      "u2,view,ipad",
-      "u2,view,nexus",
-      "u2,view,galaxy",
-      "u3,view,surface",
-      "u3,view,nexus",
-      "u4,view,iphone",
-      "u4,view,ipad",
-      "u4,view,galaxy")
-
-    val UnequalDimensionsSelfSimilarity = Iterable(
-      "ipad\tiphone:1.7260924347106847",
-      "iphone\tipad:1.7260924347106847",
-      "nexus\tgalaxy:1.7260924347106847",
-      "galaxy\tnexus:1.7260924347106847")
-
-    val UnequalDimensionsCrossSimilarity = Iterable(
-      "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847",
-      "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
-      "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897",
-      "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847")
+        "u1,purchase,iphone",
+        "u1,purchase,ipad",
+        "u2,purchase,nexus",
+        "u2,purchase,galaxy",
+        // remove one user so A'B will be of different dimensions
+        // ItemSimilarityDriver should create one unified user dictionary and so account for this
+        // discrepancy as a blank row: "u3,purchase,surface",
+        "u4,purchase,iphone",
+        "u4,purchase,galaxy",
+        "u1,view,iphone",
+        "u1,view,ipad",
+        "u1,view,nexus",
+        "u1,view,galaxy",
+        "u2,view,iphone",
+        "u2,view,ipad",
+        "u2,view,nexus",
+        "u2,view,galaxy",
+        "u3,view,surface",
+        "u3,view,nexus",
+        "u4,view,iphone",
+        "u4,view,ipad",
+        "u4,view,galaxy")
 
+    val UnequalDimensionsSelfSimilarity = tokenize(Iterable(
+        "ipad\tiphone:1.7260924347106847",
+        "iphone\tipad:1.7260924347106847",
+        "nexus\tgalaxy:1.7260924347106847",
+        "galaxy\tnexus:1.7260924347106847"))
+
+    val UnequalDimensionsCrossSimilarity = tokenize(Iterable(
+        "galaxy\tgalaxy:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 nexus:1.7260924347106847",
+        "iphone\tgalaxy:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 surface:1.7260924347106847 nexus:1.7260924347106847",
+        "ipad\tgalaxy:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 nexus:0.6795961471815897",
+        "nexus\tiphone:0.6795961471815897 ipad:0.6795961471815897 nexus:0.6795961471815897 galaxy:0.6795961471815897"))
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
     val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile1,
-      "--input2", InFile2,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--dontAddMahoutJars"))
-
-    beforeEach // restart the test context to read the output of the driver
+        "--input", InFile1,
+        "--input2", InFile2,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1"))
 
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs UnequalDimensionsSelfSimilarity
-    crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarity
+    tokenize(indicatorLines) should contain theSameElementsAs UnequalDimensionsSelfSimilarity
+    tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarity
 
   }
 
@@ -600,82 +564,56 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with Distribut
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "u1,purchase,iphone",
-      "u1,purchase,ipad",
-      "u2,purchase,nexus",
-      "u2,purchase,galaxy",
-      "u3,purchase,surface",
-      "u4,purchase,iphone",
-      "u4,purchase,galaxy",
-      "u1,view,phones",
-      "u1,view,mobile_acc",
-      "u2,view,phones",
-      "u2,view,tablets",
-      "u2,view,mobile_acc",
-      "u3,view,mobile_acc",
-      "u4,view,phones",
-      "u4,view,tablets",
-      "u4,view,soap")
-
-    val UnequalDimensionsCrossSimilarityLines = Iterable(
+        "u1,purchase,iphone",
+        "u1,purchase,ipad",
+        "u2,purchase,nexus",
+        "u2,purchase,galaxy",
+        "u3,purchase,surface",
+        "u4,purchase,iphone",
+        "u4,purchase,galaxy",
+        "u1,view,phones",
+        "u1,view,mobile_acc",
+        "u2,view,phones",
+        "u2,view,tablets",
+        "u2,view,mobile_acc",
+        "u3,view,mobile_acc",
+        "u4,view,phones",
+        "u4,view,tablets",
+        "u4,view,soap")
+
+    val UnequalDimensionsCrossSimilarityLines = tokenize(Iterable(
         "iphone\tmobile_acc:1.7260924347106847 soap:1.7260924347106847 phones:1.7260924347106847",
         "surface\tmobile_acc:0.6795961471815897",
         "nexus\ttablets:1.7260924347106847 mobile_acc:0.6795961471815897 phones:0.6795961471815897",
         "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 mobile_acc:1.7260924347106847",
-        "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897")
+        "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897"))
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
     val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile1,
-      "--input2", InFile2,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--dontAddMahoutJars",
-      "--writeAllDatasets"))
+        "--input", InFile1,
+        "--input2", InFile2,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1",
+        "--writeAllDatasets"))
 
-    beforeEach // restart the test context to read the output of the driver
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
-    crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
+    tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
 
   }
 
-  // convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable'
-  def tokenize(a: Iterable[String]): Iterable[String] = {
-    var r: Iterable[String] = Iterable()
-    a.foreach { l =>
-      l.split("\t").foreach{ s =>
-        r = r ++ s.split(",")
-      }
-    }
-    r.asInstanceOf[Iterable[String]]
-  }
-
-  override def afterAll = {
-    removeTmpDir
-    super.afterAll
-  }
-
-  def removeTmpDir = {
-    // remove TmpDir
-    val fs = FileSystem.get(new Configuration())
-    fs.delete(new Path(TmpDir), true) // delete recursively
-  }
-
   test("A.t %*% B after changing row cardinality of A"){
     // todo: move to math tests but this is Spark specific
 
@@ -720,57 +658,82 @@ removed ==> u3	0	      0	      1	          0
     val OutPath = TmpDir + "indicator-matrices/"
 
     val lines = Array(
-      "u1,purchase,iphone",
-      "u1,purchase,ipad",
-      "u2,purchase,nexus",
-      "u2,purchase,galaxy",
-      "u3,purchase,surface",
-      "u4,purchase,iphone",
-      "u4,purchase,galaxy",
-      "u1,view,phones",
-      "u1,view,mobile_acc",
-      "u2,view,phones",
-      "u2,view,tablets",
-      "u2,view,mobile_acc",
-      //"u3,view,mobile_acc",// if this line is removed the cross-cooccurrence should work
-      "u4,view,phones",
-      "u4,view,tablets",
-      "u4,view,soap")
-
-    val UnequalDimensionsCrossSimilarityLines = Iterable(
+        "u1,purchase,iphone",
+        "u1,purchase,ipad",
+        "u2,purchase,nexus",
+        "u2,purchase,galaxy",
+        "u3,purchase,surface",
+        "u4,purchase,iphone",
+        "u4,purchase,galaxy",
+        "u1,view,phones",
+        "u1,view,mobile_acc",
+        "u2,view,phones",
+        "u2,view,tablets",
+        "u2,view,mobile_acc",
+        //"u3,view,mobile_acc",// if this line is removed the cross-cooccurrence should work
+        "u4,view,phones",
+        "u4,view,tablets",
+        "u4,view,soap")
+
+    val UnequalDimensionsCrossSimilarityLines = tokenize(Iterable(
         "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847",
         "ipad\tmobile_acc:1.7260924347106847 phones:0.6795961471815897",
         "surface",
         "nexus\tmobile_acc:1.7260924347106847 tablets:1.7260924347106847 phones:0.6795961471815897",
-        "iphone\tsoap:1.7260924347106847 phones:1.7260924347106847")
+        "iphone\tsoap:1.7260924347106847 phones:1.7260924347106847"))
 
     // this will create multiple part-xxxxx files in the InFile dir but other tests will
     // take account of one actual file
     val linesRdd1 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile1)
     val linesRdd2 = mahoutCtx.parallelize(lines).saveAsTextFile(InFile2)
 
-    afterEach // clean up before running the driver, it should handle the Spark conf and context
-
     // local multi-threaded Spark with default HDFS
     ItemSimilarityDriver.main(Array(
-      "--input", InFile1,
-      "--input2", InFile2,
-      "--output", OutPath,
-      "--master", masterUrl,
-      "--filter1", "purchase",
-      "--filter2", "view",
-      "--inDelim", ",",
-      "--itemIDPosition", "2",
-      "--rowIDPosition", "0",
-      "--filterPosition", "1",
-      "--dontAddMahoutJars",
-      "--writeAllDatasets"))
+        "--input", InFile1,
+        "--input2", InFile2,
+        "--output", OutPath,
+        "--master", masterUrl,
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1",
+        "--writeAllDatasets"))
 
-    beforeEach // restart the test context to read the output of the driver
     val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toIterable
     val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toIterable
-    indicatorLines should contain theSameElementsAs SelfSimilairtyLines
-    crossIndicatorLines should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+    tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens
+    tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines
+  }
+
+  // convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable'
+  def tokenize(a: Iterable[String]): Iterable[String] = {
+    var r: Iterable[String] = Iterable()
+    a.foreach { l =>
+      l.split("\t").foreach{ s =>
+        r = r ++ s.split("[\t ]")
+      }
+    }
+    r
+  }
+
+  override protected def beforeAll(configMap: ConfigMap) {
+    super.beforeAll(configMap)
+
+    // just in case there is one left over
+    val fs = FileSystem.get(new Configuration())
+    fs.delete(new Path(TmpDir), true) // delete recursively
+
+    ItemSimilarityDriver.useContext(mahoutCtx) // for testing use the test context
+  }
+
+  override protected def afterEach() {
+
+    val fs = FileSystem.get(new Configuration())
+    fs.delete(new Path(TmpDir), true) // delete recursively
+
+    super.afterEach()
   }
 
 }

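The tokenize helper is what makes the suite's assertions order-insensitive within a row: each line is exploded on tabs and spaces and the resulting token collections are compared as multisets. A small illustration of why the raw line compare was unreliable (values are hypothetical; assumes the suite's tokenize and ScalaTest matchers):

    val expected = tokenize(Iterable("galaxy\tnexus:1.0 iphone:1.0"))
    val actual = tokenize(Iterable("galaxy\tiphone:1.0 nexus:1.0")) // same items, reordered

    // comparing raw lines would fail here; the token multisets match
    actual should contain theSameElementsAs expected
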
http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
deleted file mode 100644
index 12c9034..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/ABtSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.blas
-
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.scalatest.FunSuite
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.sparkbindings._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeOps._
-import org.apache.spark.SparkContext._
-import org.apache.mahout.math.drm.logical.OpABt
-
-/** Tests for AB' operator algorithms */
-class ABtSuite extends FunSuite with DistributedSparkSuite {
-
-  test("ABt") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val inCoreB = dense((3, 4, 5), (5, 6, 7))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 3)
-    val drmB = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val op = new OpABt(drmA, drmB)
-
-    val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
-
-    printf("AB' num partitions = %d.\n", drm.rdd.partitions.size)
-
-    val inCoreMControl = inCoreA %*% inCoreB.t
-    val inCoreM = drm.collect
-
-    assert((inCoreM - inCoreMControl).norm < 1E-5)
-
-    println(inCoreM)
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
deleted file mode 100644
index be65e32..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AewBSuite.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.blas
-
-import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.spark.SparkContext._
-import org.apache.mahout.math.drm.logical.OpAewB
-import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
-
-/** Elementwise matrix operation tests */
-class AewBSuite extends FunSuite with DistributedSparkSuite {
-
-  test("A * B Hadamard") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
-    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
-    val drmB = drmParallelize(m = inCoreB)
-
-    val op = new OpAewB(drmA, drmB, "*")
-
-    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
-
-    val inCoreM = drmM.collect
-    val inCoreMControl = inCoreA * inCoreB
-
-    assert((inCoreM - inCoreMControl).norm < 1E-10)
-
-  }
-
-  test("A + B Elementwise") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
-    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
-    val drmB = drmParallelize(m = inCoreB)
-
-    val op = new OpAewB(drmA, drmB, "+")
-
-    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
-
-    val inCoreM = drmM.collect
-    val inCoreMControl = inCoreA + inCoreB
-
-    assert((inCoreM - inCoreMControl).norm < 1E-10)
-
-  }
-
-  test("A - B Elementwise") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
-    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
-    val drmB = drmParallelize(m = inCoreB)
-
-    val op = new OpAewB(drmA, drmB, "-")
-
-    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
-
-    val inCoreM = drmM.collect
-    val inCoreMControl = inCoreA - inCoreB
-
-    assert((inCoreM - inCoreMControl).norm < 1E-10)
-
-  }
-
-  test("A / B Elementwise") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 0), (7, 8, 9))
-    val inCoreB = dense((3, 4, 5), (5, 6, 7), (10, 20, 30), (9, 8, 7))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
-    val drmB = drmParallelize(m = inCoreB)
-
-    val op = new OpAewB(drmA, drmB, "/")
-
-    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
-
-    val inCoreM = drmM.collect
-    val inCoreMControl = inCoreA / inCoreB
-
-    assert((inCoreM - inCoreMControl).norm < 1E-10)
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
deleted file mode 100644
index c31f27c..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtASuite.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.blas
-
-import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.sparkbindings._
-import org.apache.spark.SparkContext._
-import org.apache.mahout.math.drm.logical.OpAtA
-
-/** Tests for {@link XtX} */
-class AtASuite extends FunSuite with DistributedSparkSuite {
-
-  test("AtA slim") {
-
-    val inCoreA = dense((1, 2), (2, 3))
-    val drmA = drmParallelize(inCoreA)
-
-    val operator = new OpAtA[Int](A = drmA)
-    val inCoreAtA = AtA.at_a_slim(operator = operator, srcRdd = drmA.rdd)
-    println(inCoreAtA)
-
-    val expectedAtA = inCoreA.t %*% inCoreA
-    println(expectedAtA)
-
-    assert(expectedAtA === inCoreAtA)
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
deleted file mode 100644
index 16632ec..0000000
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/AtSuite.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.blas
-
-import org.scalatest.FunSuite
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.logical.OpAt
-import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
-
-/** Tests for A' algorithms */
-class AtSuite extends FunSuite with DistributedSparkSuite {
-
-  test("At") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val op = new OpAt(drmA)
-    val drmAt = new CheckpointedDrmSpark(rdd = At.at(op, srcA = drmA), _nrow = op.nrow, _ncol = op.ncol)
-    val inCoreAt = drmAt.collect
-    val inCoreControlAt = inCoreA.t
-
-    println(inCoreAt)
-    assert((inCoreAt - inCoreControlAt).norm < 1E-5)
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala
new file mode 100644
index 0000000..1521cb8
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/blas/BlasSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.blas
+
+import collection._
+import JavaConversions._
+import org.scalatest.FunSuite
+import org.apache.mahout.test.DistributedMahoutSuite
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
+import org.apache.mahout.math.drm.logical.{OpAt, OpAtA, OpAewB, OpABt}
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+
+/** Collection of physical BLAS operator tests (ABt, AewB, AtA, At). */
+class BlasSuite extends FunSuite with DistributedSparkSuite {
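+  // All tests in this suite share a single Spark session provided by DistributedSparkSuite.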
+
+  test("ABt") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val inCoreB = dense((3, 4, 5), (5, 6, 7))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 3)
+    val drmB = drmParallelize(m = inCoreB, numPartitions = 2)
+
+    val op = new OpABt(drmA, drmB)
+
+    val drm = new CheckpointedDrmSpark(ABt.abt(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
+
+    printf("AB' num partitions = %d.\n", drm.rdd.partitions.size)
+
+    val inCoreMControl = inCoreA %*% inCoreB.t
+    val inCoreM = drm.collect
+
+    assert((inCoreM - inCoreMControl).norm < 1E-5)
+
+    println(inCoreM)
+  }
+
+  test("A * B Hadamard") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
+    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
+    val drmB = drmParallelize(m = inCoreB)
+
+    val op = new OpAewB(drmA, drmB, "*")
+
+    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
+
+    val inCoreM = drmM.collect
+    val inCoreMControl = inCoreA * inCoreB
+
+    assert((inCoreM - inCoreMControl).norm < 1E-10)
+
+  }
+
+  test("A + B Elementwise") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
+    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
+    val drmB = drmParallelize(m = inCoreB)
+
+    val op = new OpAewB(drmA, drmB, "+")
+
+    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
+
+    val inCoreM = drmM.collect
+    val inCoreMControl = inCoreA + inCoreB
+
+    assert((inCoreM - inCoreMControl).norm < 1E-10)
+
+  }
+
+  test("A - B Elementwise") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (7, 8, 9))
+    val inCoreB = dense((3, 4, 5), (5, 6, 7), (0, 0, 0), (9, 8, 7))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
+    val drmB = drmParallelize(m = inCoreB)
+
+    val op = new OpAewB(drmA, drmB, "-")
+
+    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
+
+    val inCoreM = drmM.collect
+    val inCoreMControl = inCoreA - inCoreB
+
+    assert((inCoreM - inCoreMControl).norm < 1E-10)
+
+  }
+
+  test("A / B Elementwise") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 0), (7, 8, 9))
+    val inCoreB = dense((3, 4, 5), (5, 6, 7), (10, 20, 30), (9, 8, 7))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
+    val drmB = drmParallelize(m = inCoreB)
+
+    val op = new OpAewB(drmA, drmB, "/")
+
+    val drmM = new CheckpointedDrmSpark(AewB.a_ew_b(op, srcA = drmA, srcB = drmB), op.nrow, op.ncol)
+
+    val inCoreM = drmM.collect
+    val inCoreMControl = inCoreA / inCoreB
+
+    assert((inCoreM - inCoreMControl).norm < 1E-10)
+
+  }
+
+  test("AtA slim") {
+
+    val inCoreA = dense((1, 2), (2, 3))
+    val drmA = drmParallelize(inCoreA)
+
+    val operator = new OpAtA[Int](A = drmA)
+    val inCoreAtA = AtA.at_a_slim(operator = operator, srcRdd = drmA.rdd)
+    println(inCoreAtA)
+
+    val expectedAtA = inCoreA.t %*% inCoreA
+    println(expectedAtA)
+
+    assert(expectedAtA === inCoreAtA)
+
+  }
+
+  test("At") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    val op = new OpAt(drmA)
+    val drmAt = new CheckpointedDrmSpark(rdd = At.at(op, srcA = drmA), _nrow = op.nrow, _ncol = op.ncol)
+    val inCoreAt = drmAt.collect
+    val inCoreControlAt = inCoreA.t
+
+    println(inCoreAt)
+    assert((inCoreAt - inCoreControlAt).norm < 1E-5)
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
index a0136e0..29c8bea 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.mahout.sparkbindings.test
 
-import org.scalatest.Suite
+import org.scalatest.{ConfigMap, BeforeAndAfterAllConfigMap, Suite}
 import org.apache.spark.SparkConf
 import org.apache.mahout.sparkbindings._
 import org.apache.mahout.test.{DistributedMahoutSuite, MahoutSuite}
@@ -29,10 +29,8 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat
   protected implicit var mahoutCtx: DistributedContext = _
   protected var masterUrl = null.asInstanceOf[String]
 
-  override protected def beforeEach() {
-    super.beforeEach()
-
-    masterUrl = "local[2]"
+  protected def initContext() {
+    masterUrl = "local[3]"
     mahoutCtx = mahoutSparkContext(masterUrl = this.masterUrl,
       appName = "MahoutLocalContext",
       // Do not run MAHOUT_HOME jars in unit tests.
@@ -44,7 +42,7 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat
     )
   }
 
-  override protected def afterEach() {
+  protected def resetContext() {
     if (mahoutCtx != null) {
       try {
         mahoutCtx.close()
@@ -52,6 +50,21 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat
         mahoutCtx = null
       }
     }
+  }
+
+  override protected def beforeEach() {
+    super.beforeEach()
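+    // the context is created once per suite in beforeAll(); per-test creation stays disabled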
+//    initContext()
+  }
+
+
+  override protected def beforeAll(configMap: ConfigMap): Unit = {
+    super.beforeAll(configMap)
+    initContext()
+  }
+
+  override protected def afterEach() {
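+    // the shared context is intentionally left running between tests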
+//    resetContext()
     super.afterEach()
   }
 }
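
A minimal sketch of how a suite picks up the shared context under this trait
(the suite name and test body below are illustrative, not part of this commit):

    import org.scalatest.FunSuite
    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.sparkbindings.test.DistributedSparkSuite

    // Hypothetical example suite: mahoutCtx is created once in
    // beforeAll(configMap) and reused implicitly by every test.
    class ExampleSuite extends FunSuite with DistributedSparkSuite {
      test("round-trips a small matrix through the shared context") {
        val inCoreA = dense((1, 2), (3, 4))
        val drmA = drmParallelize(m = inCoreA, numPartitions = 2)
        assert((drmA.collect - inCoreA).norm < 1E-10)
      }
    }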

http://git-wip-us.apache.org/repos/asf/mahout/blob/ee6359f6/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
index d5d16a8..e48e7c7 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/LoggerConfiguration.scala
@@ -17,14 +17,14 @@
 
 package org.apache.mahout.sparkbindings.test
 
-import org.scalatest.Suite
+import org.scalatest.{ConfigMap, Suite}
 import org.apache.log4j.{Level, Logger, BasicConfigurator}
 
 trait LoggerConfiguration extends org.apache.mahout.test.LoggerConfiguration {
   this: Suite =>
 
-  override protected def beforeAll(): Unit = {
-    super.beforeAll()
-    Logger.getLogger("org.apache.mahout.sparkbindings").setLevel(Level.DEBUG)
+  override protected def beforeAll(configMap: ConfigMap) {
+    super.beforeAll(configMap)
+    Logger.getLogger("org.apache.mahout.sparkbindings").setLevel(Level.INFO)
   }
 }