Posted to commits@mahout.apache.org by pa...@apache.org on 2014/07/04 00:59:11 UTC

git commit: MAHOUT-1561, MAHOUT-1568, MAHOUT-1569 text-delimited Spark readers and writers with drivers and a CLI for 'spark-itemsimilarity' closes apache/mahout#22

Repository: mahout
Updated Branches:
  refs/heads/master 37b852539 -> 2b65475c3


MAHOUT-1561, MAHOUT-1568, MAHOUT-1569 text-delimited Spark readers and writers with drivers and a CLI for 'spark-itemsimilarity' closes apache/mahout#22


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/2b65475c
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/2b65475c
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/2b65475c

Branch: refs/heads/master
Commit: 2b65475c3ab682ebd47cffdc6b502698799cd2c8
Parents: 37b8525
Author: pferrel <pa...@occamsmachete.com>
Authored: Thu Jul 3 15:56:37 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Thu Jul 3 15:56:37 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 bin/mahout                                      |  20 +-
 spark/pom.xml                                   |  36 ++
 spark/src/main/assembly/job.xml                 |  46 +++
 .../apache/mahout/cf/CooccurrenceAnalysis.scala |  11 +-
 .../apache/mahout/drivers/FileSysUtils.scala    |  71 ++++
 .../apache/mahout/drivers/IndexedDataset.scala  |  56 +++
 .../mahout/drivers/ItemSimilarityDriver.scala   | 314 +++++++++++++++
 .../apache/mahout/drivers/MahoutDriver.scala    |  83 ++++
 .../mahout/drivers/MahoutOptionParser.scala     |  24 ++
 .../apache/mahout/drivers/ReaderWriter.scala    |  41 ++
 .../org/apache/mahout/drivers/Schema.scala      |  30 ++
 .../drivers/TextDelimitedReaderWriter.scala     | 222 +++++++++++
 .../io/MahoutKryoRegistrator.scala              |   4 +-
 .../mahout/cf/CooccurrenceAnalysisSuite.scala   | 200 ++++++----
 .../drivers/ItemSimilarityDriverSuite.scala     | 382 +++++++++++++++++++
 .../sparkbindings/test/MahoutLocalContext.scala |   4 +-
 17 files changed, 1454 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index c9b6a0d..23ad49e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1561, MAHOUT-1568, MAHOUT-1569: Created text-delimited file I/O traits and classes on Spark, a MahoutDriver for a CLI, and an ItemSimilarityDriver using the CLI
+
   MAHOUT-1573: More explicit parallelism adjustments in math-scala DRM apis; elements of automatic parallelism management (dlyubimov)
 
   MAHOUT-1580: Optimize getNumNonZeroElements() (ssc)

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/bin/mahout
----------------------------------------------------------------------
diff --git a/bin/mahout b/bin/mahout
index b0a8f6e..5f54181 100755
--- a/bin/mahout
+++ b/bin/mahout
@@ -7,7 +7,7 @@
 #   MAHOUT_JAVA_HOME   The java implementation to use.  Overrides JAVA_HOME.
 #
 #   MAHOUT_HEAPSIZE    The maximum amount of heap to use, in MB.
-#                      Default is 1000.
+#                      Default is 4000.
 #
 #   HADOOP_CONF_DIR  The location of a hadoop config directory
 #
@@ -84,6 +84,9 @@ if [ "$1" == "spark-shell" ]; then
   SPARK=1
 fi
 
+if [ "$1" == "spark-itemsimilarity" ]; then
+  SPARK=1
+fi
 
 if [ "$MAHOUT_CORE" != "" ]; then
   IS_CORE=1
@@ -105,7 +108,7 @@ if [ "$JAVA_HOME" = "" ]; then
 fi
 
 JAVA=$JAVA_HOME/bin/java
-JAVA_HEAP_MAX=-Xmx3g
+JAVA_HEAP_MAX=-Xmx4g
 
 # check envvars which might override default args
 if [ "$MAHOUT_HEAPSIZE" != "" ]; then
@@ -156,14 +159,14 @@ then
       CLASSPATH=${CLASSPATH}:$f;
     done
   fi
-  
+
   # add scala dev target
-  for f in $MAHOUT_HOME/math-scala/target/mahout-math-scala-*.jar ; do 
+  for f in $MAHOUT_HOME/math-scala/target/mahout-math-scala-*.jar ; do
     CLASSPATH=${CLASSPATH}:$f;
   done
 
 
-  # add spark-shell -- if we requested shell
+  # add spark-shell -- if we requested shell or other spark CLI driver
   if [ "$SPARK" == "1" ]; then
 
     for f in $MAHOUT_HOME/mrlegacy/target/mahout-mrlegacy-*.jar ; do
@@ -183,7 +186,7 @@ then
        SPARK_CLASSPATH=$("${SPARK_CP_BIN}" 2>/dev/null)
        CLASSPATH="${CLASSPATH}:${SPARK_CLASSPATH}"
     else
-      echo "Cannot find Spark classpath."
+      echo "Cannot find Spark classpath. Is 'SPARK_HOME' set?"
       exit -1
     fi
 
@@ -228,6 +231,11 @@ case "$1" in
     "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.sparkbindings.shell.Main" $@
     stty sane; stty $save_stty
     ;;
+  # Spark CLI drivers go here
+  (spark-itemsimilarity)
+    shift
+    "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.drivers.ItemSimilarityDriver" "$@"
+    ;;
   (*)
 
     # default log directory & file

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 5dc566f..03ea2a0 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -83,6 +83,27 @@
         </executions>
       </plugin>
 
+      <!-- create core job dependencies jar -->
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+          <executions>
+            <execution>
+              <id>job</id>
+              <phase>package</phase>
+              <goals>
+                <goal>single</goal>
+              </goals>
+              <configuration>
+                <descriptors>
+                  <descriptor>src/main/assembly/job.xml</descriptor>
+                </descriptors>
+              </configuration>
+            </execution>
+          </executions>
+      </plugin>
+
       <!-- create test jar so other modules can reuse the math test utility classes. -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -145,6 +166,7 @@
           <reportsDirectory>${project.build.directory}/scalatest-reports</reportsDirectory>
           <junitxml>.</junitxml>
           <filereports>WDF TestSuite.txt</filereports>
+          <argLine>-Xmx1024m</argLine>
         </configuration>
         <executions>
           <execution>
@@ -296,6 +318,20 @@
 
     <!--  3rd-party -->
 
+    <dependency>
+      <groupId>com.github.scopt</groupId>
+      <artifactId>scopt_2.10</artifactId>
+      <version>3.2.0</version>
+    </dependency>
+
+    <!-- spark stuff -->
+    
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.major}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
     <!-- scala stuff -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml
new file mode 100644
index 0000000..0c41f3d
--- /dev/null
+++ b/spark/src/main/assembly/job.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly
+  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
+    http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <id>job</id>
+  <formats>
+   <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <unpack>true</unpack>
+      <unpackOptions>
+        <!-- MAHOUT-1126 -->
+        <excludes>
+          <exclude>META-INF/LICENSE</exclude>
+        </excludes>
+      </unpackOptions>
+      <scope>runtime</scope>
+      <outputDirectory>/</outputDirectory>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <excludes>
+        <exclude>org.apache.hadoop:hadoop-core</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+</assembly>
+  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
index ee44f90..b01332c 100644
--- a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
+++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
@@ -22,7 +22,6 @@ import scalabindings._
 import RLikeOps._
 import drm._
 import RLikeDrmOps._
-import org.apache.mahout.sparkbindings._
 import scala.collection.JavaConversions._
 import org.apache.mahout.math.stats.LogLikelihood
 import collection._
@@ -96,7 +95,7 @@ object CooccurrenceAnalysis extends Serializable {
    * Compute loglikelihood ratio
    * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
    **/
-  def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+  def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
                          numInteractionsWithAandB: Long, numInteractions: Long) = {
 
     val k11 = numInteractionsWithAandB
@@ -105,6 +104,7 @@ object CooccurrenceAnalysis extends Serializable {
     val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
 
     LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+
   }
 
   def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
@@ -131,9 +131,12 @@ object CooccurrenceAnalysis extends Serializable {
             // exclude co-occurrences of the item with itself
             if (crossCooccurrence || thingB != thingA) {
               // Compute loglikelihood ratio
-              val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+              val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
                 cooccurrences.toLong, numUsers)
-              val candidate = thingA -> llrRatio
+
+              // matches hadoop code and maps values to range (0..1)
+              val tLLR = 1.0 - (1.0 / (1.0 + llr))
+              val candidate = thingA -> tLLR
 
               // Enqueue item with score, if belonging to the top-k
               if (topItemsPerThing.size < maxInterestingItemsPerThing) {
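
For reference (not part of the commit; the helper name squashLLR is hypothetical), a minimal Scala sketch of the mapping applied above, which squashes a raw LLR score into (0..1) so the Spark output lines up with the Hadoop item-similarity scores:

    // Raw LLR is in [0, +inf); 1.0 - 1.0 / (1.0 + llr) maps it into (0..1).
    def squashLLR(llr: Double): Double = 1.0 - (1.0 / (1.0 + llr))

    // squashLLR(0.0) == 0.0, and squashLLR(1.7260924347106847) gives 0.6331745808516107,
    // which is why the control values in CooccurrenceAnalysisSuite change below.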

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
new file mode 100644
index 0000000..654f116
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
+
+/**
+  * Returns a [[java.lang.String]] comma delimited list of URIs discovered based on parameters in the constructor.
+  * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
+  *
+  * @param pathURI Where to start looking for inFiles, only HDFS is currently
+  *                supported. The pathURI may be a list of comma delimited URIs like those supported
+  *                by Spark
+  * @param filePattern regex that must match the entire filename to have the file returned
+  * @param recursive true traverses the filesystem recursively
+  */
+
+case class FileSysUtils(pathURI: String, filePattern: String = ".*", recursive: Boolean = false) {
+
+  val conf = new Configuration()
+  val fs = FileSystem.get(conf)
+
+  /** returns a string of comma delimited URIs matching the filePattern */
+  def uris :String = {
+    if(recursive){
+      val pathURIs = pathURI.split(",")
+      var files = ""
+      for ( uri <- pathURIs ){
+        files = findFiles(uri, filePattern, files)
+      }
+      if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
+      files
+    }else{
+      pathURI
+    }
+  }
+
+  /** find matching files in the dir, recursively call self when another directory is found */
+  def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = {
+    val fileStatuses: Array[FileStatus] = fs.listStatus (new Path(dir))
+    var f :String = files
+    for (fileStatus <- fileStatuses ){
+      if (fileStatus.getPath().getName().matches(filePattern)
+        && !fileStatus.isDir){// found a file
+        if (fileStatus.getLen() != 0) {
+          // file is not empty
+          f = f + fileStatus.getPath.toUri.toString + ","
+        }
+      }else if (fileStatus.isDir){
+        f = findFiles(fileStatus.getPath.toString, filePattern, f)
+      }
+    }
+    f
+  }
+}
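
A hedged usage sketch for FileSysUtils; the HDFS path below is hypothetical and only the constructor and uris shown in the diff are used:

    import org.apache.mahout.drivers.FileSysUtils

    // Collect all non-empty files matching ^part-.* under the directory tree and get back
    // a single comma delimited String suitable for SparkContext.textFile.
    val inFiles = FileSysUtils("hdfs://namenode/user/me/data", "^part-.*", recursive = true).uris
    // inFiles looks like "hdfs://.../part-00000,hdfs://.../part-00001"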

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
new file mode 100644
index 0000000..0d8c160
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import com.google.common.collect.BiMap
+import org.apache.mahout.math.drm.CheckpointedDrm
+
+/**
+  * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries.
+  * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so
+  * a user specified label or ID can be stored and mapped to and from the [[scala.Int]] ordinal ID
+  * used internally by Mahout core code.
+  *
+  * Example: for a transpose job the [[org.apache.mahout.drivers.IndexedDataset#matrix]] ([[org.apache.mahout.sparkbindings.drm.DrmLike]]) is passed into the DSL code
+  * that transposes the values; a new [[org.apache.mahout.drivers.IndexedDataset]] is then created from the transposed DrmLike object with the dictionaries
+  * swapped, since rows and columns have changed places, and that new IndexedDataset is returned.
+  *
+  * @param matrix  DrmLike[Int], representing the distributed matrix storing the actual data.
+  * @param rowIDs BiMap[String, Int] storing a bidirectional mapping of external String ID to
+  *                  and from the ordinal Mahout Int ID. This one holds row labels
+  * @param columnIDs BiMap[String, Int] storing a bidirectional mapping of external String
+  *                  ID to and from the ordinal Mahout Int ID. This one holds column labels
+  * @todo Often there is no need for both, or perhaps either, dictionary, so save resources by allowing
+  *       them to be skipped when not needed.
+  */
+
+case class IndexedDataset(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
+}
+
+/**
+  * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary constructor for
+  * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like [[org.apache.mahout.drivers.IndexedDatasetStore]]
+  * {{{
+  *   val indexedDataset = IndexedDataset(indexedDatasetReader.readFrom(source))
+  * }}}
+  */
+
+object IndexedDataset {
+  /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
+  def apply(id2: IndexedDataset) = new IndexedDataset(id2.matrix,  id2.rowIDs, id2.columnIDs)
+}
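
A minimal sketch of the transpose pattern the scaladoc above describes; the transposed helper is illustrative only (not part of the commit) and assumes the math-scala DSL transpose (t) and checkpoint are in scope:

    import org.apache.mahout.math.drm.RLikeDrmOps._

    // Transpose the wrapped DRM, then swap the dictionaries because rows and columns
    // have changed places in the result.
    def transposed(id: IndexedDataset): IndexedDataset =
      IndexedDataset(id.matrix.t.checkpoint(), rowIDs = id.columnIDs, columnIDs = id.rowIDs)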

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
new file mode 100644
index 0000000..77005f1
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -0,0 +1,314 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.cf.CooccurrenceAnalysis
+
+/**
+ * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]].
+ * Reads text lines
+ * that contain (row id, column id, ...). The IDs are user specified strings which will be
+ * preserved in the
+ * output. The individual tuples will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]
+ * will be used to calculate row-wise self-similarity, or when using filters, will generate two
+ * matrices and calculate both the self similarity of the primary matrix and the row-wise
+ * similarity of the primary
+ * to the secondary. Returns one or two directories of text files formatted as specified in
+ * the options.
+ * The options allow flexible control of the input schema, file discovery, output schema, and control of
+ * algorithm parameters.
+ * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple
+ * tuples of text delimited values (userID,itemID) with or without strengths, and with a separator of tab, comma, or space,
+ * you can specify only the input and output file and directory--all else will default to the correct values.
+ * @note To use with a Spark cluster see the --master option; if you run out of heap space check
+ *       the --sparkExecutorMem option.
+ */
+object ItemSimilarityDriver extends MahoutDriver {
+  //todo: Should also take two input streams and do cross-similarity with no filter required;
+  // this is needed for the examples.
+
+  private var options: Options = _
+  private var reader1: TextDelimitedIndexedDatasetReader = _
+  private var reader2: TextDelimitedIndexedDatasetReader = _
+  private var writer: TextDelimitedIndexedDatasetWriter = _
+  private var writeSchema: Schema = _
+
+  /**
+   * @param args  Command line args, if empty a help message is printed.
+   */
+  override def main(args: Array[String]): Unit = {
+    val parser = new MahoutOptionParser[Options]("spark-itemsimilarity") {
+      head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
+
+      //Input output options, non-driver specific
+      note("Input, output options")
+      opt[String]('i', "input") required() action { (x, options) =>
+        options.copy(input = x)
+      } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+
+      opt[String]('o', "output") required() action { (x, options) =>
+        if (x.endsWith("/")) // todo: check to see if HDFS allows MS-Windows backslashes locally?
+          options.copy(output = x)
+        else
+          options.copy(output = x + "/")
+      } text ("Path for output, any local or HDFS supported URI (required).")
+
+      //Algorithm control options--driver specific
+      note("\nAlgorithm control options:")
+      opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+        options.copy(master = x)
+      }
+
+      opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) =>
+        options.copy(maxPrefs = x)
+      } text ("Max number of preferences to consider per user (optional). Default: 500") validate { x =>
+        if (x > 0) success else failure("Option --maxPrefs must be > 0")
+      }
+
+/** not implemented in CooccurrenceAnalysis.cooccurrence
+      opt[Int]("minPrefs") abbr ("mp") action { (x, options) =>
+        options.copy(minPrefs = x)
+      } text ("Ignore users with less preferences than this (optional). Default: 1") validate { x =>
+        if (x > 0) success else failure("Option --minPrefs must be > 0")
+      }
+*/
+
+      opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
+        options.copy(maxSimilaritiesPerItem = x)
+      } text ("Limit the number of similarities per item to this number (optional). Default: 100") validate { x =>
+        if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
+      }
+
+      opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
+        options.copy(randomSeed = x)
+      } text ("Int to seed random number generator (optional). Default: Uses time to generate a seed") validate { x =>
+        if (x > 0) success else failure("Option --randomSeed must be > 0")
+      }
+
+      //Input text file schema--not driver specific but input data specific, tuples input,
+      // not drms
+      note("\nInput text file schema options:")
+      opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
+        options.copy(inDelim = x)
+      }
+
+      opt[String]("filter1") abbr ("f1") action { (x, options) =>
+        options.copy(filter1 = x)
+      } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+
+      opt[String]("filter2") abbr ("f2") action { (x, options) =>
+        options.copy(filter2 = x)
+      } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected.")
+
+      opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) =>
+        options.copy(rowIDPosition = x)
+      } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
+        if (x >= 0) success else failure("Option --rowIDPosition must be >= 0")
+      }
+
+      opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) =>
+        options.copy(itemIDPosition = x)
+      } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
+        if (x >= 0) success else failure("Option --itemIDPosition must be >= 0")
+      }
+
+      opt[Int]("filterPosition") abbr ("fc") action { (x, options) =>
+        options.copy(filterPosition = x)
+      } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+        if (x >= -1) success else failure("Option --filterPosition must be >= -1")
+      }
+
+      note("\nUsing all defaults the input is expected to be of the form: \"userID<tab>itemID\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+      //File finding strategy--not driver specific
+      note("\nFile input options:")
+      opt[Unit]('r', "recursive") action { (_, options) =>
+        options.copy(recursive = true)
+      } text ("Search the -i path recursively for files that match --filenamePattern (optional). Default: false")
+
+      opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
+        options.copy(filenamePattern = x)
+      } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+
+      //Drm output schema--not driver specific, drm specific
+      note("\nOutput text file schema options:")
+      opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
+        options.copy(rowKeyDelim = x)
+      } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+      opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
+        options.copy(columnIdStrengthDelim = x)
+      } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+      opt[String]("tupleDelim") abbr ("td") action { (x, options) =>
+        options.copy(tupleDelim = x)
+      } text ("Separates vector tuple values in the values list (optional). Default: \",\"")
+
+      //Spark config options--not driver specific
+      note("\nSpark config options:")
+      opt[String]("sparkExecutorMem") abbr ("sem") action { (x, options) =>
+        options.copy(sparkExecutorMem = x)
+      } text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 2g")
+
+      note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2,itemID10:value10...\"")
+
+      //Driver notes--driver specific
+      note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.\n")
+
+      help("help") abbr ("h") text ("prints this usage text\n")
+
+      checkConfig { c =>
+        if (c.filterPosition == c.itemIDPosition
+            || c.filterPosition == c.rowIDPosition
+            || c.rowIDPosition == c.itemIDPosition)
+          failure("The row, item, and filter positions must be unique.") else success
+      }
+
+      //check for option consistency, probably driver specific
+      checkConfig { c =>
+        if (c.filter1 != null && c.filter2 != null && c.filter1 == c.filter2) failure("If" +
+          " using filters they must be unique.") else success
+      }
+
+    }
+
+    //repeated code, should this be put in the base MahoutDriver somehow?
+    parser.parse(args, Options()) map { opts =>
+      options = opts
+      process
+    }
+
+  }
+
+  override def start(masterUrl: String = options.master,
+      appName: String = options.appName):
+    Unit = {
+
+    sparkConf.set("spark.kryo.referenceTracking", "false")
+      .set("spark.kryoserializer.buffer.mb", "200")
+      .set("spark.executor.memory", options.sparkExecutorMem)
+
+    super.start(masterUrl, appName)
+
+    val readSchema1 = new Schema("delim" -> options.inDelim, "filter" -> options.filter1,
+        "rowIDPosition" -> options.rowIDPosition,
+        "columnIDPosition" -> options.itemIDPosition,
+        "filterPosition" -> options.filterPosition)
+
+    reader1 = new TextDelimitedIndexedDatasetReader(readSchema1)
+
+    if (options.filterPosition != -1 && options.filter2 != null) {
+      val readSchema2 = new Schema("delim" -> options.inDelim, "filter" -> options.filter2,
+          "rowIDPosition" -> options.rowIDPosition,
+          "columnIDPosition" -> options.itemIDPosition,
+          "filterPosition" -> options.filterPosition)
+
+      reader2 = new TextDelimitedIndexedDatasetReader(readSchema2)
+    }
+
+    writeSchema = new Schema(
+        "rowKeyDelim" -> options.rowKeyDelim,
+        "columnIdStrengthDelim" -> options.columnIdStrengthDelim,
+        "tupleDelim" -> options.tupleDelim)
+
+    writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
+
+  }
+
+  private def readIndexedDatasets: Array[IndexedDataset] = {
+
+    val inFiles = FileSysUtils(options.input, options.filenamePattern, options.recursive).uris
+
+    if (inFiles.isEmpty) {
+      Array()
+    } else {
+
+      val selfSimilarityDataset = IndexedDataset(reader1.readFrom(inFiles))
+
+      if (options.filterPosition != -1 && options.filter2 != null) {
+        // todo: needs to support more than one cross-similarity indicator
+        val crossSimilarityDataset1 = IndexedDataset(reader2.readFrom(inFiles))
+        Array(selfSimilarityDataset, crossSimilarityDataset1)
+      } else {
+        Array(selfSimilarityDataset)
+      }
+
+    }
+
+  }
+
+  override def process: Unit = {
+    start()
+
+    val indexedDatasets = readIndexedDatasets
+
+    // todo: allow more than one cross-similarity matrix?
+    val indicatorMatrices = {
+      if (indexedDatasets.length > 1) {
+        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs, Array(indexedDatasets(1).matrix))
+      } else {
+        CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs)
+      }
+    }
+
+    // self similarity
+    // the next two lines write the drm using a Writer class
+    // val selfIndicatorDataset = new IndexedDataset(indicatorMatrices(0), indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs)
+    // writeStore.writeTo(selfIndicatorDataset, options.output + "indicator-matrix")
+
+    // an alternative is to create a version of IndexedDataset that knows how to write itself
+    val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
+      indexedDatasets(0).columnIDs, writeSchema)
+    selfIndicatorDataset.writeTo(options.output + "indicator-matrix")
+
+    // todo: needs to support more than one cross-similarity indicator
+    if (indexedDatasets.length > 1) {
+
+      val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
+      writer.writeTo(crossIndicatorDataset, options.output + "cross-indicator-matrix")
+
+    }
+
+    stop
+  }
+
+  // Default values go here, any "_" or null should be "required" in the Parser or flags an unused option
+  // todo: support two input streams for cross-similarity, maybe assume one schema for all inputs
+  case class Options(
+      master: String = "local",
+      sparkExecutorMem: String = "2g",
+      appName: String = "ItemSimilarityJob",
+      randomSeed: Int = System.currentTimeMillis().toInt,
+      recursive: Boolean = false,
+      input: String = null,
+      output: String = null,
+      filenamePattern: String = "^part-.*",
+      maxSimilaritiesPerItem: Int = 100,
+      maxPrefs: Int = 500,
+      minPrefs: Int = 1,
+      rowIDPosition: Int = 0,
+      itemIDPosition: Int = 1,
+      filterPosition: Int = -1,
+      filter1: String = null,
+      filter2: String = null,
+      inDelim: String = "[,\t ]",
+      rowKeyDelim: String = "\t",
+      columnIdStrengthDelim: String = ":",
+      tupleDelim: String = ",")
+
+}
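
For illustration only (the HDFS paths, filter strings, and master URL below are hypothetical), a typical invocation of the new driver through the bin/mahout dispatch added above might look like:

    mahout spark-itemsimilarity --input hdfs://nn/user/me/actions \
        --output hdfs://nn/user/me/indicators/ \
        --master local[4] --filter1 purchase --filter2 view --filterPosition 2 --recursive

With both filters present this writes an "indicator-matrix" directory for the primary (purchase) actions and a "cross-indicator-matrix" directory relating the secondary (view) actions to purchases, using the default output delimiters.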

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
new file mode 100644
index 0000000..afc7c1e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.spark.SparkConf
+import org.apache.mahout.sparkbindings._
+
+/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
+  * Also define a command line parser and default options or fill in the following template:
+  * {{{
+  *   object SomeDriver extends MahoutDriver {
+  *     override def main(args: Array[String]): Unit = {
+  *       val parser = new MahoutOptionParser[Options]("Job Name") {
+  *         head("Job Name", "Spark")
+  *         note("Various CLI options")
+  *         //see https://github.com/scopt/scopt for a good Scala option parser, which MahoutOptionParser extends
+  *       }
+  *       parser.parse(args, Options()) map { opts =>
+  *         options = opts
+  *         process
+  *       }
+  *     }
+  *
+  *     override def process: Unit = {
+  *       start()
+  *       //don't just stand there do something
+  *       stop
+  *     }
+  *
+  *     //Default values go here, any '_' or null should be 'required' in the Parser or flags an unused option
+  *     case class Options(
+  *       appName: String = "Job Name", ...
+  *     )
+  *   }
+  * }}}
+  */
+abstract class MahoutDriver {
+  implicit var mc: DistributedContext = _
+  implicit val sparkConf = new SparkConf()
+
+  /** Creates a Spark context to run the job inside. Override to set SparkConf values specific to the job;
+    * these must be set before the context is created.
+    * @param masterUrl Spark master URL
+    * @param appName  Name to display in Spark UI
+    * @param customJars List of paths to custom jars
+    * */
+  protected def start(masterUrl: String, appName: String, customJars:Traversable[String]) : Unit = {
+    mc = mahoutSparkContext(masterUrl, appName, customJars, sparkConf)
+  }
+
+  protected def start(masterUrl: String, appName: String) : Unit = {
+    val customJars = Traversable.empty[String]
+    mc = mahoutSparkContext(masterUrl, appName, customJars, sparkConf)
+  }
+
+  /** Override (optionally) for special cleanup */
+  protected def stop: Unit = {
+    mc.close
+  }
+
+  /** This is where you do the work; call start first, then call stop before exiting. */
+  protected def process: Unit
+
+  /** Parse command line and call process */
+  def main(args: Array[String]): Unit
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
new file mode 100644
index 0000000..8a337f5
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import scopt.OptionParser
+
+/** Modifies default [[scopt.OptionParser]] to output long help-like usage + error message */
+class MahoutOptionParser[C](programName: String) extends OptionParser[C](programName: String) {
+  override def showUsageOnError = true
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
new file mode 100644
index 0000000..c5b7385
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Reader trait is abstract in the sense that the reader function must be defined by an extending trait, which also defines the type to be read.
+  * @tparam T type of object read, usually supplied by an extending trait.
+  * @todo The reader need not create both dictionaries but does at present. Since one or the other dictionary is sometimes never used, the ability to specify which dictionaries are created could save the memory of a very large dictionary.
+  */
+trait Reader[T]{
+  val mc: DistributedContext
+  val readSchema: Schema
+  protected def reader(mc: DistributedContext, readSchema: Schema, source: String): T
+  def readFrom(source: String): T = reader(mc, readSchema, source)
+}
+
+/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
+  * @tparam T type of object written, usually supplied by an extending trait.
+  */
+trait Writer[T]{
+  val mc: DistributedContext
+  val writeSchema: Schema
+  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T): Unit
+  def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection)
+}
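
As a minimal sketch of the pattern these traits describe (the StringLineReader name is hypothetical and not part of the commit), an extending trait fixes the type parameter and supplies the protected reader; a concrete class then only needs to provide mc and the Schema:

    import org.apache.mahout.math.drm.DistributedContext
    import org.apache.mahout.sparkbindings._ // mc.textFile via the Spark context, as in TDIndexedDatasetReader below

    // Hypothetical reader that materializes raw lines; readFrom(source) is inherited from Reader.
    trait StringLineReader extends Reader[Seq[String]] {
      protected def reader(mc: DistributedContext, readSchema: Schema, source: String): Seq[String] =
        mc.textFile(source).collect().toSeq
    }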

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
new file mode 100644
index 0000000..46e1540
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import scala.collection.mutable
+import scala.collection.mutable.HashMap
+
+/** Syntactic sugar for HashMap[String, Any]
+  *
+  * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+  */
+class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
+  // note: this requires a mutable HashMap, do we care?
+  this ++= params
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
new file mode 100644
index 0000000..119f8d3
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import scala.collection.JavaConversions._
+import org.apache.spark.SparkContext._
+import org.apache.mahout.math.RandomAccessSparseVector
+import com.google.common.collect.{BiMap, HashBiMap}
+import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
+import org.apache.mahout.sparkbindings._
+
+
+/** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
+  */
+trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
+  /** Read in text delimited tuples from all URIs in this comma delimited source String.
+    *
+    * @param mc context for the Spark job
+    * @param readSchema describes the delimiters and positions of values in the text delimited file.
+    * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
+    * @return
+    */
+  protected def reader(mc: DistributedContext, readSchema: Schema, source: String): IndexedDataset = {
+    try {
+      val delimiter = readSchema("delim").asInstanceOf[String]
+      val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int]
+      val columnIDPosition = readSchema("columnIDPosition").asInstanceOf[Int]
+      val filterPosition = readSchema("filterPosition").asInstanceOf[Int]
+      val filterBy = readSchema("filter").asInstanceOf[String]
+      // instance vars must be put into locally scoped vals when used in closures that are executed by Spark
+
+      assert(!source.isEmpty, {
+        println(this.getClass.toString + ": has no files to read")
+        throw new IllegalArgumentException
+      })
+
+      var columns = mc.textFile(source).map { line => line.split(delimiter) }
+
+      // -1 means no filter in the input text, take them all
+      if(filterPosition != -1) {
+        // get the rows that have a column matching the filter
+        columns = columns.filter { tokens => tokens(filterPosition) == filterBy }
+      }
+
+      // get row and column IDs
+      val interactions = columns.map { tokens =>
+        tokens(rowIDPosition) -> tokens(columnIDPosition)
+      }
+
+      interactions.cache()
+
+      // create separate collections of rowID and columnID tokens
+      val rowIDs = interactions.map { case (rowID, _) => rowID }.distinct().collect()
+      val columnIDs = interactions.map { case (_, columnID) => columnID }.distinct().collect()
+
+      val numRows = rowIDs.size
+      val numColumns = columnIDs.size
+
+      // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
+      // broadcast them for access in distributed processes, so they are not recalculated in every task.
+      val rowIDDictionary = asOrderedDictionary(rowIDs)
+      val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
+
+      val columnIDDictionary = asOrderedDictionary(columnIDs)
+      val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
+
+      val indexedInteractions =
+        interactions.map { case (rowID, columnID) =>
+          val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
+          val columnIndex = columnIDDictionary_bcast.value.get(columnID).get
+
+          rowIndex -> columnIndex
+        }
+        // group by IDs to form row vectors
+        .groupByKey().map { case (rowIndex, columnIndexes) =>
+          val row = new RandomAccessSparseVector(numColumns)
+          for (columnIndex <- columnIndexes) {
+            row.setQuick(columnIndex, 1.0)
+          }
+          rowIndex -> row
+        }
+        .asInstanceOf[DrmRdd[Int]]
+
+      // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
+      val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
+
+      IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+
+    } catch {
+      case cce: ClassCastException => {
+        println(this.getClass.toString + ": Schema has illegal values"); throw cce
+      }
+    }
+  }
+
+  // this creates a BiMap from an ID collection. The ID points to an ordinal int
+  // which is used internal to Mahout as the row or column ID
+  // todo: this is a non-distributed process and the BiMap is a non-rdd based object--might be a scaling problem
+  private def asOrderedDictionary(entries: Array[String]): BiMap[String, Int] = {
+    var dictionary: BiMap[String, Int] = HashBiMap.create()
+    var index = 0
+    for (entry <- entries) {
+      dictionary.forcePut(entry, index)
+      index += 1
+    }
+    dictionary
+  }
+}
+
+trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+  /** Write an [[org.apache.mahout.drivers.IndexedDataset]] to text delimited files in the destination directory.
+    *
+    * @param mc context for the Spark job
+    * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
+    * @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]]
+    */
+  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, indexedDataset: IndexedDataset): Unit = {
+    try {
+      val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
+      val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String]
+      val tupleDelim = writeSchema("tupleDelim").asInstanceOf[String]
+      // instance vars must be put into locally scoped vals when used in closures that are
+      // executed by Spark
+      assert (indexedDataset != null, {println(this.getClass.toString+": has no indexedDataset to write"); throw new IllegalArgumentException })
+      assert (!dest.isEmpty, {println(this.getClass.toString+": has no destination or indexedDataset to write"); throw new IllegalArgumentException})
+      val matrix = indexedDataset.matrix
+      val rowIDDictionary = indexedDataset.rowIDs
+      val columnIDDictionary = indexedDataset.columnIDs
+
+      matrix.rdd.map { case (rowID, itemVector) =>
+
+        // each line is created of non-zero values with schema specified delimiters and original row and column ID tokens
+        // first get the external rowID token
+        var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
+
+        // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
+        for (item <- itemVector.nonZeroes()) {
+          line += columnIDDictionary.inverse.get(item.index) + columnIdStrengthDelim + item.get + tupleDelim
+        }
+        // drop the last delimiter, not needed to end the line
+        line.dropRight(1)
+      }
+      .saveAsTextFile(dest)
+
+    }catch{
+      case cce: ClassCastException => {println(this.getClass.toString+": Schema has illegal values"); throw cce}
+    }
+  }
+}
+
+/** A combined trait that reads and writes */
+trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexedDatasetWriter
+
+/** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
+  * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
+  * @param mc Spark context for reading files
+  * @note The source is supplied by Reader#readFrom.
+  * */
+class TextDelimitedIndexedDatasetReader(val readSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReader
+
+/** Writes an IndexedDataset to text delimited files. Classes are needed to supply trait params in their constructor.
+  * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
+  * @param mc Spark context for reading files
+  * @note The destination is supplied by the Writer#writeTo trait method.
+  * */
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
+
+/** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor.
+  * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
+  * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
+  * @param mc Spark context for reading the files, may be implicitly defined.
+  * */
+class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReaderWriter
+
+/** A version of IndexedDataset that has its own writeTo method from a Writer trait. This is an alternative to creating
+  * a Writer based stand-alone class for writing. Consider it experimental: it allows semantics similar to drm.writeDrm(),
+  * but it is not clear that this is simpler or more intuitive, and since IndexedDatasetTextDelimitedWriteables
+  * are probably short lived in terms of lines of code the added complexity may be moot.
+  * @param matrix the data
+  * @param rowIDs bi-directional dictionary for rows of external IDs to internal ordinal Mahout IDs.
+  * @param columnIDs bi-directional dictionary for columns of external IDs to internal ordinal Mahout IDs.
+  * @param writeSchema contains params for the schema/format of the written text delimited file.
+  * @param mc mahout distributed context (DistributedContext) may be implicitly defined.
+  * */
+class IndexedDatasetTextDelimitedWriteable(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int],
+                                           val writeSchema: Schema)(implicit val mc: DistributedContext)
+  extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter {
+
+  def writeTo(dest: String): Unit = {
+    writeTo(this, dest)
+  }
+}
+
+/**
+ * Companion object for [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]], primarily used to get a secondary constructor for
+ * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
+ * {{{
+ *   val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readFrom(source))
+ * }}}
+ */
+
+object IndexedDatasetTextDelimitedWriteable {
+  /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] */
+  def apply(id2: IndexedDatasetTextDelimitedWriteable) = new IndexedDatasetTextDelimitedWriteable(id2.matrix,  id2.rowIDs, id2.columnIDs, id2.writeSchema)(id2.mc)
+}
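
A hedged end-to-end sketch of the reader/writer classes above; the paths are hypothetical and the implicit DistributedContext is assumed to already exist (for example, created by MahoutDriver.start as in ItemSimilarityDriver):

    import org.apache.mahout.drivers._
    import org.apache.mahout.math.drm.DistributedContext

    implicit val mc: DistributedContext = ??? // supplied by the driver in real use

    // Schema keys mirror the ones ItemSimilarityDriver builds from its CLI options.
    val readSchema = new Schema("delim" -> "[,\t ]", "filter" -> "",
      "rowIDPosition" -> 0, "columnIDPosition" -> 1, "filterPosition" -> -1)
    val writeSchema = new Schema("rowKeyDelim" -> "\t",
      "columnIdStrengthDelim" -> ":", "tupleDelim" -> ",")

    // Read comma/tab delimited (user,item) tuples, then write the dataset back out.
    val dataset = new TextDelimitedIndexedDatasetReader(readSchema).readFrom("hdfs://nn/input/part-00000")
    new TextDelimitedIndexedDatasetWriter(writeSchema).writeTo(dataset, "hdfs://nn/output/")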

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
index b0042c9..79c7585 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
@@ -18,6 +18,8 @@
 package org.apache.mahout.sparkbindings.io
 
 import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.serializers.JavaSerializer
+import com.google.common.collect.HashBiMap
 import org.apache.mahout.math._
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.mahout.sparkbindings._
@@ -33,7 +35,7 @@ class MahoutKryoRegistrator extends KryoRegistrator {
     kryo.addDefaultSerializer(classOf[Vector], new WritableKryoSerializer[Vector, VectorWritable])
     kryo.addDefaultSerializer(classOf[DenseVector], new WritableKryoSerializer[Vector, VectorWritable])
     kryo.addDefaultSerializer(classOf[Matrix], new WritableKryoSerializer[Matrix, MatrixWritable])
-
+    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer());
   }
 }
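
[Editorial note, not part of the patch] The registrator only takes effect when Spark is configured to use Kryo. A minimal configuration sketch follows; the two property names are standard Spark settings, not anything introduced by this commit:

    import org.apache.spark.SparkConf

    // Tell Spark to serialize with Kryo and to apply Mahout's registrator, so the Vector/Matrix
    // writables and the HashBiMap ID dictionaries registered above get efficient serializers.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")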
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
index 2db5f50..e46dad5 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -17,13 +17,11 @@
 
 package org.apache.mahout.cf
 
-import org.scalatest.FunSuite
-import org.apache.mahout.test.MahoutSuite
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.scalabindings.MatrixOps
 import org.apache.mahout.math.drm._
-import org.apache.mahout.math._
+import org.apache.mahout.math.scalabindings.{MatrixOps, _}
 import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.test.MahoutSuite
+import org.scalatest.FunSuite
 
 /* values 
 A =
@@ -41,32 +39,42 @@ B =
 
 class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLocalContext {
 
+  // correct cooccurrence with LLR
+  final val matrixLLRCoocAtAControl = dense(
+    (0.0,                0.6331745808516107, 0.0,                     0.0,                0.0),
+    (0.6331745808516107, 0.0,                0.0,                     0.0,                0.0),
+    (0.0,                0.0,                0.0,                     0.6331745808516107, 0.0),
+    (0.0,                0.0,                0.6331745808516107,      0.0,                0.0),
+    (0.0,                0.0,                0.0,                     0.0,                0.0))
+
+  // correct cross-cooccurrence with LLR
+  final val matrixLLRCoocBtAControl = dense(
+    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.40461878191490940),
+    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+    (0.0,                0.0,                0.0,                0.0,                0.8181382096075936))
+
+
+
+  test("cooccurrence [A'A], [B'A] boolean data using LLR") {
-    val a = dense((1, 1, 0, 0, 0), (0, 0, 1, 1, 0), (0, 0, 0, 0, 1), (1, 0, 0, 1, 0))
-    val b = dense((1, 1, 1, 1, 0), (1, 1, 1, 1, 0), (0, 0, 1, 0, 1), (1, 1, 0, 1, 0))
+    val a = dense(
+        (1, 1, 0, 0, 0),
+        (0, 0, 1, 1, 0),
+        (0, 0, 0, 0, 1),
+        (1, 0, 0, 1, 0))
+
+    val b = dense(
+        (1, 1, 1, 1, 0),
+        (1, 1, 1, 1, 0),
+        (0, 0, 1, 0, 1),
+        (1, 1, 0, 1, 0))
+
     val drmA = drmParallelize(m = a, numPartitions = 2)
     val drmB = drmParallelize(m = b, numPartitions = 2)
 
-    // correct cooccurrence with LLR
-    val matrixLLRCoocAtAControl = dense(
-      (0.0, 1.7260924347106847, 0, 0, 0),
-      (1.7260924347106847, 0, 0, 0, 0),
-      (0, 0, 0, 1.7260924347106847, 0),
-      (0, 0, 1.7260924347106847, 0, 0),
-      (0, 0, 0, 0, 0)
-    )
-
-    // correct cross-cooccurrence with LLR
-    val matrixLLRCoocBtAControl = dense(
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
-      (0, 0, 0, 0, 4.498681156950466)
-    )
-
     //self similarity
-    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, randomSeed = 1, drmBs = Array(drmB))
     val matrixSelfCooc = drmCooc(0).checkpoint().collect
     val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
     var n = (new MatrixOps(m = diffMatrix)).norm
@@ -77,32 +85,25 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLoc
     val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
     n = (new MatrixOps(m = diff2Matrix)).norm
     n should be < 1E-10
+
   }
 
   test("cooccurrence [A'A], [B'A] double data using LLR") {
-    val a = dense((100000.0D, 1.0D, 0.0D, 0.0D, 0.0D), (0.0D, 0.0D, 10.0D, 1.0D, 0.0D), (0.0D, 0.0D, 0.0D, 0.0D, 1000.0D), (1.0D, 0.0D, 0.0D, 10.0D, 0.0D))
-    val b = dense((10000.0D, 100.0D, 1000.0D, 1.0D, 0.0D), (10.0D, 1.0D, 10000000.0D, 10.0D, 0.0D), (0.0D, 0.0D, 1000.0D, 0.0D, 100.0D), (100.0D, 1.0D, 0.0D, 100000.0D, 0.0D))
+    val a = dense(
+        (100000.0D, 1.0D, 0.0D,  0.0D,  0.0D),
+        (0.0D,      0.0D, 10.0D, 1.0D,  0.0D),
+        (0.0D,      0.0D, 0.0D,  0.0D,  1000.0D),
+        (1.0D,      0.0D, 0.0D,  10.0D, 0.0D))
+
+    val b = dense(
+        (10000.0D, 100.0D, 1000.0D,     1.0D,      0.0D),
+        (10.0D,    1.0D,   10000000.0D, 10.0D,     0.0D),
+        (0.0D,     0.0D,   1000.0D,     0.0D,      100.0D),
+        (100.0D,   1.0D,   0.0D,        100000.0D, 0.0D))
+
     val drmA = drmParallelize(m = a, numPartitions = 2)
     val drmB = drmParallelize(m = b, numPartitions = 2)
 
-    // correct cooccurrence with LLR
-    val matrixLLRCoocAtAControl = dense(
-      (0.0, 1.7260924347106847, 0, 0, 0),
-      (1.7260924347106847, 0, 0, 0, 0),
-      (0, 0, 0, 1.7260924347106847, 0),
-      (0, 0, 1.7260924347106847, 0, 0),
-      (0, 0, 0, 0, 0)
-    )
-
-    // correct cross-cooccurrence with LLR
-    val matrixLLRCoocBtAControl = dense(
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
-      (0, 0, 0, 0, 4.498681156950466)
-    )
-
     //self similarity
     val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
     val matrixSelfCooc = drmCooc(0).checkpoint().collect
@@ -118,30 +119,22 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLoc
   }
 
   test("cooccurrence [A'A], [B'A] integer data using LLR") {
-    val a = dense((1000, 10, 0, 0, 0), (0, 0, -10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0))
-    val b = dense((100, 1000, -10000, 10000, 0), (10000, 1000, 100, 10, 0), (0, 0, 10, 0, -100), (10, 100, 0, 1000, 0))
+    val a = dense(
+        (1000,  10, 0,      0,    0),
+        (0,     0,  -10000, 10,   0),
+        (0,     0,  0,      0,    100),
+        (10000, 0,  0,      1000, 0))
+
+    val b = dense(
+        (100,   1000, -10000, 10000, 0),
+        (10000, 1000, 100,    10,    0),
+        (0,     0,    10,     0,     -100),
+        (10,    100,  0,      1000,  0))
+
     val drmA = drmParallelize(m = a, numPartitions = 2)
     val drmB = drmParallelize(m = b, numPartitions = 2)
 
-    // correct cooccurrence with LLR
-    val matrixLLRCoocAtAControl = dense(
-      (0.0, 1.7260924347106847, 0, 0, 0),
-      (1.7260924347106847, 0, 0, 0, 0),
-      (0, 0, 0, 1.7260924347106847, 0),
-      (0, 0, 1.7260924347106847, 0, 0),
-      (0, 0, 0, 0, 0)
-    )
-
-    // correct cross-cooccurrence with LLR
-    val matrixLLRCoocBtAControl = dense(
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
-      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
-      (0, 0, 0, 0, 4.498681156950466)
-    )
-
-    //self similarity
+   //self similarity
     val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
     //var cp = drmSelfCooc(0).checkpoint()
     //cp.writeDRM("/tmp/cooc-spark/")//to get values written
@@ -158,22 +151,69 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLoc
   }
 
   test("LLR calc") {
-    val numInteractionsWithAandB = 10L
-    val numInteractionsWithA = 100L
-    val numInteractionsWithB = 200L
-    val numInteractions = 10000l
-
-    val llr = CooccurrenceAnalysis.loglikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
-
-    assert(llr == 17.19462327013025)
+    val A = dense(
+        (1, 1, 0, 0, 0),
+        (0, 0, 1, 1, 0),
+        (0, 0, 0, 0, 1),
+        (1, 0, 0, 1, 0))
+
+    val AtA = A.transpose().times(A)
+
+    /* AtA is:
+      0  =>	{0:2.0,1:1.0,3:1.0}
+      1  =>	{0:1.0,1:1.0}
+      2  =>	{2:1.0,3:1.0}
+      3  =>	{0:1.0,2:1.0,3:2.0}
+      4  =>	{4:1.0}
+
+          val AtAd = dense(
+         (2, 1, 0, 1, 0),
+         (1, 1, 0, 0, 0),
+         (0, 0, 1, 1, 0),
+         (1, 0, 1, 2, 0),
+         (0, 0, 0, 0, 1))
+
+         val AtAdNoSelfCooc = dense(
+         (0, 1, 0, 1, 0),
+         (1, 0, 0, 0, 0),
+         (0, 0, 0, 1, 0),
+         (1, 0, 1, 0, 0),
+         (0, 0, 0, 0, 0))
+
+        for (MatrixSlice row : cooccurrence) {
+            for (Vector.Element element : row.vector().nonZeroes()) {
+                long k11 = (long) element.get();// = 1
+                long k12 = (long) (rowSums.get(row.index()) - k11);// = 0
+                long k21 = (long) (colSums.get(element.index()) - k11);// = 1
+                long k22 = (long) (total - k11 - k12 - k21);// = 2
+                double score = LogLikelihood.rootLogLikelihoodRatio(k11, k12, k21, k22);
+                element.set(score);
+            }
+        }
+
+        for some reason the hadoop version returns the following:
+            return 1.0 - 1.0 / (1.0 + logLikelihood);
+        so it is not a pure llr or a root llr
+
+    */
+
+    //item (1,0)
+    val numInteractionsWithAandB = 1L
+    val numInteractionsWithA = 1L
+    val numInteractionsWithB = 2L
+    val numInteractions = 6L
+
+    val llr = CooccurrenceAnalysis.logLikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
+
+    assert(llr == 2.6341457841558764) // value calculated by hadoop itemsimilarity
   }
 
   test("downsampling by number per row") {
-    val a = dense((1, 1, 1, 1, 0),
-      (1, 1, 1, 1, 1),
-      (0, 0, 0, 0, 1),
-      (1, 1, 0, 1, 0)
-    )
+    val a = dense(
+        (1, 1, 1, 1, 0),
+        (1, 1, 1, 1, 1),
+        (0, 0, 0, 0, 1),
+        (1, 1, 0, 1, 0))
     val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
 
     val downSampledDrm = CooccurrenceAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
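
[Editorial note, not part of the patch] The long comment in the "LLR calc" test above spells out how the 2x2 contingency table is built from the co-occurrence count, the row and column sums, and the total. The sketch below restates that mapping for the arguments the test passes, assuming CooccurrenceAnalysis.logLikelihoodRatio forms the table the way the comment describes:

    // Map interaction counts onto the k11/k12/k21/k22 cells described in the test's comment.
    def contingency(numA: Long, numB: Long, numAandB: Long, total: Long): (Long, Long, Long, Long) = {
      val k11 = numAandB                 // interactions with both A and B
      val k12 = numA - numAandB          // A without B
      val k21 = numB - numAandB          // B without A
      val k22 = total - k11 - k12 - k21  // neither A nor B
      (k11, k12, k21, k22)
    }

    contingency(1L, 2L, 1L, 6L) // (1, 0, 1, 4) for the (1,0) cell the test checks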

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
new file mode 100644
index 0000000..f649d7b
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.scalatest.FunSuite
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.test.MahoutSuite
+
+class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with MahoutLocalContext  {
+
+/*
+  // correct self-cooccurrence with LLR
+  final val matrixLLRCoocAtAControl = dense(
+    (0.0,                0.6331745808516107, 0.0,                     0.0,                0.0),
+    (0.6331745808516107, 0.0,                0.0,                     0.0,                0.0),
+    (0.0,                0.0,                0.0,                     0.6331745808516107, 0.0),
+    (0.0,                0.0,                0.6331745808516107,      0.0,                0.0),
+    (0.0,                0.0,                0.0,                     0.0,                0.0))
+
+  // correct cross-cooccurrence with LLR
+  final val matrixLLRCoocBtAControl = dense(
+    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.40461878191490940),
+    (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+    (0.0,                0.0,                0.0,                0.0,                0.8181382096075936))
+*/
+
+  final val SelfSimilarityTSV = Set(
+      "galaxy\tnexus:0.6331745808516107",
+      "ipad\tiphone:0.6331745808516107",
+      "nexus\tgalaxy:0.6331745808516107",
+      "iphone\tipad:0.6331745808516107",
+      "surface")
+  final val CrossSimilarityTSV = Set(
+      "galaxy\tnexus:0.4046187819149094,iphone:0.6331745808516107,ipad:0.4046187819149094,galaxy:0.6331745808516107",
+      "surface\tsurface:0.8181382096075936",
+      "nexus\tnexus:0.4046187819149094,iphone:0.6331745808516107,ipad:0.4046187819149094,surface:0.4046187819149094,galaxy:0.6331745808516107",
+      "ipad\tnexus:0.4046187819149094,iphone:0.6331745808516107,ipad:0.4046187819149094,galaxy:0.6331745808516107",
+      "iphone\tnexus:0.4046187819149094,iphone:0.6331745808516107,ipad:0.4046187819149094,galaxy:0.6331745808516107")
+
+  final val TmpDir = "tmp/" // all I/O goes to whatever the default HDFS config points to
+
+  /*
+    //Clustered Spark and HDFS, not a good everyday build test
+    ItemSimilarityDriver.main(Array(
+      "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
+      "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
+      "--master", "spark://occam4:7077",
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"
+    ))
+*/
+  // local multi-threaded Spark with HDFS using large dataset
+  // not a good build test.
+  /*    ItemSimilarityDriver.main(Array(
+      "--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt",
+      "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/",
+      "--master", "local[4]",
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"
+    ))
+  */
+
+  test ("ItemSimilarityDriver, non-full-spec CSV"){
+
+    val InFile = TmpDir + "in-file.csv/" // using part files, not a single file
+    val OutPath = TmpDir + "indicator-matrices/"
+
+    val lines = Array(
+        "u1,purchase,iphone",
+        "u1,purchase,ipad",
+        "u2,purchase,nexus",
+        "u2,purchase,galaxy",
+        "u3,purchase,surface",
+        "u4,purchase,iphone",
+        "u4,purchase,galaxy",
+        "u1,view,iphone",
+        "u1,view,ipad",
+        "u1,view,nexus",
+        "u1,view,galaxy",
+        "u2,view,iphone",
+        "u2,view,ipad",
+        "u2,view,nexus",
+        "u2,view,galaxy",
+        "u3,view,surface",
+        "u3,view,nexus",
+        "u4,view,iphone",
+        "u4,view,ipad",
+        "u4,view,galaxy")
+
+    // this will create multiple part-xxxxx files in the InFile dir; other tests cover
+    // reading from a single actual file
+    val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
+
+    afterEach // clean up before running the driver; it should handle the Spark conf and context itself
+
+    // local multi-threaded Spark with default HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", InFile,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"))
+
+    beforeEach // restart the test context to read the output of the driver
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String]
+    assert(indicatorLines == SelfSimilarityTSV)
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toSet[String]
+    assert (crossIndicatorLines == CrossSimilarityTSV)
+  }
+
+
+
+  test ("ItemSimilarityDriver TSV "){
+
+    val InFile = TmpDir + "in-file.tsv/"
+    val OutPath = TmpDir + "indicator-matrices/"
+
+    val lines = Array(
+      "u1\tpurchase\tiphone",
+      "u1\tpurchase\tipad",
+      "u2\tpurchase\tnexus",
+      "u2\tpurchase\tgalaxy",
+      "u3\tpurchase\tsurface",
+      "u4\tpurchase\tiphone",
+      "u4\tpurchase\tgalaxy",
+      "u1\tview\tiphone",
+      "u1\tview\tipad",
+      "u1\tview\tnexus",
+      "u1\tview\tgalaxy",
+      "u2\tview\tiphone",
+      "u2\tview\tipad",
+      "u2\tview\tnexus",
+      "u2\tview\tgalaxy",
+      "u3\tview\tsurface",
+      "u3\tview\tnexus",
+      "u4\tview\tiphone",
+      "u4\tview\tipad",
+      "u4\tview\tgalaxy")
+
+    // this will create multiple part-xxxxx files in the InFile dir; other tests cover
+    // reading from a single actual file
+    val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
+
+    afterEach // clean up before running the driver; it should handle the Spark conf and context itself
+
+    // local multi-threaded Spark with default HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", InFile,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", "[,\t]",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"))
+
+    beforeEach // restart the test context to read the output of the driver
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String]
+    assert(indicatorLines == SelfSimilarityTSV)
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toSet[String]
+    assert (crossIndicatorLines == CrossSimilarityTSV)
+
+  }
+
+  test ("ItemSimilarityDriver log-ish files"){
+
+    val InFile = TmpDir + "in-file.log/"
+    val OutPath = TmpDir + "indicator-matrices/"
+
+    val lines = Array(
+      "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
+      "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tipad",
+      "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tnexus",
+      "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tgalaxy",
+      "2014-06-23 14:46:53.115\tu3\tpurchase\trandom text\tsurface",
+      "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tiphone",
+      "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tgalaxy",
+      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tiphone",
+      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tipad",
+      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tnexus",
+      "2014-06-23 14:46:53.115\tu1\tview\trandom text\tgalaxy",
+      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tiphone",
+      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tipad",
+      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tnexus",
+      "2014-06-23 14:46:53.115\tu2\tview\trandom text\tgalaxy",
+      "2014-06-23 14:46:53.115\tu3\tview\trandom text\tsurface",
+      "2014-06-23 14:46:53.115\tu3\tview\trandom text\tnexus",
+      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tiphone",
+      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tipad",
+      "2014-06-23 14:46:53.115\tu4\tview\trandom text\tgalaxy")
+
+    // this will create multiple part-xxxxx files in the InFile dir; other tests cover
+    // reading from a single actual file
+    val linesRdd = mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
+
+    afterEach // clean up before running the driver; it should handle the Spark conf and context itself
+
+    // local multi-threaded Spark with default HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", InFile,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", "\t",
+      "--itemIDPosition", "4",
+      "--rowIDPosition", "1",
+      "--filterPosition", "2"))
+
+    beforeEach // restart the test context to read the output of the driver
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String]
+    assert(indicatorLines == SelfSimilarityTSV)
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath+"/cross-indicator-matrix/").collect.toSet[String]
+    assert (crossIndicatorLines == CrossSimilarityTSV)
+
+  }
+
+  test ("ItemSimilarityDriver legacy supported file format"){
+
+    val InDir = TmpDir + "in-dir/"
+    val InFilename = "in-file.tsv"
+    val InPath = InDir + InFilename
+
+    val OutPath = TmpDir + "indicator-matrices"
+
+    val lines = Array(
+        "0,0,1",
+        "0,1,1",
+        "1,2,1",
+        "1,3,1",
+        "2,4,1",
+        "3,0,1",
+        "3,3,1")
+
+    val Answer = Set(
+      "0\t1:0.6331745808516107",
+      "3\t2:0.6331745808516107",
+      "1\t0:0.6331745808516107",
+      "4",
+      "2\t3:0.6331745808516107")
+
+    // this creates one part-0000 file in the directory
+    mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir)
+
+    // to change from using part files to a single .tsv file we'll need to use HDFS
+    val fs = FileSystem.get(new Configuration())
+    //rename part-00000 to something.tsv
+    fs.rename(new Path(InDir + "part-00000"), new Path(InPath))
+
+    afterEach // clean up before running the driver; it should handle the Spark conf and context itself
+
+    // local multi-threaded Spark with default HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", InPath,
+      "--output", OutPath,
+      "--master", masterUrl))
+
+    beforeEach // restart the test context to read the output of the driver
+    val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String]
+    assert(indicatorLines == Answer)
+
+  }
+
+  test("ItemSimilarityDriver recursive file discovery using filename patterns"){
+    // directory structure used for this test:
+    // tmp/data/m1.tsv
+    // tmp/data/more-data/another-dir/m2.tsv
+    val M1Lines = Array(
+      "u1\tpurchase\tiphone",
+      "u1\tpurchase\tipad",
+      "u2\tpurchase\tnexus",
+      "u2\tpurchase\tgalaxy",
+      "u3\tpurchase\tsurface",
+      "u4\tpurchase\tiphone",
+      "u4\tpurchase\tgalaxy",
+      "u1\tview\tiphone")
+
+    val M2Lines = Array(
+      "u1\tview\tipad",
+      "u1\tview\tnexus",
+      "u1\tview\tgalaxy",
+      "u2\tview\tiphone",
+      "u2\tview\tipad",
+      "u2\tview\tnexus",
+      "u2\tview\tgalaxy",
+      "u3\tview\tsurface",
+      "u3\tview\tnexus",
+      "u4\tview\tiphone",
+      "u4\tview\tipad",
+      "u4\tview\tgalaxy")
+
+    val InFilenameM1 = "m1.tsv"
+    val InDirM1 = TmpDir + "data/"
+    val InPathM1 = InDirM1 + InFilenameM1
+    val InFilenameM2 = "m2.tsv"
+    val InDirM2 = TmpDir + "data/more-data/another-dir/"
+    val InPathM2 = InDirM2 + InFilenameM2
+
+    val InPathStart = TmpDir + "data/"
+    val OutPath = TmpDir + "indicator-matrices"
+
+    // this creates one part-0000 file in the directory
+    mahoutCtx.parallelize(M1Lines).coalesce(1, shuffle=true).saveAsTextFile(InDirM1)
+
+    // to change from using part files to a single .tsv file we'll need to use HDFS
+    val fs = FileSystem.get(new Configuration())
+    //rename part-00000 to something.tsv
+    fs.rename(new Path(InDirM1 + "part-00000"), new Path(InPathM1))
+
+    // this creates one part-0000 file in the directory
+    mahoutCtx.parallelize(M2Lines).coalesce(1, shuffle=true).saveAsTextFile(InDirM2)
+
+    // to change from using part files to a single .tsv file we'll need to use HDFS
+    //rename part-00000 to tmp/some-location/something.tsv
+    fs.rename(new Path(InDirM2 + "part-00000"), new Path(InPathM2))
+
+    // local multi-threaded Spark with the default FS, suitable for build tests but needs a better location for data
+
+    afterEach // clean up before running the driver; it should handle the Spark conf and context itself
+
+    ItemSimilarityDriver.main(Array(
+      "--input", InPathStart,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", "\t",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1",
+      "--filenamePattern", "m..tsv",
+      "--recursive"))
+
+    beforeEach() // restart the test context to read the output of the driver
+    val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toSet[String]
+    assert(indicatorLines == SelfSimilarityTSV)
+    val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toSet[String]
+    assert (crossIndicatorLines == CrossSimilarityTSV)
+  }
+
+  override def afterAll = {
+    // remove TmpDir
+    val fs = FileSystem.get(new Configuration())
+    fs.delete(new Path(TmpDir), true) // delete recursively
+
+    super.afterAll
+  }
+
+}
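
[Editorial note, not part of the patch] The expected-output sets near the top of this suite imply the driver's default output format: one item per line, a tab before the list of similar items, commas between list entries, and a colon between an item ID and its similarity strength; an item with no similar items gets a bare line. A small parsing sketch under exactly those assumptions (a custom write Schema could use other delimiters):

    // Parse one line of the indicator-matrix output into (itemID, Map(similarItem -> strength)).
    def parseIndicatorLine(line: String): (String, Map[String, Double]) = {
      val fields = line.split("\t")
      val itemID = fields(0)
      val similars =
        if (fields.length > 1 && fields(1).nonEmpty)
          fields(1).split(",").map { pair =>
            val Array(id, strength) = pair.split(":")
            id -> strength.toDouble
          }.toMap
        else Map.empty[String, Double] // e.g. the lone "surface" line, which has no similar items
      (itemID, similars)
    }

    parseIndicatorLine("galaxy\tnexus:0.6331745808516107") // ("galaxy", Map("nexus" -> 0.6331745808516107))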

http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
index c48cfc7..fb97f68 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
@@ -10,11 +10,13 @@ trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
   this: Suite =>
 
   protected implicit var mahoutCtx: DistributedContext = _
+  protected var masterUrl = null.asInstanceOf[String]
 
   override protected def beforeEach() {
     super.beforeEach()
 
-    mahoutCtx = mahoutSparkContext(masterUrl = "local[2]",
+    masterUrl = "local[2]"
+    mahoutCtx = mahoutSparkContext(masterUrl = this.masterUrl,
       appName = "MahoutLocalContext",
       // Do not run MAHOUT_HOME jars in unit tests.
       addMahoutJars = false,