You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2014/07/04 00:59:11 UTC
git commit: MAHOUT-1561, MAHOUT-1568,
MAHOUT-1569 text-delimited Spark readers and writers with drivers and
a CLI for 'spark-itemsimilarity' closes apache/mahout#22
Repository: mahout
Updated Branches:
refs/heads/master 37b852539 -> 2b65475c3
MAHOUT-1561, MAHOUT-1568, MAHOUT-1569 text-delimited Spark readers and writers with drivers and a CLI for 'spark-itemsimilarity' closes apache/mahout#22
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/2b65475c
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/2b65475c
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/2b65475c
Branch: refs/heads/master
Commit: 2b65475c3ab682ebd47cffdc6b502698799cd2c8
Parents: 37b8525
Author: pferrel <pa...@occamsmachete.com>
Authored: Thu Jul 3 15:56:37 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Thu Jul 3 15:56:37 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
bin/mahout | 20 +-
spark/pom.xml | 36 ++
spark/src/main/assembly/job.xml | 46 +++
.../apache/mahout/cf/CooccurrenceAnalysis.scala | 11 +-
.../apache/mahout/drivers/FileSysUtils.scala | 71 ++++
.../apache/mahout/drivers/IndexedDataset.scala | 56 +++
.../mahout/drivers/ItemSimilarityDriver.scala | 314 +++++++++++++++
.../apache/mahout/drivers/MahoutDriver.scala | 83 ++++
.../mahout/drivers/MahoutOptionParser.scala | 24 ++
.../apache/mahout/drivers/ReaderWriter.scala | 41 ++
.../org/apache/mahout/drivers/Schema.scala | 30 ++
.../drivers/TextDelimitedReaderWriter.scala | 222 +++++++++++
.../io/MahoutKryoRegistrator.scala | 4 +-
.../mahout/cf/CooccurrenceAnalysisSuite.scala | 200 ++++++----
.../drivers/ItemSimilarityDriverSuite.scala | 382 +++++++++++++++++++
.../sparkbindings/test/MahoutLocalContext.scala | 4 +-
17 files changed, 1454 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index c9b6a0d..23ad49e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
Release 1.0 - unreleased
+ MAHOUT-1561, MAHOUT-1568, MAHOUT-1569: Created text-delimited file I/O traits and classes on spark, a MahoutDriver for a CLI and an ItemSimilarityDriver using the CLI
+
MAHOUT-1573: More explicit parallelism adjustments in math-scala DRM apis; elements of automatic parallelism management (dlyubimov)
MAHOUT-1580: Optimize getNumNonZeroElements() (ssc)
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/bin/mahout
----------------------------------------------------------------------
diff --git a/bin/mahout b/bin/mahout
index b0a8f6e..5f54181 100755
--- a/bin/mahout
+++ b/bin/mahout
@@ -7,7 +7,7 @@
# MAHOUT_JAVA_HOME The java implementation to use. Overrides JAVA_HOME.
#
# MAHOUT_HEAPSIZE The maximum amount of heap to use, in MB.
-# Default is 1000.
+# Default is 4000.
#
# HADOOP_CONF_DIR The location of a hadoop config directory
#
@@ -84,6 +84,9 @@ if [ "$1" == "spark-shell" ]; then
SPARK=1
fi
+if [ "$1" == "spark-itemsimilarity" ]; then
+ SPARK=1
+fi
if [ "$MAHOUT_CORE" != "" ]; then
IS_CORE=1
@@ -105,7 +108,7 @@ if [ "$JAVA_HOME" = "" ]; then
fi
JAVA=$JAVA_HOME/bin/java
-JAVA_HEAP_MAX=-Xmx3g
+JAVA_HEAP_MAX=-Xmx4g
# check envvars which might override default args
if [ "$MAHOUT_HEAPSIZE" != "" ]; then
@@ -156,14 +159,14 @@ then
CLASSPATH=${CLASSPATH}:$f;
done
fi
-
+
# add scala dev target
- for f in $MAHOUT_HOME/math-scala/target/mahout-math-scala-*.jar ; do
+ for f in $MAHOUT_HOME/math-scala/target/mahout-math-scala-*.jar ; do
CLASSPATH=${CLASSPATH}:$f;
done
- # add spark-shell -- if we requested shell
+ # add spark-shell -- if we requested shell or other spark CLI driver
if [ "$SPARK" == "1" ]; then
for f in $MAHOUT_HOME/mrlegacy/target/mahout-mrlegacy-*.jar ; do
@@ -183,7 +186,7 @@ then
SPARK_CLASSPATH=$("${SPARK_CP_BIN}" 2>/dev/null)
CLASSPATH="${CLASSPATH}:${SPARK_CLASSPATH}"
else
- echo "Cannot find Spark classpath."
+ echo "Cannot find Spark classpath. Is 'SPARK_HOME' set?"
exit -1
fi
@@ -228,6 +231,11 @@ case "$1" in
"$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.sparkbindings.shell.Main" $@
stty sane; stty $save_stty
;;
+ # Spark CLI drivers go here
+ (spark-itemsimilarity)
+ shift
+ "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.drivers.ItemSimilarityDriver" "$@"
+ ;;
(*)
# default log directory & file
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 5dc566f..03ea2a0 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -83,6 +83,27 @@
</executions>
</plugin>
+ <!-- create core job dependencies jar -->
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>job</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/job.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
<!-- create test jar so other modules can reuse the math test utility classes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -145,6 +166,7 @@
<reportsDirectory>${project.build.directory}/scalatest-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
+ <argLine>-Xmx1024m</argLine>
</configuration>
<executions>
<execution>
@@ -296,6 +318,20 @@
<!-- 3rd-party -->
+ <dependency>
+ <groupId>com.github.scopt</groupId>
+ <artifactId>scopt_2.10</artifactId>
+ <version>3.2.0</version>
+ </dependency>
+
+ <!-- spark stuff -->
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.major}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
<!-- scala stuff -->
<dependency>
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml
new file mode 100644
index 0000000..0c41f3d
--- /dev/null
+++ b/spark/src/main/assembly/job.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
+ http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>job</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <unpack>true</unpack>
+ <unpackOptions>
+ <!-- MAHOUT-1126 -->
+ <excludes>
+ <exclude>META-INF/LICENSE</exclude>
+ </excludes>
+ </unpackOptions>
+ <scope>runtime</scope>
+ <outputDirectory>/</outputDirectory>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+ <excludes>
+ <exclude>org.apache.hadoop:hadoop-core</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+</assembly>
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
index ee44f90..b01332c 100644
--- a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
+++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
@@ -22,7 +22,6 @@ import scalabindings._
import RLikeOps._
import drm._
import RLikeDrmOps._
-import org.apache.mahout.sparkbindings._
import scala.collection.JavaConversions._
import org.apache.mahout.math.stats.LogLikelihood
import collection._
@@ -96,7 +95,7 @@ object CooccurrenceAnalysis extends Serializable {
* Compute loglikelihood ratio
* see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
**/
- def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+ def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
numInteractionsWithAandB: Long, numInteractions: Long) = {
val k11 = numInteractionsWithAandB
@@ -105,6 +104,7 @@ object CooccurrenceAnalysis extends Serializable {
val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+
}
def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
@@ -131,9 +131,12 @@ object CooccurrenceAnalysis extends Serializable {
// exclude co-occurrences of the item with itself
if (crossCooccurrence || thingB != thingA) {
// Compute loglikelihood ratio
- val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+ val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
cooccurrences.toLong, numUsers)
- val candidate = thingA -> llrRatio
+
+ // matches hadoop code and maps values to range (0..1)
+ val tLLR = 1.0 - (1.0 / (1.0 + llr))
+ val candidate = thingA -> tLLR
// Enqueue item with score, if belonging to the top-k
if (topItemsPerThing.size < maxInterestingItemsPerThing) {
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
new file mode 100644
index 0000000..654f116
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
+
+/**
+ * Discovers input files and renders them as a [[java.lang.String]] comma delimited list of URIs,
+ * formatted to be input into [[org.apache.spark.SparkContext.textFile()]].
+ *
+ * @param pathURI Where to start looking for inFiles, only HDFS is currently
+ *                supported. The pathURI may be a list of comma delimited URIs like those supported
+ *                by Spark
+ * @param filePattern regex that must match the entire filename to have the file returned; it is
+ *                    only consulted when `recursive` is true
+ * @param recursive true traverses the filesystem recursively, false hands `pathURI` back unchanged
+ */
+
+case class FileSysUtils(pathURI: String, filePattern: String = ".*", recursive: Boolean = false) {
+
+  val conf = new Configuration()
+  val fs = FileSystem.get(conf)
+
+  /**
+   * Returns a string of comma delimited URIs matching the filePattern (no trailing comma), or
+   * `pathURI` itself when the search is not recursive. An empty string means nothing matched.
+   */
+  def uris: String = {
+    if (recursive) {
+      // every comma separated root is searched in order and the hits are joined once at the end
+      pathURI.split(",").flatMap(root => matchingFiles(root, filePattern)).mkString(",")
+    } else {
+      pathURI
+    }
+  }
+
+  /**
+   * Finds matching, non-empty files in `dir` and in all directories below it.
+   *
+   * @param dir directory to search
+   * @param filePattern regex that must match the entire filename
+   * @param files previously accumulated result that the new matches are appended to
+   * @return `files` followed by every matching URI, each one terminated by a comma
+   */
+  def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
+    matchingFiles(dir, filePattern).map(_ + ",").mkString(files, "", "")
+  }
+
+  /** Depth-first listing of the URIs of all non-empty files under `dir` whose name matches `pattern`. */
+  private def matchingFiles(dir: String, pattern: String): Seq[String] = {
+    // NOTE(review): some Hadoop versions return null instead of throwing when the path does not
+    // exist; treat that as "no files" rather than failing with a NullPointerException.
+    val statuses = Option(fs.listStatus(new Path(dir))).getOrElse(Array.empty[FileStatus])
+    statuses.toSeq.flatMap { status =>
+      if (status.isDir) {
+        // directories are always descended into, whatever their name
+        matchingFiles(status.getPath.toString, pattern)
+      } else if (status.getPath.getName.matches(pattern) && status.getLen != 0) {
+        // a matching file that is not empty
+        Seq(status.getPath.toUri.toString)
+      } else {
+        Seq.empty[String]
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
new file mode 100644
index 0000000..0d8c160
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import com.google.common.collect.BiMap
+import org.apache.mahout.math.drm.CheckpointedDrm
+
+/**
+ * Bundles a distributed matrix with two [[com.google.common.collect.BiMap]]s that act as ID/label
+ * translation dictionaries. The bidirectional mappings let a user specified label or ID be stored
+ * and mapped to and from the [[scala.Int]] ordinal ID used internal to Mahout Core code.
+ *
+ * Example: For a transpose job the [[org.apache.mahout.drivers.IndexedDataset#matrix]] is passed
+ * into the DSL code that transposes the values, then a resulting
+ * [[org.apache.mahout.drivers.IndexedDataset]] is created from the transposed matrix with swapped
+ * dictionaries (since the rows and columns are transposed). The new
+ * [[org.apache.mahout.drivers.IndexedDataset]] is returned.
+ *
+ * @param matrix CheckpointedDrm[Int], the distributed matrix storing the actual data.
+ * @param rowIDs BiMap[String, Int] storing a bidirectional mapping of external String ID to
+ *               and from the ordinal Mahout Int ID. This one holds row labels
+ * @param columnIDs BiMap[String, Int] storing a bidirectional mapping of external String
+ *                  ID to and from the ordinal Mahout Int ID. This one holds column labels
+ * @todo Often no need for both or perhaps either dictionary, so save resources by allowing
+ *       to be not created when not needed.
+ */
+
+case class IndexedDataset(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int])
+
+/**
+ * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]]. It only adds a
+ * secondary constructor that makes one [[org.apache.mahout.drivers.IndexedDataset]] from another,
+ * which is handy when the source object comes out of a reader or factory:
+ * {{{
+ * val indexedDataset = IndexedDataset(indexedDatasetReader.readFrom(source))
+ * }}}
+ */
+
+object IndexedDataset {
+  /**
+   * Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]].
+   *
+   * @param id2 the dataset whose matrix and dictionaries are reused
+   * @return a new instance referring to the same matrix, row IDs and column IDs as `id2`
+   */
+  def apply(id2: IndexedDataset): IndexedDataset = {
+    new IndexedDataset(matrix = id2.matrix, rowIDs = id2.rowIDs, columnIDs = id2.columnIDs)
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
new file mode 100644
index 0000000..77005f1
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -0,0 +1,314 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.cf.CooccurrenceAnalysis
+
+/**
+ * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]].
+ * Reads text lines that contain (row id, column id, ...). The IDs are user specified strings which
+ * will be preserved in the output. The individual tuples will be accumulated into a matrix and
+ * [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]] will be used to calculate row-wise
+ * self-similarity, or when using filters, will generate two matrices and calculate both the self
+ * similarity of the primary matrix and the row-wise similarity of the primary to the secondary.
+ * Writes one or two directories of text files ("indicator-matrix" and, when a second filter is
+ * used, "cross-indicator-matrix") under the output path, formatted as specified in the options.
+ * The options allow flexible control of the input schema, file discovery, output schema, and
+ * control of algorithm parameters.
+ * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process
+ * simple tuples of text delimited values (userID,itemID) with or without a strengths and with a
+ * separator of tab, comma, or space, you can specify only the input and output file and
+ * directory--all else will default to the correct values.
+ * @note To use with a Spark cluster see the --master option, if you run out of heap space check
+ *       the --sparkExecutorMem option.
+ */
+object ItemSimilarityDriver extends MahoutDriver {
+  //todo: Should also take two input streams and do cross similarity with no filter required.
+  // required for examples
+
+  // Parsed command line; assigned by main() before process/start read it.
+  private var options: Options = _
+  // Reader for the primary (self-similarity) dataset, created in start().
+  private var reader1: TextDelimitedIndexedDatasetReader = _
+  // Reader for the secondary (cross-similarity) dataset, only created when one is requested.
+  private var reader2: TextDelimitedIndexedDatasetReader = _
+  private var writer: TextDelimitedIndexedDatasetWriter = _
+  private var writeSchema: Schema = _
+
+  /**
+   * Parses the command line and, when it is valid, runs the job.
+   *
+   * @param args Command line args, if empty a help message is printed.
+   */
+  override def main(args: Array[String]): Unit = {
+    val parser = new MahoutOptionParser[Options]("spark-itemsimilarity") {
+      head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT")
+
+      //Input output options, non-driver specific
+      note("Input, output options")
+      opt[String]('i', "input") required() action { (x, options) =>
+        options.copy(input = x)
+      } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+
+      opt[String]('o', "output") required() action { (x, options) =>
+        if (x.endsWith("/")) // todo: check to see if HDFS allows MS-Windows backslashes locally?
+          options.copy(output = x)
+        else
+          options.copy(output = x + "/")
+      } text ("Path for output, any local or HDFS supported URI (required).")
+
+      //Algorithm control options--driver specific
+      note("\nAlgorithm control options:")
+      opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+        options.copy(master = x)
+      }
+
+      opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) =>
+        options.copy(maxPrefs = x)
+      } text ("Max number of preferences to consider per user (optional). Default: 500") validate { x =>
+        if (x > 0) success else failure("Option --maxPrefs must be > 0")
+      }
+
+      /* not implemented in CooccurrenceAnalysis.cooccurrence
+      opt[Int]("minPrefs") abbr ("mp") action { (x, options) =>
+        options.copy(minPrefs = x)
+      } text ("Ignore users with less preferences than this (optional). Default: 1") validate { x =>
+        if (x > 0) success else failure("Option --minPrefs must be > 0")
+      }
+      */
+
+      opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
+        options.copy(maxSimilaritiesPerItem = x)
+      } text ("Limit the number of similarities per item to this number (optional). Default: 100") validate { x =>
+        if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0")
+      }
+
+      opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
+        options.copy(randomSeed = x)
+      } text ("Int to seed random number generator (optional). Default: Uses time to generate a seed") validate { x =>
+        if (x > 0) success else failure("Option --randomSeed must be > 0")
+      }
+
+      //Input text file schema--not driver specific but input data specific, tuples input,
+      // not drms
+      note("\nInput text file schema options:")
+      // the documented default mirrors Options.inDelim, which also accepts a space
+      opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t ]\"") action { (x, options) =>
+        options.copy(inDelim = x)
+      }
+
+      opt[String]("filter1") abbr ("f1") action { (x, options) =>
+        options.copy(filter1 = x)
+      } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+
+      opt[String]("filter2") abbr ("f2") action { (x, options) =>
+        options.copy(filter2 = x)
+      } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected.")
+
+      // validation messages name the option exactly as the user has to type it
+      opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) =>
+        options.copy(rowIDPosition = x)
+      } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
+        if (x >= 0) success else failure("Option --rowIDPosition must be >= 0")
+      }
+
+      opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) =>
+        options.copy(itemIDPosition = x)
+      } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
+        if (x >= 0) success else failure("Option --itemIDPosition must be >= 0")
+      }
+
+      opt[Int]("filterPosition") abbr ("fc") action { (x, options) =>
+        options.copy(filterPosition = x)
+      } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+        if (x >= -1) success else failure("Option --filterPosition must be >= -1")
+      }
+
+      note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+      //File finding strategy--not driver specific
+      note("\nFile input options:")
+      opt[Unit]('r', "recursive") action { (_, options) =>
+        options.copy(recursive = true)
+      } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+
+      opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
+        options.copy(filenamePattern = x)
+      } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+
+      //Drm output schema--not driver specific, drm specific
+      note("\nOutput text file schema options:")
+      opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
+        options.copy(rowKeyDelim = x)
+      } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+      opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
+        options.copy(columnIdStrengthDelim = x)
+      } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+      opt[String]("tupleDelim") abbr ("td") action { (x, options) =>
+        options.copy(tupleDelim = x)
+      } text ("Separates vector tuple values in the values list (optional). Default: \",\"")
+
+      //Spark config options--not driver specific
+      note("\nSpark config options:")
+      // the documented default mirrors Options.sparkExecutorMem
+      opt[String]("sparkExecutorMem") abbr ("sem") action { (x, options) =>
+        options.copy(sparkExecutorMem = x)
+      } text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 2g")
+
+      note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2,itemID10:value10...\"")
+
+      //Driver notes--driver specific
+      note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.\n")
+
+      help("help") abbr ("h") text ("prints this usage text\n")
+
+      checkConfig { c =>
+        if (c.filterPosition == c.itemIDPosition
+          || c.filterPosition == c.rowIDPosition
+          || c.rowIDPosition == c.itemIDPosition)
+          failure("The row, item, and filter positions must be unique.") else success
+      }
+
+      //check for option consistency, probably driver specific
+      checkConfig { c =>
+        if (c.filter1 != null && c.filter2 != null && c.filter1 == c.filter2)
+          failure("If using filters they must be unique.") else success
+      }
+
+    }
+
+    //repeated code, should this be put base MahoutDriver somehow?
+    // parse returns None when the arguments are invalid, in which case nothing is run
+    parser.parse(args, Options()) foreach { opts =>
+      options = opts
+      process
+    }
+
+  }
+
+  /**
+   * Configures Spark from the parsed options, delegates to the base class start, then builds the
+   * readers and the writer described by the options.
+   *
+   * @param masterUrl Spark master URL, defaults to the --master option
+   * @param appName application name, defaults to the one held in the options
+   */
+  override def start(masterUrl: String = options.master,
+      appName: String = options.appName): Unit = {
+
+    sparkConf.set("spark.kryo.referenceTracking", "false")
+      .set("spark.kryoserializer.buffer.mb", "200")
+      .set("spark.executor.memory", options.sparkExecutorMem)
+
+    super.start(masterUrl, appName)
+
+    reader1 = new TextDelimitedIndexedDatasetReader(readSchemaFor(options.filter1))
+
+    if (crossSimilarityRequested) {
+      reader2 = new TextDelimitedIndexedDatasetReader(readSchemaFor(options.filter2))
+    }
+
+    writeSchema = new Schema(
+      "rowKeyDelim" -> options.rowKeyDelim,
+      "columnIdStrengthDelim" -> options.columnIdStrengthDelim,
+      "tupleDelim" -> options.tupleDelim)
+
+    writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
+
+  }
+
+  /** True when a filter column and a second filter are given, i.e. a secondary dataset is wanted. */
+  private def crossSimilarityRequested: Boolean =
+    options.filterPosition != -1 && options.filter2 != null
+
+  /** Input schema shared by both readers; only the filter string differs between them. */
+  private def readSchemaFor(filter: String): Schema = new Schema(
+    "delim" -> options.inDelim, "filter" -> filter,
+    "rowIDPosition" -> options.rowIDPosition,
+    "columnIDPosition" -> options.itemIDPosition,
+    "filterPosition" -> options.filterPosition)
+
+  /**
+   * Reads the primary dataset and, when requested, the secondary one from the discovered files.
+   *
+   * @return an empty array when no input file was found, otherwise the self-similarity dataset
+   *         optionally followed by the cross-similarity dataset
+   */
+  private def readIndexedDatasets: Array[IndexedDataset] = {
+
+    val inFiles = FileSysUtils(options.input, options.filenamePattern, options.recursive).uris
+
+    if (inFiles.isEmpty) {
+      Array()
+    } else {
+
+      val selfSimilarityDataset = IndexedDataset(reader1.readFrom(inFiles))
+
+      if (crossSimilarityRequested) {
+        // todo: needs to support more than one cross-similarity indicator
+        val crossSimilarityDataset1 = IndexedDataset(reader2.readFrom(inFiles))
+        Array(selfSimilarityDataset, crossSimilarityDataset1)
+      } else {
+        Array(selfSimilarityDataset)
+      }
+
+    }
+
+  }
+
+  /**
+   * Runs the whole job: start, read the input, compute the indicator matrices, write them below
+   * the output path and stop. When no input file is found nothing is computed and a message is
+   * printed to stderr. stop is called even if reading, computing or writing fails.
+   */
+  override def process: Unit = {
+    start()
+
+    try {
+      val indexedDatasets = readIndexedDatasets
+
+      if (indexedDatasets.isEmpty) {
+        // readIndexedDatasets signals "no input files" with an empty array; indexing it would fail
+        Console.err.println("No input files found for --input \"" + options.input + "\", nothing to do.")
+      } else {
+
+        // todo: allow more than one cross-similarity matrix?
+        val indicatorMatrices = {
+          if (indexedDatasets.length > 1) {
+            CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs, Array(indexedDatasets(1).matrix))
+          } else {
+            CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs)
+          }
+        }
+
+        // self similarity
+        // the next two lines write the drm using a Writer class
+        // val selfIndicatorDataset = new IndexedDataset(indicatorMatrices(0), indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs)
+        // writeStore.writeTo(selfIndicatorDataset, options.output + "indicator-matrix")
+
+        // an alternative is to create a version of IndexedDataset that knows how to write itself
+        val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
+          indexedDatasets(0).columnIDs, writeSchema)
+        selfIndicatorDataset.writeTo(options.output + "indicator-matrix")
+
+        // todo: needs to support more than one cross-similarity indicator
+        if (indexedDatasets.length > 1) {
+
+          val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
+          writer.writeTo(crossIndicatorDataset, options.output + "cross-indicator-matrix")
+
+        }
+
+      }
+    } finally {
+      stop
+    }
+  }
+
+  // Default values go here, any "_" or null should be "required" in the Parser or flags an unused option
+  // todo: support two input streams for cross-similarity, maybe assume one schema for all inputs
+  case class Options(
+      master: String = "local",
+      sparkExecutorMem: String = "2g",
+      appName: String = "ItemSimilarityJob",
+      randomSeed: Int = System.currentTimeMillis().toInt,
+      recursive: Boolean = false,
+      input: String = null,
+      output: String = null,
+      filenamePattern: String = "^part-.*",
+      maxSimilaritiesPerItem: Int = 100,
+      maxPrefs: Int = 500,
+      minPrefs: Int = 1,
+      rowIDPosition: Int = 0,
+      itemIDPosition: Int = 1,
+      filterPosition: Int = -1,
+      filter1: String = null,
+      filter2: String = null,
+      inDelim: String = "[,\t ]",
+      rowKeyDelim: String = "\t",
+      columnIdStrengthDelim: String = ":",
+      tupleDelim: String = ",")
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
new file mode 100644
index 0000000..afc7c1e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.spark.SparkConf
+import org.apache.mahout.sparkbindings._
+
+/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
+ * Also define a command line parser and default options or fill in the following template:
+ * {{{
+ *   object SomeDriver extends MahoutDriver {
+ *     private var options: Options = _
+ *
+ *     override def main(args: Array[String]): Unit = {
+ *       val parser = new MahoutOptionParser[Options]("Job Name") {
+ *         head("Job Name", "Spark")
+ *         note("Various CLI options")
+ *         //see https://github.com/scopt/scopt for a good Scala option parser, which MahoutOptionParser extends
+ *       }
+ *       parser.parse(args, Options()) map { opts =>
+ *         options = opts
+ *         process
+ *       }
+ *     }
+ *
+ *     override def process: Unit = {
+ *       start(options.master, options.appName)
+ *       //don't just stand there do something
+ *       stop
+ *     }
+ *
+ *     //Default values go here, any '_' or null should be 'required' in the Parser or flags an unused option
+ *     case class Options(
+ *       master: String = "local",
+ *       appName: String = "Job Name", ...
+ *     )
+ *   }
+ * }}}
+ */
+abstract class MahoutDriver {
+  /** Context the job runs in. It is null until one of the start methods has been called. */
+  implicit var mc: DistributedContext = _
+
+  /** Spark configuration passed to the context factory. Set job specific values on it before calling start. */
+  implicit val sparkConf: SparkConf = new SparkConf()
+
+  /** Creates a Spark context to run the job inside.
+    * Override to set the SparkConf values specific to the job, these must be set before the context is created.
+    * @param masterUrl Spark master URL
+    * @param appName Name to display in Spark UI
+    * @param customJars List of paths to custom jars
+    * */
+  protected def start(masterUrl: String, appName: String, customJars: Traversable[String]): Unit = {
+    mc = mahoutSparkContext(masterUrl, appName, customJars, sparkConf)
+  }
+
+  /** Creates a Spark context to run the job inside, without any custom jars.
+    * @param masterUrl Spark master URL
+    * @param appName Name to display in Spark UI
+    * */
+  protected def start(masterUrl: String, appName: String): Unit = {
+    // single code path for context creation: delegate to the three argument version
+    start(masterUrl, appName, Traversable.empty[String])
+  }
+
+  /** Closes the context created by start. Override (optionally) for special cleanup.
+    * Does nothing when start was never called, instead of failing on the still-null context.
+    */
+  protected def stop: Unit = {
+    if (mc != null) {
+      mc.close()
+    }
+  }
+
+  /** This is where you do the work, call start first, then before exiting call stop */
+  protected def process: Unit
+
+  /** Parse command line and call process */
+  def main(args: Array[String]): Unit
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
new file mode 100644
index 0000000..8a337f5
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import scopt.OptionParser
+
+/** A [[scopt.OptionParser]] that prints the long, help-like usage text together with the error message
+ * whenever parsing fails.
+ * @param programName program name, passed straight through to [[scopt.OptionParser]]
+ * @tparam C type of the options object handed to the parser
+ */
+class MahoutOptionParser[C](programName: String) extends OptionParser[C](programName) {
+  override def showUsageOnError: Boolean = true
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
new file mode 100644
index 0000000..c5b7385
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Base trait for readers. It is abstract: an extending trait has to supply the reader function and, with it,
+ * the type of object that is read.
+ * @tparam T type of object read, usually supplied by an extending trait.
+ * @todo the reader need not create both dictionaries but does at present. There are cases where one or the other dictionary is never used so saving the memory for a very large dictionary may be worth the optimization to specify which dictionaries are created.
+ */
+trait Reader[T] {
+  val mc: DistributedContext
+  val readSchema: Schema
+
+  /** Performs the actual read; defined by the extending trait. */
+  protected def reader(mc: DistributedContext, readSchema: Schema, source: String): T
+
+  /** Reads from source using this instance's context and read schema. */
+  def readFrom(source: String): T = {
+    reader(mc, readSchema, source)
+  }
+}
+
+/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
+ * @tparam T type of object written, usually supplied by an extending trait.
+ */
+trait Writer[T] {
+  val mc: DistributedContext
+  val writeSchema: Schema
+
+  /** Performs the actual write; defined by the extending trait. */
+  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T): Unit
+
+  /** Writes collection to dest using this instance's context and write schema.
+    * The result type is spelled out so the public signature does not depend on inference and mirrors Reader#readFrom.
+    */
+  def writeTo(collection: T, dest: String): Unit = writer(mc, writeSchema, dest, collection)
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
new file mode 100644
index 0000000..46e1540
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import scala.collection.mutable
+import scala.collection.mutable.HashMap
+
+/** Convenience wrapper around a mutable HashMap[String, Any] that can be filled when it is constructed.
+ *
+ * @param params mappings stored at instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+ */
+class Schema(params: (String, Any)*) extends HashMap[String, Any] {
+  // note: filling the map from the constructor requires a mutable HashMap, do we care?
+  params.foreach { mapping => this += mapping }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
new file mode 100644
index 0000000..119f8d3
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import scala.collection.JavaConversions._
+import org.apache.spark.SparkContext._
+import org.apache.mahout.math.RandomAccessSparseVector
+import com.google.common.collect.{BiMap, HashBiMap}
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
+import org.apache.mahout.sparkbindings._
+
+
+/** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
+ */
+trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
+  /** Read in text delimited tuples from all URIs in this comma delimited source String.
+    *
+    * The schema keys read are "delim", "rowIDPosition", "columnIDPosition", "filterPosition" and "filter".
+    * Every (row ID, column ID) pair found in the input is stored in the matrix with the value 1.0.
+    * The distinct row and column IDs are collected to the driver to build the two dictionaries.
+    *
+    * @param mc context for the Spark job
+    * @param readSchema describes the delimiters and positions of values in the text delimited file.
+    * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
+    * @return the interactions as an [[org.apache.mahout.drivers.IndexedDataset]] with its row and column ID dictionaries
+    * @throws IllegalArgumentException if source is empty
+    * @throws ClassCastException if a schema value does not have the expected type
+    */
+  protected def reader(mc: DistributedContext, readSchema: Schema, source: String): IndexedDataset = {
+    try {
+      // instance vars must be put into locally scoped vals when used in closures that are executed by Spark
+      val delimiter = readSchema("delim").asInstanceOf[String]
+      val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int]
+      val columnIDPosition = readSchema("columnIDPosition").asInstanceOf[Int]
+      val filterPosition = readSchema("filterPosition").asInstanceOf[Int]
+      val filterBy = readSchema("filter").asInstanceOf[String]
+
+      // an explicit check rather than assert: assertions can be elided at compile time, this validation must not be
+      if (source.isEmpty) {
+        val message = this.getClass.toString + ": has no files to read"
+        println(message)
+        throw new IllegalArgumentException(message)
+      }
+
+      val allColumns = mc.textFile(source).map { line => line.split(delimiter) }
+
+      // -1 means no filter in the input text, take them all
+      // otherwise keep only the rows that have a column matching the filter
+      val columns =
+        if (filterPosition == -1) allColumns
+        else allColumns.filter { tokens => tokens(filterPosition) == filterBy }
+
+      // get row and column IDs; nothing is collected to the driver here, only the distinct IDs are below
+      val interactions = columns.map { tokens =>
+        tokens(rowIDPosition) -> tokens(columnIDPosition)
+      }
+
+      // interactions is traversed three times: twice for the dictionaries and once to build the matrix
+      interactions.cache()
+
+      // create separate collections of rowID and columnID tokens
+      val rowIDs = interactions.map { case (rowID, _) => rowID }.distinct().collect()
+      val columnIDs = interactions.map { case (_, columnID) => columnID }.distinct().collect()
+
+      val numRows = rowIDs.size
+      val numColumns = columnIDs.size
+
+      // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
+      // broadcast them for access in distributed processes, so they are not recalculated in every task.
+      val rowIDDictionary = asOrderedDictionary(rowIDs)
+      val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
+
+      val columnIDDictionary = asOrderedDictionary(columnIDs)
+      val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
+
+      val indexedInteractions =
+        interactions.map { case (rowID, columnID) =>
+          // BiMap#get already yields the Int index. Both dictionaries were built from these same interactions,
+          // so every lookup is expected to hit.
+          val rowIndex: Int = rowIDDictionary_bcast.value.get(rowID)
+          val columnIndex: Int = columnIDDictionary_bcast.value.get(columnID)
+
+          rowIndex -> columnIndex
+        }
+        // group by IDs to form row vectors
+        .groupByKey().map { case (rowIndex, columnIndexes) =>
+          val row = new RandomAccessSparseVector(numColumns)
+          for (columnIndex <- columnIndexes) {
+            row.setQuick(columnIndex, 1.0)
+          }
+          rowIndex -> row
+        }
+        .asInstanceOf[DrmRdd[Int]]
+
+      // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
+      val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
+
+      IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+
+    } catch {
+      case cce: ClassCastException => {
+        println(this.getClass.toString + ": Schema has illegal values"); throw cce
+      }
+    }
+  }
+
+  // this creates a BiMap from an ID collection. The ID points to an ordinal int
+  // which is used internal to Mahout as the row or column ID
+  // todo: this is a non-distributed process and the BiMap is a non-rdd based object--might be a scaling problem
+  private def asOrderedDictionary(entries: Array[String]): BiMap[String, Int] = {
+    val dictionary: BiMap[String, Int] = HashBiMap.create()
+    for ((entry, index) <- entries.zipWithIndex) {
+      dictionary.forcePut(entry, index)
+    }
+    dictionary
+  }
+}
+
+/** Extends Writer trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type written and a writer function for writing text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
+ */
+trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+  /** Write the [[org.apache.mahout.drivers.IndexedDataset]] as text delimited lines, one line per matrix row.
+    *
+    * A line is the external row ID, then rowKeyDelim, then one (external column ID, columnIdStrengthDelim, value)
+    * tuple per non-zero element, with the tuples separated by tupleDelim. A row without non-zero elements is
+    * written as its row ID only.
+    * The schema keys read are "rowKeyDelim", "columnIdStrengthDelim" and "tupleDelim".
+    *
+    * @param mc context for the Spark job
+    * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
+    * @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]]
+    * @param indexedDataset the matrix and ID dictionaries to write
+    * @throws IllegalArgumentException if indexedDataset is null or dest is empty
+    * @throws ClassCastException if a schema value does not have the expected type
+    */
+  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, indexedDataset: IndexedDataset): Unit = {
+    try {
+      //instance vars must be put into locally scoped vals when put into closures that are
+      //executed by Spark
+      val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
+      val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String]
+      val tupleDelim = writeSchema("tupleDelim").asInstanceOf[String]
+
+      // explicit checks rather than assert: assertions can be elided at compile time, this validation must not be
+      if (indexedDataset == null) {
+        val message = this.getClass.toString+": has no indexedDataset to write"
+        println(message)
+        throw new IllegalArgumentException(message)
+      }
+      if (dest.isEmpty) {
+        val message = this.getClass.toString+": has no destination or indextedDataset to write"
+        println(message)
+        throw new IllegalArgumentException(message)
+      }
+
+      val matrix = indexedDataset.matrix
+      val rowIDDictionary = indexedDataset.rowIDs
+      val columnIDDictionary = indexedDataset.columnIDs
+
+      matrix.rdd.map { case (rowID, itemVector) =>
+
+        // each line is created of non-zero values with schema specified delimiters and original row and column ID tokens
+        // first get the external rowID token
+        val rowKey = String.valueOf(rowIDDictionary.inverse.get(rowID))
+
+        // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
+        val tuples = itemVector.nonZeroes().map { item =>
+          columnIDDictionary.inverse.get(item.index) + columnIdStrengthDelim + item.get
+        }
+
+        // joining with mkString puts the delimiter only between tuples, so nothing has to be chopped off the end
+        // and tupleDelim may have any length. An empty row keeps the bare row ID, which is what the previous
+        // "drop the last character" logic produced with the default one character rowKeyDelim.
+        if (tuples.isEmpty) rowKey
+        else rowKey + rowKeyDelim + tuples.mkString(tupleDelim)
+      }
+      .saveAsTextFile(dest)
+
+    }catch{
+      case cce: ClassCastException => {println(this.getClass.toString+": Schema has illegal values"); throw cce}
+    }
+  }
+}
+
+/** A combined trait that reads and writes: it mixes TDIndexedDatasetReader and TDIndexedDatasetWriter together so one object can do both for text delimited files. */
+trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexedDatasetWriter
+
+/** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
+ * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
+ * @param mc Mahout distributed context used for reading the files, may be implicitly defined.
+ * @note The source is supplied by Reader#readFrom .
+ * */
+class TextDelimitedIndexedDatasetReader(val readSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReader
+
+/** Writes an IndexedDataset to text delimited files. Classes are needed to supply trait params in their constructor.
+ * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
+ * @param mc Mahout distributed context used for writing the files, may be implicitly defined.
+ * @note the destination is supplied by Writer#writeTo trait method
+ * */
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
+
+/** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor.
+ * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
+ * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
+ * @param mc Mahout distributed context used for reading and writing the files, may be implicitly defined.
+ * */
+class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReaderWriter
+
+/** An IndexedDataset that carries its own writeTo method, taken from a Writer trait. It is an alternative to a
+ * stand-alone Writer based class and gives semantics similar to drm.writeDrm(). Consider it experimental: it is
+ * not clear that it is simpler or more intuitive, and since IndexedDatasetTextDelimitedWriteables are probably
+ * short lived in terms of lines of code the complexity may be moot.
+ * @param matrix the data
+ * @param rowIDs bi-directional dictionary for rows of external IDs to internal ordinal Mahout IDs.
+ * @param columnIDs bi-directional dictionary for columns of external IDs to internal ordinal Mahout IDs.
+ * @param writeSchema contains params for the schema/format of the written text delimited file.
+ * @param mc mahout distributed context (DistributedContext) may be implicitly defined.
+ * */
+class IndexedDatasetTextDelimitedWriteable(
+    matrix: CheckpointedDrm[Int],
+    rowIDs: BiMap[String,Int],
+    columnIDs: BiMap[String,Int],
+    val writeSchema: Schema)(implicit val mc: DistributedContext)
+  extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter {
+
+  /** Writes this dataset to dest through the inherited two argument writeTo. */
+  def writeTo(dest: String): Unit = writeTo(this, dest)
+}
+
+/**
+ * Companion object for [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]], primarily used to get a secondary constructor for
+ * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another.
+ * {{{
+ * val copy = IndexedDatasetTextDelimitedWriteable(existingWriteable)
+ * }}}
+ */
+object IndexedDatasetTextDelimitedWriteable {
+  /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]].
+    * The new instance is built from id2's matrix, row and column dictionaries, write schema and context.
+    * @param id2 the instance to build the new one from
+    * @return a new [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]]
+    */
+  def apply(id2: IndexedDatasetTextDelimitedWriteable): IndexedDatasetTextDelimitedWriteable =
+    new IndexedDatasetTextDelimitedWriteable(id2.matrix, id2.rowIDs, id2.columnIDs, id2.writeSchema)(id2.mc)
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
index b0042c9..79c7585 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
@@ -18,6 +18,8 @@
package org.apache.mahout.sparkbindings.io
import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.serializers.JavaSerializer
+import com.google.common.collect.HashBiMap
import org.apache.mahout.math._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.mahout.sparkbindings._
@@ -33,7 +35,7 @@ class MahoutKryoRegistrator extends KryoRegistrator {
kryo.addDefaultSerializer(classOf[Vector], new WritableKryoSerializer[Vector, VectorWritable])
kryo.addDefaultSerializer(classOf[DenseVector], new WritableKryoSerializer[Vector, VectorWritable])
kryo.addDefaultSerializer(classOf[Matrix], new WritableKryoSerializer[Matrix, MatrixWritable])
-
+ kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer());
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
index 2db5f50..e46dad5 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -17,13 +17,11 @@
package org.apache.mahout.cf
-import org.scalatest.FunSuite
-import org.apache.mahout.test.MahoutSuite
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.scalabindings.MatrixOps
import org.apache.mahout.math.drm._
-import org.apache.mahout.math._
+import org.apache.mahout.math.scalabindings.{MatrixOps, _}
import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.test.MahoutSuite
+import org.scalatest.FunSuite
/* values
A =
@@ -41,32 +39,42 @@ B =
class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLocalContext {
+ // correct cooccurrence with LLR
+ final val matrixLLRCoocAtAControl = dense(
+ (0.0, 0.6331745808516107, 0.0, 0.0, 0.0),
+ (0.6331745808516107, 0.0, 0.0, 0.0, 0.0),
+ (0.0, 0.0, 0.0, 0.6331745808516107, 0.0),
+ (0.0, 0.0, 0.6331745808516107, 0.0, 0.0),
+ (0.0, 0.0, 0.0, 0.0, 0.0))
+
+ // correct cross-cooccurrence with LLR
+ final val matrixLLRCoocBtAControl = dense(
+ (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+ (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+ (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.40461878191490940),
+ (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+ (0.0, 0.0, 0.0, 0.0, 0.8181382096075936))
+
+
+
test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
- val a = dense((1, 1, 0, 0, 0), (0, 0, 1, 1, 0), (0, 0, 0, 0, 1), (1, 0, 0, 1, 0))
- val b = dense((1, 1, 1, 1, 0), (1, 1, 1, 1, 0), (0, 0, 1, 0, 1), (1, 1, 0, 1, 0))
+ val a = dense(
+ (1, 1, 0, 0, 0),
+ (0, 0, 1, 1, 0),
+ (0, 0, 0, 0, 1),
+ (1, 0, 0, 1, 0))
+
+ val b = dense(
+ (1, 1, 1, 1, 0),
+ (1, 1, 1, 1, 0),
+ (0, 0, 1, 0, 1),
+ (1, 1, 0, 1, 0))
+
val drmA = drmParallelize(m = a, numPartitions = 2)
val drmB = drmParallelize(m = b, numPartitions = 2)
- // correct cooccurrence with LLR
- val matrixLLRCoocAtAControl = dense(
- (0.0, 1.7260924347106847, 0, 0, 0),
- (1.7260924347106847, 0, 0, 0, 0),
- (0, 0, 0, 1.7260924347106847, 0),
- (0, 0, 1.7260924347106847, 0, 0),
- (0, 0, 0, 0, 0)
- )
-
- // correct cross-cooccurrence with LLR
- val matrixLLRCoocBtAControl = dense(
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
- (0, 0, 0, 0, 4.498681156950466)
- )
-
//self similarity
- val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+ val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, randomSeed = 1, drmBs = Array(drmB))
val matrixSelfCooc = drmCooc(0).checkpoint().collect
val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
var n = (new MatrixOps(m = diffMatrix)).norm
@@ -77,32 +85,25 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLoc
val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
n = (new MatrixOps(m = diff2Matrix)).norm
n should be < 1E-10
+
}
test("cooccurrence [A'A], [B'A] double data using LLR") {
- val a = dense((100000.0D, 1.0D, 0.0D, 0.0D, 0.0D), (0.0D, 0.0D, 10.0D, 1.0D, 0.0D), (0.0D, 0.0D, 0.0D, 0.0D, 1000.0D), (1.0D, 0.0D, 0.0D, 10.0D, 0.0D))
- val b = dense((10000.0D, 100.0D, 1000.0D, 1.0D, 0.0D), (10.0D, 1.0D, 10000000.0D, 10.0D, 0.0D), (0.0D, 0.0D, 1000.0D, 0.0D, 100.0D), (100.0D, 1.0D, 0.0D, 100000.0D, 0.0D))
+ val a = dense(
+ (100000.0D, 1.0D, 0.0D, 0.0D, 0.0D),
+ (0.0D, 0.0D, 10.0D, 1.0D, 0.0D),
+ (0.0D, 0.0D, 0.0D, 0.0D, 1000.0D),
+ (1.0D, 0.0D, 0.0D, 10.0D, 0.0D))
+
+ val b = dense(
+ (10000.0D, 100.0D, 1000.0D, 1.0D, 0.0D),
+ (10.0D, 1.0D, 10000000.0D, 10.0D, 0.0D),
+ (0.0D, 0.0D, 1000.0D, 0.0D, 100.0D),
+ (100.0D, 1.0D, 0.0D, 100000.0D, 0.0D))
+
val drmA = drmParallelize(m = a, numPartitions = 2)
val drmB = drmParallelize(m = b, numPartitions = 2)
- // correct cooccurrence with LLR
- val matrixLLRCoocAtAControl = dense(
- (0.0, 1.7260924347106847, 0, 0, 0),
- (1.7260924347106847, 0, 0, 0, 0),
- (0, 0, 0, 1.7260924347106847, 0),
- (0, 0, 1.7260924347106847, 0, 0),
- (0, 0, 0, 0, 0)
- )
-
- // correct cross-cooccurrence with LLR
- val matrixLLRCoocBtAControl = dense(
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
- (0, 0, 0, 0, 4.498681156950466)
- )
-
//self similarity
val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
val matrixSelfCooc = drmCooc(0).checkpoint().collect
@@ -118,30 +119,22 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLoc
}
test("cooccurrence [A'A], [B'A] integer data using LLR") {
- val a = dense((1000, 10, 0, 0, 0), (0, 0, -10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0))
- val b = dense((100, 1000, -10000, 10000, 0), (10000, 1000, 100, 10, 0), (0, 0, 10, 0, -100), (10, 100, 0, 1000, 0))
+ val a = dense(
+ (1000, 10, 0, 0, 0),
+ (0, 0, -10000, 10, 0),
+ (0, 0, 0, 0, 100),
+ (10000, 0, 0, 1000, 0))
+
+ val b = dense(
+ (100, 1000, -10000, 10000, 0),
+ (10000, 1000, 100, 10, 0),
+ (0, 0, 10, 0, -100),
+ (10, 100, 0, 1000, 0))
+
val drmA = drmParallelize(m = a, numPartitions = 2)
val drmB = drmParallelize(m = b, numPartitions = 2)
- // correct cooccurrence with LLR
- val matrixLLRCoocAtAControl = dense(
- (0.0, 1.7260924347106847, 0, 0, 0),
- (1.7260924347106847, 0, 0, 0, 0),
- (0, 0, 0, 1.7260924347106847, 0),
- (0, 0, 1.7260924347106847, 0, 0),
- (0, 0, 0, 0, 0)
- )
-
- // correct cross-cooccurrence with LLR
- val matrixLLRCoocBtAControl = dense(
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0),
- (0, 0, 0, 0, 4.498681156950466)
- )
-
- //self similarity
+ //self similarity
val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
//var cp = drmSelfCooc(0).checkpoint()
//cp.writeDRM("/tmp/cooc-spark/")//to get values written
@@ -158,22 +151,69 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLoc
}
test("LLR calc") {
- val numInteractionsWithAandB = 10L
- val numInteractionsWithA = 100L
- val numInteractionsWithB = 200L
- val numInteractions = 10000l
-
- val llr = CooccurrenceAnalysis.loglikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
-
- assert(llr == 17.19462327013025)
+ val A = dense(
+ (1, 1, 0, 0, 0),
+ (0, 0, 1, 1, 0),
+ (0, 0, 0, 0, 1),
+ (1, 0, 0, 1, 0))
+
+ val AtA = A.transpose().times(A)
+
+ /* AtA is:
+ 0 => {0:2.0,1:1.0,3:1.0}
+ 1 => {0:1.0,1:1.0}
+ 2 => {2:1.0,3:1.0}
+ 3 => {0:1.0,2:1.0,3:2.0}
+ 4 => {4:1.0}
+
+ val AtAd = dense(
+ (2, 1, 0, 1, 0),
+ (1, 1, 0, 0, 0),
+ (0, 0, 1, 1, 0),
+ (1, 0, 1, 2, 0),
+ (0, 0, 0, 0, 1))
+
+ val AtAdNoSelfCooc = dense(
+ (0, 1, 0, 1, 0),
+ (1, 0, 0, 0, 0),
+ (0, 0, 0, 1, 0),
+ (1, 0, 1, 0, 0),
+ (0, 0, 0, 0, 0))
+
+ for (MatrixSlice row : cooccurrence) {
+ for (Vector.Element element : row.vector().nonZeroes()) {
+ long k11 = (long) element.get();// = 1
+ long k12 = (long) (rowSums.get(row.index()) - k11);// = 0
+ long k21 = (long) (colSums.get(element.index()) - k11);// = 1
+ long k22 = (long) (total - k11 - k12 - k21);// = 2
+ double score = LogLikelihood.rootLogLikelihoodRatio(k11, k12, k21, k22);
+ element.set(score);
+ }
+ }
+
+ for some reason the hadoop version returns the following
+ return 1.0 - 1.0 / (1.0 + logLikelihood);
+ so not a pure llr or root llr
+
+ */
+
+ //item (1,0)
+ val numInteractionsWithAandB = 1L
+ val numInteractionsWithA = 1L
+ val numInteractionsWithB = 2L
+ val numInteractions = 6l
+
+ val llr = CooccurrenceAnalysis.logLikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
+
+ assert(llr == 2.6341457841558764) // value calculated by hadoop itemsimilairty
}
test("downsampling by number per row") {
- val a = dense((1, 1, 1, 1, 0),
- (1, 1, 1, 1, 1),
- (0, 0, 0, 0, 1),
- (1, 1, 0, 1, 0)
- )
+ val a = dense(
+ (1, 1, 1, 1, 0),
+ (1, 1, 1, 1, 1),
+ (0, 0, 0, 0, 1),
+ (1, 1, 0, 1, 0))
val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
val downSampledDrm = CooccurrenceAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
new file mode 100644
index 0000000..f649d7b
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.scalatest.FunSuite
+import org.apache.mahout.sparkbindings._
+import org.apache.mahout.sparkbindings.test.MahoutLocalContext
+import org.apache.mahout.test.MahoutSuite
+
+class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with MahoutLocalContext {
+
+/*
+ // correct self-cooccurrence with LLR
+ final val matrixLLRCoocAtAControl = dense(
+ (0.0, 0.6331745808516107, 0.0, 0.0, 0.0),
+ (0.6331745808516107, 0.0, 0.0, 0.0, 0.0),
+ (0.0, 0.0, 0.0, 0.6331745808516107, 0.0),
+ (0.0, 0.0, 0.6331745808516107, 0.0, 0.0),
+ (0.0, 0.0, 0.0, 0.0, 0.0))
+
+ // correct cross-cooccurrence with LLR
+ final val matrixLLRCoocBtAControl = dense(
+ (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+ (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+ (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.40461878191490940),
+ (0.6331745808516107, 0.4046187819149094, 0.4046187819149094, 0.6331745808516107, 0.0),
+ (0.0, 0.0, 0.0, 0.0, 0.8181382096075936))
+*/
+
+ // Expected text lines of the "indicator-matrix" output that the tests below compare against.
+ // Each line is a row item ID, a tab, then "itemID:strength" pairs; "surface" has no pairs.
+ // The strengths are the same numbers as in the commented-out LLR control matrices above.
+ final val SelfSimilairtyTSV = Set(
+ "galaxy\tnexus:0.6331745808516107",
+ "ipad\tiphone:0.6331745808516107",
+ "nexus\tgalaxy:0.6331745808516107",
+ "iphone\tipad:0.6331745808516107",
+ "surface")
+ // Expected text lines of the "cross-indicator-matrix" output, in the same line format but
+ // with several comma-separated "itemID:strength" pairs per row.
+ final val CrossSimilarityTSV = Set(
+ "galaxy\tnexus:0.4046187819149094,iphone:0.6331745808516107,ipad:0.4046187819149094,galaxy:0.6331745808516107",
+ "surface\tsurface:0.8181382096075936",
+ "nexus\tnexus:0.4046187819149094,iphone:0.6331745808516107,ipad:0.4046187819149094,surface:0.4046187819149094,galaxy:0.6331745808516107",
+ "ipad\tnexus:0.4046187819149094,iphone:0.6331745808516107,ipad:0.4046187819149094,galaxy:0.6331745808516107",
+ "iphone\tnexus:0.4046187819149094,iphone:0.6331745808516107,ipad:0.4046187819149094,galaxy:0.6331745808516107")
+
+ // Root of every input and output path used by the tests; afterAll deletes it recursively.
+ final val TmpDir = "tmp/" // all IO going to whatever the default HDFS config is pointing to
+
+ /*
+ //Clustered Spark and HDFS, not a good everyday build test
+ ItemSimilarityDriver.main(Array(
+ "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
+ "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
+ "--master", "spark://occam4:7077",
+ "--filter1", "purchase",
+ "--filter2", "view",
+ "--inDelim", ",",
+ "--itemIDPosition", "2",
+ "--rowIDPosition", "0",
+ "--filterPosition", "1"
+ ))
+*/
+ // local multi-threaded Spark with HDFS using large dataset
+ // not a good build test.
+ /* ItemSimilarityDriver.main(Array(
+ "--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt",
+ "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/",
+ "--master", "local[4]",
+ "--filter1", "purchase",
+ "--filter2", "view",
+ "--inDelim", ",",
+ "--itemIDPosition", "2",
+ "--rowIDPosition", "0",
+ "--filterPosition", "1"
+ ))
+ */
+
+ // Comma-delimited (userID,action,itemID) input written as Spark part files: the driver is
+ // pointed at the input directory and both of its outputs are compared with the expected
+ // TSV lines.
+ test("ItemSimilarityDriver, non-full-spec CSV") {
+
+   val InFile = TmpDir + "in-file.csv/" // using part files, not a single file
+   val OutPath = TmpDir + "indicator-matrices/"
+
+   val lines = Array(
+     "u1,purchase,iphone",
+     "u1,purchase,ipad",
+     "u2,purchase,nexus",
+     "u2,purchase,galaxy",
+     "u3,purchase,surface",
+     "u4,purchase,iphone",
+     "u4,purchase,galaxy",
+     "u1,view,iphone",
+     "u1,view,ipad",
+     "u1,view,nexus",
+     "u1,view,galaxy",
+     "u2,view,iphone",
+     "u2,view,ipad",
+     "u2,view,nexus",
+     "u2,view,galaxy",
+     "u3,view,surface",
+     "u3,view,nexus",
+     "u4,view,iphone",
+     "u4,view,ipad",
+     "u4,view,galaxy")
+
+   // this will create multiple part-xxxxx files in the InFile dir but other tests will
+   // take account of one actual file
+   // (saveAsTextFile returns Unit, so its result is deliberately not bound to a val)
+   mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
+
+   afterEach() // clean up before running the driver, it should handle the Spark conf and context
+
+   // local multi-threaded Spark with default HDFS
+   ItemSimilarityDriver.main(Array(
+     "--input", InFile,
+     "--output", OutPath,
+     "--master", masterUrl,
+     "--filter1", "purchase",
+     "--filter2", "view",
+     "--inDelim", ",",
+     "--itemIDPosition", "2",
+     "--rowIDPosition", "0",
+     "--filterPosition", "1"))
+
+   beforeEach() // restart the test context to read the output of the driver
+   val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toSet[String]
+   assert(indicatorLines == SelfSimilairtyTSV)
+   val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toSet[String]
+   assert(crossIndicatorLines == CrossSimilarityTSV)
+ }
+
+
+
+ // Tab-delimited (userID, action, itemID) input written as Spark part files; the driver is
+ // given the delimiter pattern "[,\t]" and both outputs are compared with the expected lines.
+ test("ItemSimilarityDriver TSV ") {
+
+   val InFile = TmpDir + "in-file.tsv/"
+   val OutPath = TmpDir + "indicator-matrices/"
+
+   val lines = Array(
+     "u1\tpurchase\tiphone",
+     "u1\tpurchase\tipad",
+     "u2\tpurchase\tnexus",
+     "u2\tpurchase\tgalaxy",
+     "u3\tpurchase\tsurface",
+     "u4\tpurchase\tiphone",
+     "u4\tpurchase\tgalaxy",
+     "u1\tview\tiphone",
+     "u1\tview\tipad",
+     "u1\tview\tnexus",
+     "u1\tview\tgalaxy",
+     "u2\tview\tiphone",
+     "u2\tview\tipad",
+     "u2\tview\tnexus",
+     "u2\tview\tgalaxy",
+     "u3\tview\tsurface",
+     "u3\tview\tnexus",
+     "u4\tview\tiphone",
+     "u4\tview\tipad",
+     "u4\tview\tgalaxy")
+
+   // this will create multiple part-xxxxx files in the InFile dir but other tests will
+   // take account of one actual file
+   // (saveAsTextFile returns Unit, so its result is deliberately not bound to a val)
+   mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
+
+   afterEach() // clean up before running the driver, it should handle the Spark conf and context
+
+   // local multi-threaded Spark with default HDFS
+   ItemSimilarityDriver.main(Array(
+     "--input", InFile,
+     "--output", OutPath,
+     "--master", masterUrl,
+     "--filter1", "purchase",
+     "--filter2", "view",
+     "--inDelim", "[,\t]",
+     "--itemIDPosition", "2",
+     "--rowIDPosition", "0",
+     "--filterPosition", "1"))
+
+   beforeEach() // restart the test context to read the output of the driver
+   val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toSet[String]
+   assert(indicatorLines == SelfSimilairtyTSV)
+   val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toSet[String]
+   assert(crossIndicatorLines == CrossSimilarityTSV)
+
+ }
+
+ // Log-style tab-delimited input with extra columns (timestamp, free text): the row ID,
+ // filter and item ID are picked out by column position (1, 2 and 4) and both outputs are
+ // compared with the expected lines.
+ test("ItemSimilarityDriver log-ish files") {
+
+   val InFile = TmpDir + "in-file.log/"
+   val OutPath = TmpDir + "indicator-matrices/"
+
+   val lines = Array(
+     "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
+     "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tipad",
+     "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tnexus",
+     "2014-06-23 14:46:53.115\tu2\tpurchase\trandom text\tgalaxy",
+     "2014-06-23 14:46:53.115\tu3\tpurchase\trandom text\tsurface",
+     "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tiphone",
+     "2014-06-23 14:46:53.115\tu4\tpurchase\trandom text\tgalaxy",
+     "2014-06-23 14:46:53.115\tu1\tview\trandom text\tiphone",
+     "2014-06-23 14:46:53.115\tu1\tview\trandom text\tipad",
+     "2014-06-23 14:46:53.115\tu1\tview\trandom text\tnexus",
+     "2014-06-23 14:46:53.115\tu1\tview\trandom text\tgalaxy",
+     "2014-06-23 14:46:53.115\tu2\tview\trandom text\tiphone",
+     "2014-06-23 14:46:53.115\tu2\tview\trandom text\tipad",
+     "2014-06-23 14:46:53.115\tu2\tview\trandom text\tnexus",
+     "2014-06-23 14:46:53.115\tu2\tview\trandom text\tgalaxy",
+     "2014-06-23 14:46:53.115\tu3\tview\trandom text\tsurface",
+     "2014-06-23 14:46:53.115\tu3\tview\trandom text\tnexus",
+     "2014-06-23 14:46:53.115\tu4\tview\trandom text\tiphone",
+     "2014-06-23 14:46:53.115\tu4\tview\trandom text\tipad",
+     "2014-06-23 14:46:53.115\tu4\tview\trandom text\tgalaxy")
+
+   // this will create multiple part-xxxxx files in the InFile dir but other tests will
+   // take account of one actual file
+   // (saveAsTextFile returns Unit, so its result is deliberately not bound to a val)
+   mahoutCtx.parallelize(lines).saveAsTextFile(InFile)
+
+   afterEach() // clean up before running the driver, it should handle the Spark conf and context
+
+   // local multi-threaded Spark with default HDFS
+   ItemSimilarityDriver.main(Array(
+     "--input", InFile,
+     "--output", OutPath,
+     "--master", masterUrl,
+     "--filter1", "purchase",
+     "--filter2", "view",
+     "--inDelim", "\t",
+     "--itemIDPosition", "4",
+     "--rowIDPosition", "1",
+     "--filterPosition", "2"))
+
+   beforeEach() // restart the test context to read the output of the driver
+   val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toSet[String]
+   assert(indicatorLines == SelfSimilairtyTSV)
+   val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toSet[String]
+   assert(crossIndicatorLines == CrossSimilarityTSV)
+
+ }
+
+ // Three-column comma-delimited numeric input in a single named file, run with only
+ // --input/--output/--master (no filter or column-position options); only the
+ // indicator-matrix output is checked.
+ test("ItemSimilarityDriver legacy supported file format") {
+
+   val InDir = TmpDir + "in-dir/"
+   val InFilename = "in-file.tsv"
+   val InPath = InDir + InFilename
+
+   val OutPath = TmpDir + "indicator-matrices"
+
+   val lines = Array(
+     "0,0,1",
+     "0,1,1",
+     "1,2,1",
+     "1,3,1",
+     "2,4,1",
+     "3,0,1",
+     "3,3,1")
+
+   val Answer = Set(
+     "0\t1:0.6331745808516107",
+     "3\t2:0.6331745808516107",
+     "1\t0:0.6331745808516107",
+     "4",
+     "2\t3:0.6331745808516107")
+
+   // this creates one part-00000 file in the directory
+   mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir)
+
+   // to change from using part files to a single .tsv file we'll need to use HDFS
+   val fs = FileSystem.get(new Configuration())
+   // rename part-00000 to something.tsv; rename signals failure through its Boolean result,
+   // so check it here instead of letting the driver fail later on a missing input file
+   assert(fs.rename(new Path(InDir + "part-00000"), new Path(InPath)),
+     "could not rename " + InDir + "part-00000 to " + InPath)
+
+   afterEach() // clean up before running the driver, it should handle the Spark conf and context
+
+   // local multi-threaded Spark with default HDFS
+   ItemSimilarityDriver.main(Array(
+     "--input", InPath,
+     "--output", OutPath,
+     "--master", masterUrl))
+
+   beforeEach() // restart the test context to read the output of the driver
+   val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toSet[String]
+   assert(indicatorLines == Answer)
+
+ }
+
+ // Splits the input across two files in nested directories and runs the driver with
+ // --recursive and --filenamePattern from the top directory; the combined result must match
+ // the same expected lines as the single-directory tests.
+ test("ItemSimilarityDriver recursive file discovery using filename patterns") {
+   //directory structure using the following
+   // tmp/data/m1.tsv
+   // tmp/data/more-data/another-dir/m2.tsv
+   val M1Lines = Array(
+     "u1\tpurchase\tiphone",
+     "u1\tpurchase\tipad",
+     "u2\tpurchase\tnexus",
+     "u2\tpurchase\tgalaxy",
+     "u3\tpurchase\tsurface",
+     "u4\tpurchase\tiphone",
+     "u4\tpurchase\tgalaxy",
+     "u1\tview\tiphone")
+
+   val M2Lines = Array(
+     "u1\tview\tipad",
+     "u1\tview\tnexus",
+     "u1\tview\tgalaxy",
+     "u2\tview\tiphone",
+     "u2\tview\tipad",
+     "u2\tview\tnexus",
+     "u2\tview\tgalaxy",
+     "u3\tview\tsurface",
+     "u3\tview\tnexus",
+     "u4\tview\tiphone",
+     "u4\tview\tipad",
+     "u4\tview\tgalaxy")
+
+   val InFilenameM1 = "m1.tsv"
+   val InDirM1 = TmpDir + "data/"
+   val InPathM1 = InDirM1 + InFilenameM1
+   val InFilenameM2 = "m2.tsv"
+   val InDirM2 = TmpDir + "data/more-data/another-dir/"
+   val InPathM2 = InDirM2 + InFilenameM2
+
+   val InPathStart = TmpDir + "data/"
+   val OutPath = TmpDir + "indicator-matrices"
+
+   // this creates one part-00000 file in the directory
+   mahoutCtx.parallelize(M1Lines).coalesce(1, shuffle=true).saveAsTextFile(InDirM1)
+
+   // to change from using part files to a single .tsv file we'll need to use HDFS
+   val fs = FileSystem.get(new Configuration())
+   // rename part-00000 to m1.tsv; rename signals failure through its Boolean result, so
+   // check it here instead of letting the driver silently miss the file
+   assert(fs.rename(new Path(InDirM1 + "part-00000"), new Path(InPathM1)),
+     "could not rename " + InDirM1 + "part-00000 to " + InPathM1)
+
+   // this creates one part-00000 file in the directory
+   mahoutCtx.parallelize(M2Lines).coalesce(1, shuffle=true).saveAsTextFile(InDirM2)
+
+   // to change from using part files to a single .tsv file we'll need to use HDFS
+   // rename part-00000 to m2.tsv in the nested directory, again checking the result
+   assert(fs.rename(new Path(InDirM2 + "part-00000"), new Path(InPathM2)),
+     "could not rename " + InDirM2 + "part-00000 to " + InPathM2)
+
+   // local multi-threaded Spark with default FS, suitable for build tests but need better location for data
+
+   afterEach() // clean up before running the driver, it should handle the Spark conf and context
+
+   ItemSimilarityDriver.main(Array(
+     "--input", InPathStart,
+     "--output", OutPath,
+     "--master", masterUrl,
+     "--filter1", "purchase",
+     "--filter2", "view",
+     "--inDelim", "\t",
+     "--itemIDPosition", "2",
+     "--rowIDPosition", "0",
+     "--filterPosition", "1",
+     "--filenamePattern", "m..tsv",
+     "--recursive"))
+
+   beforeEach() // restart the test context to read the output of the driver
+   val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toSet[String]
+   assert(indicatorLines == SelfSimilairtyTSV)
+   val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toSet[String]
+   assert(crossIndicatorLines == CrossSimilarityTSV)
+ }
+
+ // Deletes everything the tests wrote under TmpDir, then runs the inherited afterAll.
+ // The result type is spelled out because the body now ends in a try/finally whose value
+ // (the Boolean from fs.delete) must be discarded rather than inferred as the result.
+ override def afterAll: Unit = {
+   try {
+     // remove TmpDir
+     val fs = FileSystem.get(new Configuration())
+     fs.delete(new Path(TmpDir), true) // delete recursively
+   } finally {
+     // always run the inherited cleanup, even when removing TmpDir throws
+     super.afterAll
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/2b65475c/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
index c48cfc7..fb97f68 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
@@ -10,11 +10,13 @@ trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
this: Suite =>
protected implicit var mahoutCtx: DistributedContext = _
+ protected var masterUrl = null.asInstanceOf[String]
override protected def beforeEach() {
super.beforeEach()
- mahoutCtx = mahoutSparkContext(masterUrl = "local[2]",
+ masterUrl = "local[2]"
+ mahoutCtx = mahoutSparkContext(masterUrl = this.masterUrl,
appName = "MahoutLocalContext",
// Do not run MAHOUT_HOME jars in unit tests.
addMahoutJars = false,