Posted to commits@mahout.apache.org by pa...@apache.org on 2014/10/14 23:45:59 UTC
[1/2] NOJIRA refactor IndexedDataset and CLI drivers into core and
engine specific parts, closes apache/mahout#59
Repository: mahout
Updated Branches:
refs/heads/master 4e6577d14 -> 666d314fb
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 920c32b..f8abef3 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -17,15 +17,18 @@
package org.apache.mahout.drivers
+import org.apache.mahout.common.HDFSPathSearch
import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSRead}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import scala.collection.immutable.HashMap
/**
- * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarity( )]].
- * Reads a text delimited file containing a Mahout DRM of the form
- * (row id, column id: strength, ...). The IDs are user specified strings which will be
- * preserved in the
- * output. The rows define a matrix and [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarity( )]]
+ * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarityIDSs( )]].
+ * Reads a text delimited file containing rows of a [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+ * with domain specific IDs of the form
+ * (row id, column id: strength, ...). The IDs will be preserved in the
+ * output. The rows define a matrix and [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarityIDSs( )]]
* will be used to calculate row-wise similarity using log-likelihood
* The options allow control of the input schema, file discovery, output schema, and control of
* algorithm parameters.
@@ -36,14 +39,13 @@ import scala.collection.immutable.HashMap
* @note To use with a Spark cluster see the --master option, if you run out of heap space check
* the --sparkExecutorMemory option.
*/
-object RowSimilarityDriver extends MahoutDriver {
+object RowSimilarityDriver extends MahoutSparkDriver {
// define only the options specific to RowSimilarity
private final val RowSimilarityOptions = HashMap[String, Any](
"maxObservations" -> 500,
"maxSimilaritiesPerRow" -> 100,
"appName" -> "RowSimilarityDriver")
- private var readerWriter: TextDelimitedIndexedDatasetReaderWriter = _
private var readWriteSchema: Schema = _
/**
@@ -110,10 +112,13 @@ object RowSimilarityDriver extends MahoutDriver {
// todo: the HashBiMap used in the TextDelimited Reader is hard coded into
// MahoutKryoRegistrator, it should be added to the register list here so it
- // will be only spcific to this job.
+ // will be only specific to this job.
sparkConf.set("spark.kryo.referenceTracking", "false")
- .set("spark.kryoserializer.buffer.mb", "200")
- .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+ .set("spark.kryoserializer.buffer.mb", "200")// todo: should we take this out?
+
+ if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
+ sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+ //else leave as set in Spark config
super.start(masterUrl, appName)
@@ -123,20 +128,18 @@ object RowSimilarityDriver extends MahoutDriver {
"omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
"elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
- readerWriter = new TextDelimitedIndexedDatasetReaderWriter(readWriteSchema, readWriteSchema)
-
}
private def readIndexedDataset: IndexedDataset = {
- val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+ val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
parser.opts("recursive").asInstanceOf[Boolean]).uris
if (inFiles.isEmpty) {
null.asInstanceOf[IndexedDataset]
} else {
- val datasetA = IndexedDataset(readerWriter.readDRMFrom(inFiles))
+ val datasetA = indexedDatasetDFSRead(inFiles, readWriteSchema)
datasetA
}
}
@@ -146,12 +149,12 @@ object RowSimilarityDriver extends MahoutDriver {
val indexedDataset = readIndexedDataset
- val rowSimilarityDrm = SimilarityAnalysis.rowSimilarity(indexedDataset.matrix, parser.opts("randomSeed").asInstanceOf[Int],
- parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int], parser.opts("maxObservations").asInstanceOf[Int])
+ val rowSimilarityIDS = SimilarityAnalysis.rowSimilarityIDS(indexedDataset,
+ parser.opts("randomSeed").asInstanceOf[Int],
+ parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int],
+ parser.opts("maxObservations").asInstanceOf[Int])
- val rowSimilarityDataset = new IndexedDatasetTextDelimitedWriteable(rowSimilarityDrm,
- indexedDataset.rowIDs, indexedDataset.rowIDs, readWriteSchema)
- rowSimilarityDataset.writeTo(dest = parser.opts("output").asInstanceOf[String])
+ rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], readWriteSchema)
stop
}
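
For orientation, the refactored driver now boils down to three engine-neutral calls: read an IndexedDataset, compute row similarities with external IDs preserved, and write the result. A minimal sketch, assuming an implicit DistributedContext, default schemas, and illustrative option values (the runRowSimilarity helper is hypothetical, not part of the commit):

import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.math.indexeddataset.indexedDatasetDFSRead

def runRowSimilarity(input: String, output: String)(implicit mc: DistributedContext): Unit = {
  val ids = indexedDatasetDFSRead(input)          // default read schema, IDs go into BiMap dictionaries
  val rowSims = SimilarityAnalysis.rowSimilarityIDS(ids,
    1234,                                         // randomSeed (illustrative)
    100,                                          // maxSimilaritiesPerRow
    500)                                          // maxObservations
  rowSims.dfsWrite(output)                        // default write schema, external IDs restored
}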
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
deleted file mode 100644
index 92163be..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import scala.collection.mutable
-import scala.collection.mutable.HashMap
-
-/** Syntactic sugar for mutable.HashMap[String, Any]
- *
- * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
- */
-class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
- // note: this require a mutable HashMap, do we care?
- this ++= params
-
- /** Constructor for copying an existing Schema
- *
- * @param schemaToClone return a copy of this Schema
- */
- def this(schemaToClone: Schema){
- this()
- this ++= schemaToClone
- }
-}
-
-// These can be used to keep the text in and out fairly standard to Mahout, where an application specific
-// format is not required.
-
-/** Simple default Schema for typical text delimited element file input
- * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID
- * <comma, tab, or space>here may be other ignored text...)
- */
-class DefaultElementReadSchema extends Schema(
- "delim" -> "[,\t ]", //comma, tab or space
- "filter" -> "",
- "rowIDColumn" -> 0,
- "columnIDPosition" -> 1,
- "filterColumn" -> -1)
-
-/** Default Schema for text delimited drm file output
- * This tells the writer to write a DRM of the default form:
- * (rowID<tab>columnID1:score1<space>columnID2:score2...)
- */
-class DefaultDRMWriteSchema extends Schema(
- "rowKeyDelim" -> "\t",
- "columnIdStrengthDelim" -> ":",
- "elementDelim" -> " ",
- "omitScore" -> false)
-
-/** Default Schema for typical text delimited drm file input
- * This tells the reader to input text lines of the form:
- * (rowID<tab>columnID1:score1,columnID2:score2,...)
- */
-class DefaultDRMReadSchema extends Schema(
- "rowKeyDelim" -> "\t",
- "columnIdStrengthDelim" -> ":",
- "elementDelim" -> " ")
-
-/** Default Schema for reading a text delimited drm file where the score of any element is ignored,
- * all non-zeros are replaced with 1.
- * This tells the reader to input DRM lines of the form
- * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
- * Alternatively the format can be
- * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
- * output format for [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
- */
-class DRMReadBooleanSchema extends Schema(
- "rowKeyDelim" -> "\t",
- "columnIdStrengthDelim" -> ":",
- "elementDelim" -> " ",
- "omitScore" -> true)
-
-/** Default Schema for typical text delimited drm file write where the score of a element is omitted.
- * The presence of a element means the score = 1, the absence means a score of 0.
- * This tells the writer to output DRM lines of the form
- * (rowID<tab>columnID1<space>columnID2...)
- */
-class DRMWriteBooleanSchema extends Schema(
- "rowKeyDelim" -> "\t",
- "columnIdStrengthDelim" -> ":",
- "elementDelim" -> " ",
- "omitScore" -> true)
-
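
These default schemas now live in org.apache.mahout.math.indexeddataset (see the new Schema.scala in the commit summary below). As a usage sketch under that assumption, a custom schema is just key/value overrides using the same key names shown above:

import org.apache.mahout.math.indexeddataset.Schema

// same keys as the defaults above, but write comma-separated elements
val customWriteSchema = new Schema(
  "rowKeyDelim" -> "\t",
  "columnIdStrengthDelim" -> ":",
  "elementDelim" -> ",",
  "omitScore" -> false)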
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index 274ad98..ba8f7d1 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -17,6 +17,8 @@
package org.apache.mahout.drivers
+import org.apache.mahout.math.indexeddataset.{Writer, Reader, Schema, IndexedDataset}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.SparkContext._
import org.apache.mahout.math.RandomAccessSparseVector
import com.google.common.collect.{BiMap, HashBiMap}
@@ -24,23 +26,23 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
import org.apache.mahout.sparkbindings._
import scala.collection.JavaConversions._
-/** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
+/** Extends Reader trait to supply the [[IndexedDatasetSpark]] as the type read and a reader function for reading text delimited files as described in the [[Schema]]
*/
-trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
+trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
/** Read in text delimited elements from all URIs in the comma delimited source String and return
* the DRM of all elements updating the dictionaries for row and column dictionaries. If there is
* no strength value in the element, assume it's presence means a strength of 1.
*
* @param mc context for the Spark job
* @param readSchema describes the delimiters and positions of values in the text delimited file.
- * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
+ * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]]
* @return
*/
protected def elementReader(
mc: DistributedContext,
readSchema: Schema,
source: String,
- existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = {
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDatasetSpark = {
try {
val delimiter = readSchema("delim").asInstanceOf[String]
val rowIDColumn = readSchema("rowIDColumn").asInstanceOf[Int]
@@ -105,7 +107,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
// wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
- IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+ new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary)
} catch {
case cce: ClassCastException => {
@@ -120,14 +122,14 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
*
* @param mc context for the Spark job
* @param readSchema describes the delimiters and positions of values in the text delimited file.
- * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
+ * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]]
* @return
*/
protected def drmReader(
mc: DistributedContext,
readSchema: Schema,
source: String,
- existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = {
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDatasetSpark = {
try {
val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String]
val columnIdStrengthDelim = readSchema("columnIdStrengthDelim").asInstanceOf[String]
@@ -193,7 +195,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
// wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
- IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+ new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary)
} catch {
case cce: ClassCastException => {
@@ -203,7 +205,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
}
}
- // this creates a BiMap from an ID collection. The ID points to an ordinal int
+ // this creates a BiMap from an ID collection. The ID points to an ordinal int
// which is used internal to Mahout as the row or column ID
// todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a
// non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit
@@ -218,7 +220,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
}
}
-trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
@@ -226,13 +228,13 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
*
* @param mc context for the Spark job
* @param writeSchema describes the delimiters and positions of values in the output text delimited file.
- * @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]]
+ * @param dest directory to write text delimited version of [[IndexedDatasetSpark]]
*/
protected def writer(
mc: DistributedContext,
writeSchema: Schema,
dest: String,
- indexedDataset: IndexedDataset,
+ indexedDataset: IndexedDatasetSpark,
sort: Boolean = true): Unit = {
try {
val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
@@ -295,7 +297,7 @@ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexed
/** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
* @param readSchema describes the delimiters and position of values in the text delimited file to be read.
* @param mc Spark context for reading files
- * @note The source is supplied by Reader#readElementsFrom .
+ * @note The source is supplied to Reader#readElementsFrom .
* */
class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
(implicit val mc: DistributedContext) extends TDIndexedDatasetReader
@@ -303,7 +305,7 @@ class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
/** Writes text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
* @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
* @param mc Spark context for reading files
- * @note the destination is supplied by Writer#writeTo trait method
+ * @note the destination is supplied to Writer#writeTo
* */
class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
@@ -316,41 +318,3 @@ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeS
(implicit val mc: DistributedContext)
extends TDIndexedDatasetReaderWriter
-/** A version of IndexedDataset that has it's own writeTo method from a Writer trait. This is an alternative to creating
- * a Writer based stand-alone class for writing. Consider it experimental allowing similar semantics to drm.writeDrm().
- * Experimental because it's not clear that it is simpler or more intuitive and since IndexedDatasetTextDelimitedWriteables
- * are probably short lived in terms of lines of code so complexity may be moot.
- * @param matrix the data
- * @param rowIDs bi-directional dictionary for rows of external IDs to internal ordinal Mahout IDs.
- * @param columnIDs bi-directional dictionary for columns of external IDs to internal ordinal Mahout IDs.
- * @param writeSchema contains params for the schema/format or the written text delimited file.
- * @param mc mahout distributed context (DistributedContext) may be implicitly defined.
- * */
-class IndexedDatasetTextDelimitedWriteable(
- matrix: CheckpointedDrm[Int],
- rowIDs: BiMap[String,Int],
- columnIDs: BiMap[String,Int],
- val writeSchema: Schema,
- val sort: Boolean = true)
- (implicit val mc: DistributedContext)
- extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter {
-
- override def writeTo(collection: IndexedDataset = this, dest: String): Unit = {
- super.writeTo(this, dest)
- }
-}
-
-/**
- * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily
- * used to get a secondary constructor for
- * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a
- * factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
- * {{{
- * val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readElementsFrom(source))
- * }}}
- */
-
-object IndexedDatasetTextDelimitedWriteable {
- /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
- def apply(id2: IndexedDatasetTextDelimitedWriteable, sort: Boolean = true) = new IndexedDatasetTextDelimitedWriteable(id2.matrix, id2.rowIDs, id2.columnIDs, id2.writeSchema, id2.sort)(id2.mc)
-}
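
As a usage sketch (not part of the commit), the reader and writer classes above can be used directly to round-trip a text delimited IndexedDataset; the paths are hypothetical and an implicit DistributedContext is assumed:

import org.apache.mahout.drivers.{TextDelimitedIndexedDatasetReader, TextDelimitedIndexedDatasetWriter}
import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetReadSchema, DefaultIndexedDatasetWriteSchema}

def roundTrip(src: String, dest: String)(implicit mc: DistributedContext): Unit = {
  val reader = new TextDelimitedIndexedDatasetReader(DefaultIndexedDatasetReadSchema)
  val ids = reader.readDRMFrom(src)        // IndexedDatasetSpark with row/column dictionaries
  val writer = new TextDelimitedIndexedDatasetWriter(DefaultIndexedDatasetWriteSchema)
  writer.writeTo(ids, dest)                // rowID<tab>colID1:score1<space>colID2:score2...
}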
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 08b2c34..c0d36c6 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,10 +17,11 @@
package org.apache.mahout.sparkbindings
-import java.io.IOException
-
+import com.google.common.collect.{BiMap, HashBiMap}
+import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
import org.apache.mahout.math._
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetReadSchema, Schema, DefaultIndexedDatasetElementReadSchema}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import scalabindings._
import RLikeOps._
import org.apache.mahout.math.drm.logical._
@@ -250,6 +251,37 @@ object SparkEngine extends DistributedEngine {
}
}
+ /**
+ * reads an IndexedDatasetSpark from default text delimited files
+ * @param src a comma separated list of URIs to read from
+ * @param schema how the text file is formatted
+ * @return
+ */
+ def indexedDatasetDFSRead(src: String,
+ schema: Schema = DefaultIndexedDatasetReadSchema,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+ (implicit sc: DistributedContext):
+ IndexedDatasetSpark = {
+ val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
+ val ids = reader.readDRMFrom(src, existingRowIDs)
+ ids
+ }
+
+ /**
+ * reads an IndexedDatasetSpark from default text delimited element files
+ * @param src a comma separated list of URIs to read from
+ * @param schema how the text file is formatted
+ * @return
+ */
+ def indexedDatasetDFSReadElements(src: String,
+ schema: Schema = DefaultIndexedDatasetElementReadSchema,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+ (implicit sc: DistributedContext):
+ IndexedDatasetSpark = {
+ val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
+ val ids = reader.readElementsFrom(src, existingRowIDs)
+ ids
+ }
}
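
A hedged sketch of the element-oriented read path (assuming indexedDatasetDFSReadElements is exposed alongside indexedDatasetDFSRead in org.apache.mahout.math.indexeddataset, as the driver imports above suggest; the path is hypothetical):

import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.math.indexeddataset.indexedDatasetDFSReadElements

// reads element lines like "userID<tab>itemID<tab>ignored-text..." and builds both ID dictionaries
def readInteractions(path: String)(implicit mc: DistributedContext) =
  indexedDatasetDFSReadElements(path)    // default element read schema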
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index b753f6f..e5a2b2a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -197,4 +197,16 @@ class CheckpointedDrmSpark[K: ClassTag](
protected def computeNNonZero =
cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
+ /** Changes the number of rows in the DRM without actually touching the underlying data. Used to
+ * redimension a DRM after it has been created, which implies some blank, non-existent rows.
+ * @param n new row dimension
+ * @return
+ */
+ override def newRowCardinality(n: Int): CheckpointedDrm[K] = {
+ assert(n > -1)
+ assert( n >= nrow)
+ val newCheckpointedDrm = drmWrap[K](rdd, n, ncol)
+ newCheckpointedDrm
+ }
+
}
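
In other words, newRowCardinality only re-declares the row dimension; a small illustrative sketch, where drmA is an assumed CheckpointedDrm[Int] with nrow == 4:

// declare 10 rows; rows 4..9 simply have no data and the underlying RDD is untouched
val redimensioned = drmA.newRowCardinality(10)
// redimensioned.nrow == 10, redimensioned.ncol == drmA.ncol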
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
new file mode 100644
index 0000000..d3aa0a8
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.indexeddataset
+
+import com.google.common.collect.BiMap
+import org.apache.mahout.drivers.TextDelimitedIndexedDatasetWriter
+import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
+import org.apache.mahout.math.indexeddataset
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, Reader, Schema, IndexedDataset}
+
+/**
+ * Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific
+ * dfsWrite method
+ */
+class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int])
+ extends IndexedDataset {
+
+ /** Secondary constructor enabling immutability */
+ def this(id2: IndexedDatasetSpark){
+ this(id2.matrix, id2.rowIDs, id2.columnIDs)
+ }
+
+ /** Factory method used to create this extending class when the interface of
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is known. */
+ override def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
+ IndexedDatasetSpark = {
+ new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
+ }
+
+ /** implements the core method [[indexeddataset.IndexedDataset#dfsWrite]]*/
+ override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema)
+ (implicit sc: DistributedContext):
+ Unit = {
+ val writer = new TextDelimitedIndexedDatasetWriter(schema)(sc)
+ writer.writeTo(this, dest)
+ }
+}
+
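
A short usage sketch (not part of the commit), assuming drm is an existing CheckpointedDrm[Int], an implicit DistributedContext is in scope, and the output path is hypothetical:

import com.google.common.collect.HashBiMap
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark

val rowDict = HashBiMap.create[String, Int]()   // external row IDs -> ordinal Mahout IDs
val colDict = HashBiMap.create[String, Int]()   // external column IDs -> ordinal Mahout IDs
val ids = new IndexedDatasetSpark(drm, rowDict, colDict)
ids.dfsWrite("output-dir/")                     // default text delimited write schema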
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 6ca2a14..c441716 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -17,6 +17,9 @@
package org.apache.mahout
+import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
+import org.apache.mahout.math.indexeddataset.Schema
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.{SparkConf, SparkContext}
import java.io._
import scala.collection.mutable.ArrayBuffer
@@ -224,5 +227,4 @@ package object sparkbindings {
mcjars
}
-
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
deleted file mode 100644
index 29c7b84..0000000
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf
-
-import org.apache.mahout.math.cf.SimilarityAnalysis
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings.{MatrixOps, _}
-import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
-import org.apache.mahout.test.MahoutSuite
-import org.scalatest.FunSuite
-
-/* values
-A =
-1 1 0 0 0
-0 0 1 1 0
-0 0 0 0 1
-1 0 0 1 0
-
-B =
-1 1 1 1 0
-1 1 1 1 0
-0 0 1 0 1
-1 1 0 1 0
- */
-
-class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with DistributedSparkSuite {
-
- // correct cooccurrence with LLR
- final val matrixLLRCoocAtAControl = dense(
- (0.0, 1.7260924347106847, 0.0, 0.0, 0.0),
- (1.7260924347106847, 0.0, 0.0, 0.0, 0.0),
- (0.0, 0.0, 0.0, 1.7260924347106847, 0.0),
- (0.0, 0.0, 1.7260924347106847, 0.0, 0.0),
- (0.0, 0.0, 0.0, 0.0, 0.0))
-
- // correct cross-cooccurrence with LLR
- final val m = dense(
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
- (0.0, 0.0, 0.0, 0.0, 4.498681156950466))
-
- final val matrixLLRCoocBtAControl = dense(
- (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
- (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
- (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
- (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
- (0.0, 0.0, 0.6795961471815897, 0.0, 4.498681156950466))
-
-
- test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
- val a = dense(
- (1, 1, 0, 0, 0),
- (0, 0, 1, 1, 0),
- (0, 0, 0, 0, 1),
- (1, 0, 0, 1, 0))
-
- val b = dense(
- (1, 1, 1, 1, 0),
- (1, 1, 1, 1, 0),
- (0, 0, 1, 0, 1),
- (1, 1, 0, 1, 0))
-
- val drmA = drmParallelize(m = a, numPartitions = 2)
- val drmB = drmParallelize(m = b, numPartitions = 2)
-
- //self similarity
- val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, randomSeed = 1, drmBs = Array(drmB))
- val matrixSelfCooc = drmCooc(0).checkpoint().collect
- val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
- var n = (new MatrixOps(m = diffMatrix)).norm
- n should be < 1E-10
-
- //cross similarity
- val matrixCrossCooc = drmCooc(1).checkpoint().collect
- val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
- n = (new MatrixOps(m = diff2Matrix)).norm
- n should be < 1E-10
-
- }
-
- test("cooccurrence [A'A], [B'A] double data using LLR") {
- val a = dense(
- (100000.0D, 1.0D, 0.0D, 0.0D, 0.0D),
- ( 0.0D, 0.0D, 10.0D, 1.0D, 0.0D),
- ( 0.0D, 0.0D, 0.0D, 0.0D, 1000.0D),
- ( 1.0D, 0.0D, 0.0D, 10.0D, 0.0D))
-
- val b = dense(
- (10000.0D, 100.0D, 1000.0D, 1.0D, 0.0D),
- ( 10.0D, 1.0D, 10000000.0D, 10.0D, 0.0D),
- ( 0.0D, 0.0D, 1000.0D, 0.0D, 100.0D),
- ( 100.0D, 1.0D, 0.0D, 100000.0D, 0.0D))
-
- val drmA = drmParallelize(m = a, numPartitions = 2)
- val drmB = drmParallelize(m = b, numPartitions = 2)
-
- //self similarity
- val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
- val matrixSelfCooc = drmCooc(0).checkpoint().collect
- val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
- var n = (new MatrixOps(m = diffMatrix)).norm
- n should be < 1E-10
-
- //cross similarity
- val matrixCrossCooc = drmCooc(1).checkpoint().collect
- val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
- n = (new MatrixOps(m = diff2Matrix)).norm
- n should be < 1E-10
- }
-
- test("cooccurrence [A'A], [B'A] integer data using LLR") {
- val a = dense(
- ( 1000, 10, 0, 0, 0),
- ( 0, 0, -10000, 10, 0),
- ( 0, 0, 0, 0, 100),
- (10000, 0, 0, 1000, 0))
-
- val b = dense(
- ( 100, 1000, -10000, 10000, 0),
- (10000, 1000, 100, 10, 0),
- ( 0, 0, 10, 0, -100),
- ( 10, 100, 0, 1000, 0))
-
- val drmA = drmParallelize(m = a, numPartitions = 2)
- val drmB = drmParallelize(m = b, numPartitions = 2)
-
- //self similarity
- val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
- //var cp = drmSelfCooc(0).checkpoint()
- //cp.writeDRM("/tmp/cooc-spark/")//to get values written
- val matrixSelfCooc = drmCooc(0).checkpoint().collect
- val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
- var n = (new MatrixOps(m = diffMatrix)).norm
- n should be < 1E-10
-
- //cross similarity
- val matrixCrossCooc = drmCooc(1).checkpoint().collect
- val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
- n = (new MatrixOps(m = diff2Matrix)).norm
- n should be < 1E-10
- }
-
- test("cooccurrence two matrices with different number of columns"){
- val a = dense(
- (1, 1, 0, 0, 0),
- (0, 0, 1, 1, 0),
- (0, 0, 0, 0, 1),
- (1, 0, 0, 1, 0))
-
- val b = dense(
- (0, 1, 1, 0),
- (1, 1, 1, 0),
- (0, 0, 1, 0),
- (1, 1, 0, 1))
-
- val matrixLLRCoocBtANonSymmetric = dense(
- (0.0, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
- (0.0, 0.6795961471815897, 0.6795961471815897, 0.0),
- (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
- (5.545177444479561, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
- (0.0, 0.0, 0.6795961471815897, 0.0))
-
- val drmA = drmParallelize(m = a, numPartitions = 2)
- val drmB = drmParallelize(m = b, numPartitions = 2)
-
- //self similarity
- val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
- val matrixSelfCooc = drmCooc(0).checkpoint().collect
- val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
- var n = (new MatrixOps(m = diffMatrix)).norm
- n should be < 1E-10
-
- //cross similarity
- val matrixCrossCooc = drmCooc(1).checkpoint().collect
- val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric)
- n = (new MatrixOps(m = diff2Matrix)).norm
-
- //cooccurrence without LLR is just a A'B
- //val inCoreAtB = a.transpose().times(b)
- //val bp = 0
- }
-
- test("LLR calc") {
- val A = dense(
- (1, 1, 0, 0, 0),
- (0, 0, 1, 1, 0),
- (0, 0, 0, 0, 1),
- (1, 0, 0, 1, 0))
-
- val AtA = A.transpose().times(A)
-
- /* AtA is:
- 0 => {0:2.0,1:1.0,3:1.0}
- 1 => {0:1.0,1:1.0}
- 2 => {2:1.0,3:1.0}
- 3 => {0:1.0,2:1.0,3:2.0}
- 4 => {4:1.0}
-
- val AtAd = dense(
- (2, 1, 0, 1, 0),
- (1, 1, 0, 0, 0),
- (0, 0, 1, 1, 0),
- (1, 0, 1, 2, 0),
- (0, 0, 0, 0, 1))
-
- val AtAdNoSelfCooc = dense(
- (0, 1, 0, 1, 0),
- (1, 0, 0, 0, 0),
- (0, 0, 0, 1, 0),
- (1, 0, 1, 0, 0),
- (0, 0, 0, 0, 0))
-
- */
-
- //item (1,0)
- val numInteractionsWithAandB = 1L
- val numInteractionsWithA = 1L
- val numInteractionsWithB = 2L
- val numInteractions = 6l
-
- val llr = SimilarityAnalysis.logLikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
-
- assert(llr == 2.6341457841558764) // value calculated by hadoop itemsimilairty
- }
-
- test("downsampling by number per row") {
- val a = dense(
- (1, 1, 1, 1, 0),
- (1, 1, 1, 1, 1),
- (0, 0, 0, 0, 1),
- (1, 1, 0, 1, 0))
- val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
-
- val downSampledDrm = SimilarityAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
- //count non-zero values, should be == 7
- var numValues = 0
- val m = downSampledDrm.collect
- val it = m.iterator()
- while (it.hasNext) {
- val v = it.next().vector()
- val nonZeroIt = v.nonZeroes().iterator()
- while (nonZeroIt.hasNext) {
- numValues += 1
- nonZeroIt.next()
- }
- }
-
- assert(numValues == 8) //Don't change the random seed or this may fail.
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
new file mode 100644
index 0000000..0b3b3eb
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings.{MatrixOps, _}
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+import org.apache.mahout.test.MahoutSuite
+import org.scalatest.FunSuite
+
+/* values
+A =
+1 1 0 0 0
+0 0 1 1 0
+0 0 0 0 1
+1 0 0 1 0
+
+B =
+1 1 1 1 0
+1 1 1 1 0
+0 0 1 0 1
+1 1 0 1 0
+ */
+
+// todo: add tests for the IndexedDataset cooccurrence methods
+
+class SimilarityAnalysisSuite extends FunSuite with MahoutSuite with DistributedSparkSuite {
+
+ // correct cooccurrence with LLR
+ final val matrixLLRCoocAtAControl = dense(
+ (0.0, 1.7260924347106847, 0.0, 0.0, 0.0),
+ (1.7260924347106847, 0.0, 0.0, 0.0, 0.0),
+ (0.0, 0.0, 0.0, 1.7260924347106847, 0.0),
+ (0.0, 0.0, 1.7260924347106847, 0.0, 0.0),
+ (0.0, 0.0, 0.0, 0.0, 0.0))
+
+ // correct cross-cooccurrence with LLR
+ final val m = dense(
+ (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
+ (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
+ (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
+ (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
+ (0.0, 0.0, 0.0, 0.0, 4.498681156950466))
+
+ final val matrixLLRCoocBtAControl = dense(
+ (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+ (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+ (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+ (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+ (0.0, 0.0, 0.6795961471815897, 0.0, 4.498681156950466))
+
+
+ test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
+ val a = dense(
+ (1, 1, 0, 0, 0),
+ (0, 0, 1, 1, 0),
+ (0, 0, 0, 0, 1),
+ (1, 0, 0, 1, 0))
+
+ val b = dense(
+ (1, 1, 1, 1, 0),
+ (1, 1, 1, 1, 0),
+ (0, 0, 1, 0, 1),
+ (1, 1, 0, 1, 0))
+
+ val drmA = drmParallelize(m = a, numPartitions = 2)
+ val drmB = drmParallelize(m = b, numPartitions = 2)
+
+ //self similarity
+ val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, randomSeed = 1, drmBs = Array(drmB))
+ val matrixSelfCooc = drmCooc(0).checkpoint().collect
+ val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+ var n = (new MatrixOps(m = diffMatrix)).norm
+ n should be < 1E-10
+
+ //cross similarity
+ val matrixCrossCooc = drmCooc(1).checkpoint().collect
+ val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+ n = (new MatrixOps(m = diff2Matrix)).norm
+ n should be < 1E-10
+
+ }
+
+ test("cooccurrence [A'A], [B'A] double data using LLR") {
+ val a = dense(
+ (100000.0D, 1.0D, 0.0D, 0.0D, 0.0D),
+ ( 0.0D, 0.0D, 10.0D, 1.0D, 0.0D),
+ ( 0.0D, 0.0D, 0.0D, 0.0D, 1000.0D),
+ ( 1.0D, 0.0D, 0.0D, 10.0D, 0.0D))
+
+ val b = dense(
+ (10000.0D, 100.0D, 1000.0D, 1.0D, 0.0D),
+ ( 10.0D, 1.0D, 10000000.0D, 10.0D, 0.0D),
+ ( 0.0D, 0.0D, 1000.0D, 0.0D, 100.0D),
+ ( 100.0D, 1.0D, 0.0D, 100000.0D, 0.0D))
+
+ val drmA = drmParallelize(m = a, numPartitions = 2)
+ val drmB = drmParallelize(m = b, numPartitions = 2)
+
+ //self similarity
+ val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+ val matrixSelfCooc = drmCooc(0).checkpoint().collect
+ val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+ var n = (new MatrixOps(m = diffMatrix)).norm
+ n should be < 1E-10
+
+ //cross similarity
+ val matrixCrossCooc = drmCooc(1).checkpoint().collect
+ val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+ n = (new MatrixOps(m = diff2Matrix)).norm
+ n should be < 1E-10
+ }
+
+ test("cooccurrence [A'A], [B'A] integer data using LLR") {
+ val a = dense(
+ ( 1000, 10, 0, 0, 0),
+ ( 0, 0, -10000, 10, 0),
+ ( 0, 0, 0, 0, 100),
+ (10000, 0, 0, 1000, 0))
+
+ val b = dense(
+ ( 100, 1000, -10000, 10000, 0),
+ (10000, 1000, 100, 10, 0),
+ ( 0, 0, 10, 0, -100),
+ ( 10, 100, 0, 1000, 0))
+
+ val drmA = drmParallelize(m = a, numPartitions = 2)
+ val drmB = drmParallelize(m = b, numPartitions = 2)
+
+ //self similarity
+ val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+ //var cp = drmSelfCooc(0).checkpoint()
+ //cp.writeDRM("/tmp/cooc-spark/")//to get values written
+ val matrixSelfCooc = drmCooc(0).checkpoint().collect
+ val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+ var n = (new MatrixOps(m = diffMatrix)).norm
+ n should be < 1E-10
+
+ //cross similarity
+ val matrixCrossCooc = drmCooc(1).checkpoint().collect
+ val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl)
+ n = (new MatrixOps(m = diff2Matrix)).norm
+ n should be < 1E-10
+ }
+
+ test("cooccurrence two matrices with different number of columns"){
+ val a = dense(
+ (1, 1, 0, 0, 0),
+ (0, 0, 1, 1, 0),
+ (0, 0, 0, 0, 1),
+ (1, 0, 0, 1, 0))
+
+ val b = dense(
+ (0, 1, 1, 0),
+ (1, 1, 1, 0),
+ (0, 0, 1, 0),
+ (1, 1, 0, 1))
+
+ val matrixLLRCoocBtANonSymmetric = dense(
+ (0.0, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+ (0.0, 0.6795961471815897, 0.6795961471815897, 0.0),
+ (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
+ (5.545177444479561, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+ (0.0, 0.0, 0.6795961471815897, 0.0))
+
+ val drmA = drmParallelize(m = a, numPartitions = 2)
+ val drmB = drmParallelize(m = b, numPartitions = 2)
+
+ //self similarity
+ val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+ val matrixSelfCooc = drmCooc(0).checkpoint().collect
+ val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+ var n = (new MatrixOps(m = diffMatrix)).norm
+ n should be < 1E-10
+
+ //cross similarity
+ val matrixCrossCooc = drmCooc(1).checkpoint().collect
+ val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric)
+ n = (new MatrixOps(m = diff2Matrix)).norm
+
+ //cooccurrence without LLR is just a A'B
+ //val inCoreAtB = a.transpose().times(b)
+ //val bp = 0
+ }
+
+ test("LLR calc") {
+ val A = dense(
+ (1, 1, 0, 0, 0),
+ (0, 0, 1, 1, 0),
+ (0, 0, 0, 0, 1),
+ (1, 0, 0, 1, 0))
+
+ val AtA = A.transpose().times(A)
+
+ /* AtA is:
+ 0 => {0:2.0,1:1.0,3:1.0}
+ 1 => {0:1.0,1:1.0}
+ 2 => {2:1.0,3:1.0}
+ 3 => {0:1.0,2:1.0,3:2.0}
+ 4 => {4:1.0}
+
+ val AtAd = dense(
+ (2, 1, 0, 1, 0),
+ (1, 1, 0, 0, 0),
+ (0, 0, 1, 1, 0),
+ (1, 0, 1, 2, 0),
+ (0, 0, 0, 0, 1))
+
+ val AtAdNoSelfCooc = dense(
+ (0, 1, 0, 1, 0),
+ (1, 0, 0, 0, 0),
+ (0, 0, 0, 1, 0),
+ (1, 0, 1, 0, 0),
+ (0, 0, 0, 0, 0))
+
+ */
+
+ //item (1,0)
+ val numInteractionsWithAandB = 1L
+ val numInteractionsWithA = 1L
+ val numInteractionsWithB = 2L
+ val numInteractions = 6l
+
+ val llr = SimilarityAnalysis.logLikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
+
+ assert(llr == 2.6341457841558764) // value calculated by hadoop itemsimilarity
+ }
+
+ test("downsampling by number per row") {
+ val a = dense(
+ (1, 1, 1, 1, 0),
+ (1, 1, 1, 1, 1),
+ (0, 0, 0, 0, 1),
+ (1, 1, 0, 1, 0))
+ val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
+
+ val downSampledDrm = SimilarityAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
+ //count non-zero values, should be == 8 with this random seed
+ var numValues = 0
+ val m = downSampledDrm.collect
+ val it = m.iterator()
+ while (it.hasNext) {
+ val v = it.next().vector()
+ val nonZeroIt = v.nonZeroes().iterator()
+ while (nonZeroIt.hasNext) {
+ numValues += 1
+ nonZeroIt.next()
+ }
+ }
+
+ assert(numValues == 8) //Don't change the random seed or this may fail.
+ }
+}
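
For the "LLR calc" test, a worked sketch of where 2.6341457841558764 comes from (a 2x2 contingency table plus Dunning's log-likelihood ratio; the xLogX/entropy helpers below are illustrative, not the Mahout API):

// counts from the test: A = 1 interaction, B = 2, A-and-B = 1, total interactions = 6
val k11 = 1.0                         // both A and B
val k12 = 1.0 - k11                   // A only = 0
val k21 = 2.0 - k11                   // B only = 1
val k22 = 6.0 - 1.0 - 2.0 + k11       // neither = 4

def xLogX(x: Double): Double = if (x == 0.0) 0.0 else x * math.log(x)
def entropy(xs: Double*): Double = xLogX(xs.sum) - xs.map(xLogX).sum

val llr = 2.0 * (entropy(k11 + k12, k21 + k22)     // row entropy
               + entropy(k11 + k21, k12 + k22)     // column entropy
               - entropy(k11, k12, k21, k22))      // matrix entropy
// llr is approximately 2.6341457841558764, the value asserted above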
[2/2] git commit: NOJIRA refactor IndexedDataset and CLI drivers into
core and engine specific parts, closes apache/mahout#59
Posted by pa...@apache.org.
NOJIRA refactor IndexedDataset and CLI drivers into core and engine specific parts, closes apache/mahout#59
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/666d314f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/666d314f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/666d314f
Branch: refs/heads/master
Commit: 666d314fb8d9f466a5496a143d73d55036aea619
Parents: 4e6577d
Author: pferrel <pa...@occamsmachete.com>
Authored: Tue Oct 14 14:45:32 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Tue Oct 14 14:45:32 2014 -0700
----------------------------------------------------------------------
.../apache/mahout/h2obindings/H2OEngine.scala | 44 +++
.../h2obindings/drm/CheckpointedDrmH2O.scala | 13 +
math-scala/pom.xml | 6 +
.../apache/mahout/drivers/MahoutDriver.scala | 43 +++
.../mahout/drivers/MahoutOptionParser.scala | 220 +++++++++++++++
.../mahout/math/cf/SimilarityAnalysis.scala | 56 +++-
.../mahout/math/drm/CheckpointedDrm.scala | 3 +
.../mahout/math/drm/DistributedEngine.scala | 25 ++
.../org/apache/mahout/math/drm/package.scala | 26 +-
.../math/indexeddataset/IndexedDataset.scala | 64 +++++
.../math/indexeddataset/ReaderWriter.scala | 68 +++++
.../mahout/math/indexeddataset/Schema.scala | 102 +++++++
.../apache/mahout/common/HDFSPathSearch.scala | 78 ++++++
.../apache/mahout/drivers/FileSysUtils.scala | 76 ------
.../apache/mahout/drivers/IndexedDataset.scala | 80 ------
.../mahout/drivers/ItemSimilarityDriver.scala | 90 +++----
.../apache/mahout/drivers/MahoutDriver.scala | 104 -------
.../mahout/drivers/MahoutOptionParser.scala | 214 ---------------
.../mahout/drivers/MahoutSparkDriver.scala | 87 ++++++
.../apache/mahout/drivers/ReaderWriter.scala | 66 -----
.../mahout/drivers/RowSimilarityDriver.scala | 41 +--
.../org/apache/mahout/drivers/Schema.scala | 98 -------
.../drivers/TextDelimitedReaderWriter.scala | 68 ++---
.../mahout/sparkbindings/SparkEngine.scala | 38 ++-
.../drm/CheckpointedDrmSpark.scala | 12 +
.../indexeddataset/IndexedDatasetSpark.scala | 53 ++++
.../apache/mahout/sparkbindings/package.scala | 4 +-
.../mahout/cf/CooccurrenceAnalysisSuite.scala | 267 ------------------
.../mahout/cf/SimilarityAnalysisSuite.scala | 269 +++++++++++++++++++
29 files changed, 1273 insertions(+), 1042 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 28214c6..99bc3ba 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -17,6 +17,9 @@
package org.apache.mahout.h2obindings
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.indexeddataset.{IndexedDataset, Schema, DefaultIndexedDatasetReadSchema}
+
import scala.reflect._
import org.apache.mahout.math._
import org.apache.mahout.math.drm._
@@ -112,4 +115,45 @@ object H2OEngine extends DistributedEngine {
}
implicit def cp2cph2o[K:ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmH2O[K] = drm.asInstanceOf[CheckpointedDrmH2O[K]]
+
+ /** stub class not implemented in H2O */
+ abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int])
+ extends IndexedDataset {}
+
+ /**
+ * reads an IndexedDatasetH2O from default text delimited files
+ * @todo unimplemented
+ * @param src a comma separated list of URIs to read from
+ * @param schema how the text file is formatted
+ * @return
+ */
+ def indexedDatasetDFSRead(src: String,
+ schema: Schema = DefaultIndexedDatasetReadSchema,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+ (implicit sc: DistributedContext):
+ IndexedDatasetH2O = {
+ // should log a warning here but no logger is available; an H2O contributor could wire one in
+ println("Warning: unimplemented indexedDatasetDFSRead." )
+ throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read.")
+ null.asInstanceOf[IndexedDatasetH2O]
+ }
+
+ /**
+ * reads an IndexedDatasetH2O from default text delimited files
+ * @todo unimplemented
+ * @param src a comma separated list of URIs to read from
+ * @param schema how the text file is formatted
+ * @return
+ */
+ def indexedDatasetDFSReadElements(src: String,
+ schema: Schema = DefaultIndexedDatasetReadSchema,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+ (implicit sc: DistributedContext):
+ IndexedDatasetH2O = {
+ // should log a warning here but no logger is available; an H2O contributor could wire one in
+ println("Warning: unimplemented indexedDatasetDFSReadElements." )
+ throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read by elements.")
+ null.asInstanceOf[IndexedDatasetH2O]
+ }
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
index c06455a..6d7628e 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
@@ -53,4 +53,17 @@ class CheckpointedDrmH2O[K: ClassTag](
def canHaveMissingRows: Boolean = false
protected[mahout] def partitioningTag: Long = h2odrm.frame.anyVec.group.hashCode
+
+ /** stub needed to make IndexedDataset part of core; since drmWrap is not in H2O the implementation is left for someone else */
+ override def newRowCardinality(n: Int): CheckpointedDrm[K] = {
+ throw new UnsupportedOperationException("CheckpointedDrmH2O#newRowCardinality is not implemented.")
+ /* this is the Spark impl
+ assert(n > -1)
+ assert( n >= nrow)
+ val newCheckpointedDrm = drmWrap[K](rdd, n, ncol)
+ newCheckpointedDrm
+ */
+ this
+ }
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index b2fea6b..66309d6 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -172,6 +172,12 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.github.scopt</groupId>
+ <artifactId>scopt_2.10</artifactId>
+ <version>3.2.0</version>
+ </dependency>
<!-- scala stuff -->
<dependency>
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
new file mode 100644
index 0000000..8c1f8cf
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Extended by a platform specific version of this class to create a Mahout CLI driver.
+ */
+abstract class MahoutDriver {
+
+
+ implicit protected var mc: DistributedContext = _
+ protected var parser: MahoutOptionParser = _
+
+ var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
+
+ /** Override (optionally) for special cleanup */
+ protected def stop: Unit = {
+ if (!_useExistingContext) mc.close
+ }
+
+ /** This is where you do the work, call start first, then before exiting call stop */
+ protected def process: Unit
+
+ /** Parse command line and call process */
+ def main(args: Array[String]): Unit
+
+}
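
A hedged sketch of how an engine-specific driver builds on this base class (ExampleDriver and everything inside it is hypothetical; the real Spark drivers extend MahoutSparkDriver, shown in the listing above, which adds the SparkConf/start handling):

import org.apache.mahout.drivers.MahoutDriver

object ExampleDriver extends MahoutDriver {

  /** Parse the command line, create the DistributedContext, then do the work */
  override def main(args: Array[String]): Unit = {
    // set up the MahoutOptionParser and the engine-specific context here, then:
    process
  }

  override def process: Unit = {
    // read inputs, run the computation, write outputs ...
    stop
  }
}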
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
new file mode 100644
index 0000000..c6968b8
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import scopt.OptionParser
+
+import scala.collection.immutable
+
+/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+ * keep both standardized.
+ * @param programName Name displayed in help message, the name by which the driver is invoked.
+ * @note not all options are platform neutral so other platforms can add option parsing here if desired.
+ * */
+class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
+
+ // build options from some standard CLI param groups
+ // Note: always put the driver specific options last so they can override any previous options!
+ var opts = Map.empty[String, Any]
+
+ override def showUsageOnError = true
+
+ def parseIOOptions(numInputs: Int = 1) = {
+ opts = opts ++ MahoutOptionParser.FileIOOptions
+ note("Input, output options")
+ opt[String]('i', "input") required() action { (x, options) =>
+ options + ("input" -> x)
+ } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+
+ if (numInputs == 2) {
+ opt[String]("input2") abbr ("i2") action { (x, options) =>
+ options + ("input2" -> x)
+ } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+ }
+
+ opt[String]('o', "output") required() action { (x, options) =>
+ // todo: check to see if HDFS allows MS-Windows backslashes locally?
+ if (x.endsWith("/")) {
+ options + ("output" -> x)
+ } else {
+ options + ("output" -> (x + "/"))
+ }
+ } text ("Path for output, any local or HDFS supported URI (required)")
+
+ }
+
+ def parseSparkOptions = {
+ opts = opts ++ MahoutOptionParser.SparkOptions
+ note("\nSpark config options:")
+
+ opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+ options + ("master" -> x)
+ }
+
+ opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
+ options + ("sparkExecutorMem" -> x)
+ }
+
+ }
+
+ def parseGenericOptions = {
+ opts = opts ++ MahoutOptionParser.GenericOptions
+ opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
+ options + ("randomSeed" -> x)
+ } validate { x =>
+ if (x > 0) success else failure("Option --randomSeed must be > 0")
+ }
+
+ //output both input DRMs
+ opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
+ options + ("writeAllDatasets" -> true)
+ }//Hidden option, though a user might want this.
+ }
+
+ def parseElementInputSchemaOptions{
+ //Input text file schema--not driver specific but input data specific, elements input,
+ // not drms
+ opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
+ note("\nInput text file schema options:")
+ opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
+ options + ("inDelim" -> x)
+ }
+
+ opt[String]("filter1") abbr ("f1") action { (x, options) =>
+ options + ("filter1" -> x)
+ } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+
+ opt[String]("filter2") abbr ("f2") action { (x, options) =>
+ options + ("filter2" -> x)
+ } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+
+ opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
+ options + ("rowIDColumn" -> x)
+ } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
+ if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+ }
+
+ opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
+ options + ("itemIDColumn" -> x)
+ } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
+ if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+ }
+
+ opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
+ options + ("filterColumn" -> x)
+ } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+ if (x >= -1) success else failure("Option --filterColNum must be >= -1")
+ }
+
+ note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+ //check for column consistency
+ checkConfig { options: Map[String, Any] =>
+ if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]
+ || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int]
+ || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int])
+ failure("The row, item, and filter positions must be unique.") else success
+ }
+
+ //check for filter consistency
+ checkConfig { options: Map[String, Any] =>
+ if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
+ && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
+ && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
+ failure ("If using filters they must be unique.") else success
+ }
+
+ }
+
+ def parseFileDiscoveryOptions = {
+ //File finding strategy--not driver specific
+ opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
+ note("\nFile discovery options:")
+ opt[Unit]('r', "recursive") action { (_, options) =>
+ options + ("recursive" -> true)
+ } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+
+ opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
+ options + ("filenamePattern" -> x)
+ } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+
+ }
+
+ def parseDrmFormatOptions = {
+ opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
+ note("\nOutput text file schema options:")
+ opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
+ options + ("rowKeyDelim" -> x)
+ } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+ opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
+ options + ("columnIdStrengthDelim" -> x)
+ } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+ opt[String]("elementDelim") abbr ("td") action { (x, options) =>
+ options + ("elementDelim" -> x)
+ } text ("Separates vector element values in the values list (optional). Default: \" \"")
+
+ opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
+ options + ("omitStrength" -> true)
+ } text ("Do not write the strength to the output files (optional), Default: false.")
+ note("This option is used to output indexable data for creating a search engine recommender.")
+
+ note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+ }
+
+}
+
+/** Companion object defines default option groups for reference in any driver that needs them.
+ * @note Not all options are platform neutral, so other platforms can add default options here if desired */
+object MahoutOptionParser {
+
+ // set up the various default option groups
+ final val GenericOptions = immutable.HashMap[String, Any](
+ "randomSeed" -> System.currentTimeMillis().toInt,
+ "writeAllDatasets" -> false)
+
+ final val SparkOptions = immutable.HashMap[String, Any](
+ "master" -> "local",
+ "sparkExecutorMem" -> "",
+ "appName" -> "Generic Spark App, Change this.")
+
+ final val FileIOOptions = immutable.HashMap[String, Any](
+ "input" -> null.asInstanceOf[String],
+ "input2" -> null.asInstanceOf[String],
+ "output" -> null.asInstanceOf[String])
+
+ final val FileDiscoveryOptions = immutable.HashMap[String, Any](
+ "recursive" -> false,
+ "filenamePattern" -> "^part-.*")
+
+ final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
+ "rowIDColumn" -> 0,
+ "itemIDColumn" -> 1,
+ "filterColumn" -> -1,
+ "filter1" -> null.asInstanceOf[String],
+ "filter2" -> null.asInstanceOf[String],
+ "inDelim" -> "[,\t ]")
+
+ final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ",
+ "omitStrength" -> false)
+}
+
+
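
Usage sketch (illustrative, not from the patch): a driver composes the option groups above and then appends
its own options last so they override earlier defaults. The object name MyDriverCLI and the maxThings option
are hypothetical, shown only to illustrate the pattern.

    import org.apache.mahout.drivers.MahoutOptionParser

    object MyDriverCLI {
      def main(args: Array[String]): Unit = {
        val parser = new MahoutOptionParser(programName = "myDriver") {
          head("myDriver", "Mahout 1.0-SNAPSHOT")
          parseIOOptions()          // --input, --input2, --output
          parseSparkOptions         // --master, --sparkExecutorMem
          parseFileDiscoveryOptions // --recursive, --filenamePattern
          parseGenericOptions       // --randomSeed, --writeAllDatasets
          // driver specific options go last so they can override any previous options
          opts = opts ++ Map("maxThings" -> 500)
          opt[Int]("maxThings") abbr ("mt") action { (x, options) => options + ("maxThings" -> x) }
        }
        parser.parse(args, parser.opts) map { options =>
          parser.opts = options
          println("maxThings = " + parser.opts("maxThings"))
        }
      }
    }
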
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
index 90d7559..5718304 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -18,6 +18,7 @@
package org.apache.mahout.math.cf
import org.apache.mahout.math._
+import org.apache.mahout.math.indexeddataset.IndexedDataset
import scalabindings._
import RLikeOps._
import drm._
@@ -49,7 +50,7 @@ object SimilarityAnalysis extends Serializable {
* @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
* @param maxNumInteractions max number of interactions after downsampling, default: 500
* @return
- * */
+ */
def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
@@ -99,13 +100,39 @@ object SimilarityAnalysis extends Serializable {
indicatorMatrices
}
+ /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
+   * and returns a list of indicator and cross-indicator matrices.
+   * A somewhat easier to use method than [[cooccurrences]] since it handles the ID dictionaries correctly.
+   * @param indexedDatasets the first in the array is the primary/A matrix; all others are treated as secondary
+   * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+   * @param maxInterestingItemsPerThing max similarities per item
+   * @param maxNumInteractions max number of input interactions per item
+   * @return
+ */
+ def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset],
+ randomSeed: Int = 0xdeadbeef,
+ maxInterestingItemsPerThing: Int = 50,
+ maxNumInteractions: Int = 500):
+ List[IndexedDataset] = {
+ val drms = indexedDatasets.map(_.matrix.asInstanceOf[DrmLike[Int]])
+ val primaryDrm = drms(0)
+ val secondaryDrms = drms.drop(1)
+ val coocMatrices = cooccurrences(primaryDrm, randomSeed, maxInterestingItemsPerThing,
+ maxNumInteractions, secondaryDrms)
+ val retIDSs = coocMatrices.iterator.zipWithIndex.map {
+ case( drm, i ) =>
+ indexedDatasets(0).create(drm, indexedDatasets(0).columnIDs, indexedDatasets(i).columnIDs)
+ }
+ retIDSs.toList
+ }
+
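
Usage sketch (illustrative, not from the patch): calling cooccurrencesIDSs on two element files read with the
IndexedDataset helpers introduced in this change. The helper name and paths are hypothetical, and the
row-cardinality alignment performed by ItemSimilarityDriver is omitted for brevity.

    import org.apache.mahout.math.cf.SimilarityAnalysis
    import org.apache.mahout.math.drm.DistributedContext
    import org.apache.mahout.math.indexeddataset.{IndexedDataset, indexedDatasetDFSReadElements}

    // assumes an implicit DistributedContext (e.g. from mahoutSparkContext) is in scope
    def computeIndicators(pathA: String, pathB: String)(implicit ctx: DistributedContext): List[IndexedDataset] = {
      val datasetA = indexedDatasetDFSReadElements(pathA)
      // reuse A's row dictionary so both matrices share the same row (user) ID space
      val datasetB = indexedDatasetDFSReadElements(pathB, existingRowIDs = datasetA.rowIDs)
      // head of the list is the A'A indicator matrix, the rest are cross-indicators (A'B, ...)
      SimilarityAnalysis.cooccurrencesIDSs(Array(datasetA, datasetB))
    }
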
/** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows
* @param drmARaw Primary interaction matrix
* @param randomSeed when kept to a constant will make repeatable downsampling
* @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
* @param maxNumInteractions max number of interactions after downsampling, default: 500
* @return
- * */
+ */
def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
maxNumInteractions: Int = 500): DrmLike[Int] = {
@@ -130,10 +157,27 @@ object SimilarityAnalysis extends Serializable {
drmSimilaritiesAAt
}
- /**
- * Compute loglikelihood ratio
- * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
- **/
+ /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
+ * Uses IndexedDatasets, which handle external ID dictionaries properly
+ * @param indexedDataset compare each row to every other
+ * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+ * @param maxInterestingSimilaritiesPerRow max elements returned in each row
+ * @param maxObservationsPerRow max number of input elements to use
+ * @return
+ */
+ def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
+ maxInterestingSimilaritiesPerRow: Int = 50,
+ maxObservationsPerRow: Int = 500):
+ IndexedDataset = {
+ val coocMatrix = rowSimilarity(indexedDataset.matrix, randomSeed, maxInterestingSimilaritiesPerRow,
+ maxObservationsPerRow)
+ indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs)
+ }
+
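
Usage sketch (illustrative, not from the patch): row similarity with the external string IDs preserved,
roughly what RowSimilarityDriver does. The function name and paths are hypothetical.

    import org.apache.mahout.math.cf.SimilarityAnalysis
    import org.apache.mahout.math.drm.DistributedContext
    import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, indexedDatasetDFSRead}

    // assumes an implicit DistributedContext is in scope
    def writeRowSimilarities(inPath: String, outPath: String)(implicit ctx: DistributedContext): Unit = {
      val rows = indexedDatasetDFSRead(inPath)                        // rows keyed by external string IDs
      val similarRows = SimilarityAnalysis.rowSimilarityIDS(rows)     // LLR similarity of each row to every other row
      similarRows.dfsWrite(outPath, DefaultIndexedDatasetWriteSchema) // IDs from the dictionaries are preserved
    }
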
+ /**
+   * Compute the loglikelihood ratio.
+   * See http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details.
+ */
def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
numInteractionsWithAandB: Long, numInteractions: Long) = {
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 082e5b9..7f97481 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -41,4 +41,7 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
def keyClassTag: ClassTag[K]
+ /** Changes the recorded number of rows (row cardinality) without touching the underlying data */
+ def newRowCardinality(n: Int): CheckpointedDrm[K]
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index eaf5aeb..dd5b101 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -17,6 +17,9 @@
package org.apache.mahout.math.drm
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetReadSchema, Schema, IndexedDataset}
+
import scala.reflect.ClassTag
import logical._
import org.apache.mahout.math._
@@ -85,6 +88,28 @@ trait DistributedEngine {
/** Creates empty DRM with non-trivial height */
def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
(implicit sc: DistributedContext): CheckpointedDrm[Long]
+ /**
+ * Load IndexedDataset from text delimited format.
+ * @param src comma delimited URIs to read from
+ * @param schema defines format of file(s)
+ */
+ def indexedDatasetDFSRead(src: String,
+ schema: Schema = DefaultIndexedDatasetReadSchema,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+ (implicit sc: DistributedContext):
+ IndexedDataset
+
+ /**
+ * Load IndexedDataset from text delimited format, one element per line
+ * @param src comma delimited URIs to read from
+ * @param schema defines format of file(s)
+ */
+ def indexedDatasetDFSReadElements(src: String,
+ schema: Schema = DefaultIndexedDatasetElementReadSchema,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+ (implicit sc: DistributedContext):
+ IndexedDataset
+
}
object DistributedEngine {
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index b787ec0..3afbecb 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -17,10 +17,13 @@
package org.apache.mahout.math
-import scala.reflect.ClassTag
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.indexeddataset.{IndexedDataset, DefaultIndexedDatasetReadSchema, DefaultIndexedDatasetElementReadSchema, Schema}
+import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.decompositions.{DSSVD, DSPCA, DQR}
+
+import scala.reflect.ClassTag
package object drm {
@@ -114,3 +117,20 @@ package object drm {
}
}
+
+package object indexeddataset {
+ /** Load IndexedDataset from text delimited files */
+ def indexedDatasetDFSRead(src: String,
+ schema: Schema = DefaultIndexedDatasetReadSchema,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+ (implicit ctx: DistributedContext):
+ IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs)
+
+ def indexedDatasetDFSReadElements(src: String,
+ schema: Schema = DefaultIndexedDatasetElementReadSchema,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+ (implicit ctx: DistributedContext):
+ IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs)
+
+}
+
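
These package-level helpers keep driver code engine neutral: the implicit DistributedContext decides which
engine implementation actually does the reading. A small sketch (hypothetical helper name) of reading elements
with a customized Schema:

    import org.apache.mahout.math.drm.DistributedContext
    import org.apache.mahout.math.indexeddataset._

    def readTabSeparatedElements(path: String)(implicit ctx: DistributedContext): IndexedDataset = {
      // copy the default element schema and override only the delimiter
      val tsvSchema = new Schema(DefaultIndexedDatasetElementReadSchema) += ("delim" -> "\t")
      indexedDatasetDFSReadElements(path, tsvSchema)
    }
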
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
new file mode 100644
index 0000000..c7eb2cb
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import com.google.common.collect.BiMap
+import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
+import org.apache.mahout.math.indexeddataset
+
+/**
+ * Wraps a [[org.apache.mahout.math.drm.CheckpointedDrm]] object with two [[com.google.common.collect.BiMap]]s to store
+ * ID/label translation dictionaries.
+ * The purpose of this trait is to wrap a DrmLike[Int] with bidirectional ID mappings so
+ * a user-specified label or ID can be stored and mapped to and from the Mahout Int ID
+ * used internally by Mahout core code.
+ *
+ * @todo Often there is no need for both, or perhaps either, dictionary, so save resources by
+ *       allowing them to be skipped when not needed.
+ */
+
+trait IndexedDataset {
+ val matrix: CheckpointedDrm[Int]
+ val rowIDs: BiMap[String,Int]
+ val columnIDs: BiMap[String,Int]
+
+ /**
+ * Write text delimited file(s) with the row and column IDs translated through the dictionaries.
+ * @param dest write location, usually a directory that will contain part files
+ * @param schema text delimited format to write, see [[org.apache.mahout.math.indexeddataset.Schema]]
+ */
+ def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit
+
+ /** Factory method, creates the extending class */
+ def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
+ IndexedDataset
+
+ /**
+ * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
+ * No changes are made to the underlying drm.
+ * @param n number to use for new row cardinality, should be larger than current
+ * @note should be done before any optimizer actions are performed on the matrix or you'll get unpredictable
+ * results.
+ */
+ def newRowCardinality(n: Int): IndexedDataset = {
+ // n is validated in matrix
+ this.create(matrix.newRowCardinality(n), rowIDs, columnIDs)
+ }
+
+}
+
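
Sketch of the pattern the trait enables (illustrative, not from the patch): an engine-neutral transpose that
swaps the two dictionaries, similar to the example described for the old Spark-only class. Assumes the usual
DRM DSL imports and that the transposed matrix is checkpointed before being wrapped.

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.indexeddataset.IndexedDataset

    // transpose the wrapped matrix and swap dictionaries so row labels become column labels
    def transpose(id: IndexedDataset): IndexedDataset =
      id.create(id.matrix.t.checkpoint(), rowIDs = id.columnIDs, columnIDs = id.rowIDs)
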
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
new file mode 100644
index 0000000..cf429f5
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import com.google.common.collect.{BiMap, HashBiMap}
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait,
+ * which also defines the type to be read.
+ * @tparam T type of object to read.
+ */
+trait Reader[T]{
+
+ val mc: DistributedContext
+ val readSchema: Schema
+
+ protected def elementReader(
+ mc: DistributedContext,
+ readSchema: Schema,
+ source: String,
+ existingRowIDs: BiMap[String, Int]): T
+
+ protected def drmReader(
+ mc: DistributedContext,
+ readSchema: Schema,
+ source: String,
+ existingRowIDs: BiMap[String, Int]): T
+
+ def readElementsFrom(
+ source: String,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+ elementReader(mc, readSchema, source, existingRowIDs)
+
+ def readDRMFrom(
+ source: String,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
+ drmReader(mc, readSchema, source, existingRowIDs)
+}
+
+/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
+ * which also defines the type to be written.
+ * @tparam T type of object to write.
+ */
+trait Writer[T]{
+
+ val mc: DistributedContext
+ val sort: Boolean
+ val writeSchema: Schema
+
+ protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
+
+ def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
+}
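
Sketch of how an engine module extends these traits (a skeleton under assumed names, not the actual
TextDelimited implementations): the concrete class fixes T to IndexedDataset and supplies the two protected
readers.

    import com.google.common.collect.BiMap
    import org.apache.mahout.math.drm.DistributedContext
    import org.apache.mahout.math.indexeddataset.{IndexedDataset, Reader, Schema}

    class MyElementReader(val mc: DistributedContext, val readSchema: Schema)
      extends Reader[IndexedDataset] {

      protected def elementReader(mc: DistributedContext, readSchema: Schema, source: String,
          existingRowIDs: BiMap[String, Int]): IndexedDataset = {
        // parse "rowID, columnID[, filter-text]" lines from source into an IndexedDataset here
        ???
      }

      protected def drmReader(mc: DistributedContext, readSchema: Schema, source: String,
          existingRowIDs: BiMap[String, Int]): IndexedDataset = {
        // parse "rowID<tab>columnID1:strength1<space>..." lines from source into an IndexedDataset here
        ???
      }
    }
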
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
new file mode 100644
index 0000000..557b419
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import scala.collection.mutable.HashMap
+
+/** Syntactic sugar for mutable.HashMap[String, Any]
+ *
+ * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+ */
+class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
+ // note: this requires a mutable HashMap, do we care?
+ this ++= params
+
+ /** Constructor for copying an existing Schema
+ *
+ * @param schemaToClone the Schema to copy into the new instance
+ */
+ def this(schemaToClone: Schema){
+ this()
+ this ++= schemaToClone
+ }
+}
+
+/** These schemas keep text input and output in a fairly standard Mahout form when an application specific
+ * format is not required. They apply to the formatting of an
+ * [[org.apache.mahout.math.indexeddataset.IndexedDataset]], which can be used to create a Mahout DRM for DSL ops.
+ */
+
+
+/** Simple default Schema for typical text delimited element file input.
+ * This tells the reader to expect elements of the default form:
+ * (rowID<comma, tab, or space>columnID<comma, tab, or space>any-other-text-is-ignored...)
+ */
+final object DefaultIndexedDatasetElementReadSchema extends Schema(
+ "delim" -> "[,\t ]", //comma, tab or space
+ "filter" -> "",
+ "rowIDColumn" -> 0,
+ "columnIDPosition" -> 1,
+ "filterColumn" -> -1)
+
+/** Default Schema for text delimited drm file output
+ * This tells the writer to write a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] of the default form:
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...)
+ */
+final object DefaultIndexedDatasetWriteSchema extends Schema(
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ",
+ "omitScore" -> false)
+
+/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file input.
+ * This tells the reader to input text lines of the form:
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...)
+ */
+final object DefaultIndexedDatasetReadSchema extends Schema(
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ")
+
+/** Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where
+ * the score of any element is ignored,
+ * all non-zeros are replaced with 1.
+ * This tells the reader to input DRM lines of the form
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
+ * Alternatively the format can be
+ * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+ * output format for [[IndexedDatasetWriteBooleanSchema]]
+ */
+final object IndexedDatasetReadBooleanSchema extends Schema(
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ",
+ "omitScore" -> true)
+
+/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file write where
+ * the score of an element is omitted.
+ * The presence of an element means the score = 1, the absence means a score of 0.
+ * This tells the writer to output [[org.apache.mahout.math.indexeddataset.IndexedDataset]] lines of the form
+ * (rowID<tab>columnID1<space>columnID2...)
+ */
+final object IndexedDatasetWriteBooleanSchema extends Schema(
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ",
+ "omitScore" -> true)
+
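
Sketch of an application-specific Schema built from these keys (the values are only an example); this mirrors
how ItemSimilarityDriver builds its read schemas from CLI options.

    import org.apache.mahout.math.indexeddataset.Schema

    object ExampleSchemas {
      // read tab separated elements, keeping only lines whose filter column contains "purchase"
      val purchaseElements = new Schema(
        "delim" -> "\t",
        "filter" -> "purchase",
        "rowIDColumn" -> 0,
        "columnIDPosition" -> 1,
        "filterColumn" -> 2)

      // copy an existing Schema and change only the filter
      val viewElements = new Schema(purchaseElements) += ("filter" -> "view")
    }
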
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
new file mode 100644
index 0000000..42bf697
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
+/**
+ * Returns a [[java.lang.String]], which is a comma delimited list of URIs discovered based on parameters
+ * in the constructor.
+ * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
+ *
+ * @param pathURI where to start looking for input files, may be a list of comma delimited URIs
+ * @param filePattern regex that must match the entire filename to have the file returned
+ * @param recursive true traverses the filesystem recursively, default = false
+ */
+
+case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
+
+ val conf = new Configuration()
+ val fs = FileSystem.get(conf)
+
+ /** Returns a string of comma delimited URIs matching the filePattern
+ * When pattern matching, dirs are never returned, only traversed. */
+ def uris: String = {
+ if (!filePattern.isEmpty){ // have file pattern so
+ val pathURIs = pathURI.split(",")
+ var files = ""
+ for ( uri <- pathURIs ){
+ files = findFiles(uri, filePattern, files)
+ }
+ if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
+ files
+ }else{
+ pathURI
+ }
+ }
+
+ /** Find matching files in the dir, recursively call self when another directory is found
+ * Only files are matched, directories are traversed but never return a match */
+ private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
+ val seed = fs.getFileStatus(new Path(dir))
+ var f: String = files
+
+ if (seed.isDir) {
+ val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
+ for (fileStatus <- fileStatuses) {
+ if (fileStatus.getPath().getName().matches(filePattern)
+ && !fileStatus.isDir) {
+ // found a file
+ if (fileStatus.getLen() != 0) {
+ // file is not empty
+ f = f + fileStatus.getPath.toUri.toString + ","
+ }
+ } else if (fileStatus.isDir && recursive) {
+ f = findFiles(fileStatus.getPath.toString, filePattern, f)
+ }
+ }
+ }else{ f = dir }// was a filename not dir
+ f
+ }
+
+}
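
Usage sketch (illustrative path and pattern): collect non-empty files under a directory tree into the comma
delimited string that SparkContext.textFile expects.

    import org.apache.mahout.common.HDFSPathSearch

    object PathSearchExample extends App {
      val inputUris: String = HDFSPathSearch(
        pathURI = "hdfs://namenode:8020/data/interactions",
        filePattern = "^part-.*",
        recursive = true).uris
      println(inputUris) // comma delimited list of matching, non-empty file URIs
    }
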
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
deleted file mode 100644
index f48e9ed..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
-
-/**
- * Returns a [[java.lang.String]]comma delimited list of URIs discovered based on parameters in the constructor.
- * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
- *
- * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
- * @param filePattern regex that must match the entire filename to have the file returned
- * @param recursive true traverses the filesystem recursively, default = false
- */
-
-case class FileSysUtils(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
-
- val conf = new Configuration()
- val fs = FileSystem.get(conf)
-
-/** Returns a string of comma delimited URIs matching the filePattern
- * When pattern matching dirs are never returned, only traversed. */
- def uris :String = {
- if (!filePattern.isEmpty){ // have file pattern so
- val pathURIs = pathURI.split(",")
- var files = ""
- for ( uri <- pathURIs ){
- files = findFiles(uri, filePattern, files)
- }
- if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
- files
- }else{
- pathURI
- }
- }
-
-/** Find matching files in the dir, recursively call self when another directory is found
- * Only files are matched, directories are traversed but never return a match */
- private def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = {
- val seed = fs.getFileStatus(new Path(dir))
- var f :String = files
-
- if (seed.isDir) {
- val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
- for (fileStatus <- fileStatuses) {
- if (fileStatus.getPath().getName().matches(filePattern)
- && !fileStatus.isDir) {
- // found a file
- if (fileStatus.getLen() != 0) {
- // file is not empty
- f = f + fileStatus.getPath.toUri.toString + ","
- }
- } else if (fileStatus.isDir && recursive) {
- f = findFiles(fileStatus.getPath.toString, filePattern, f)
- }
- }
- }else{ f = dir }// was a filename not dir
- f
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
deleted file mode 100644
index 99f98f5..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import com.google.common.collect.BiMap
-import org.apache.mahout.math.drm.{DrmLike, CheckpointedDrm}
-import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
-import org.apache.mahout.sparkbindings._
-
-/**
- * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries.
- * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so
- * a user specified label or ID can be stored and mapped to and from the [[scala.Int]] ordinal ID
- * used internal to Mahout Core code.
- *
- * Example: For a transpose job the [[org.apache.mahout.drivers.IndexedDataset#matrix]]: [[org.apache.mahout.sparkbindings.drm.DrmLike]] is passed into the DSL code
- * that transposes the values, then a resulting [[org.apache.mahout.drivers.IndexedDataset]] is created from the transposed DrmLike object with swapped dictionaries (since the rows and columns are transposed). The new
- * [[org.apache.mahout.drivers.IndexedDataset]] is returned.
- *
- * @param matrix DrmLike[Int], representing the distributed matrix storing the actual data.
- * @param rowIDs BiMap[String, Int] storing a bidirectional mapping of external String ID to
- * and from the ordinal Mahout Int ID. This one holds row labels
- * @param columnIDs BiMap[String, Int] storing a bidirectional mapping of external String
- * ID to and from the ordinal Mahout Int ID. This one holds column labels
- * @todo Often no need for both or perhaps either dictionary, so save resources by allowing
- * to be not created when not needed.
- */
-
-case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
-
- // we must allow the row dimension to be adjusted in the case where the data read in is incomplete and we
- // learn this afterwards
-
- /**
- * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
- * No physical changes are made to the underlying drm.
- * @param n number to use for row carnindality, should be larger than current
- * @note should be done before any BLAS optimizer actions are performed on the matrix or you'll get unpredictable
- * results.
- */
- def newRowCardinality(n: Int): IndexedDataset = {
- assert(n > -1)
- assert( n >= matrix.nrow)
- val drmRdd = matrix.asInstanceOf[CheckpointedDrmSpark[Int]].rdd
- val ncol = matrix.ncol
- val newMatrix = drmWrap[Int](drmRdd, n, ncol)
- new IndexedDataset(newMatrix, rowIDs, columnIDs)
- }
-
-}
-
-/**
- * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary
- * constructor for
- * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like
- * [[org.apache.mahout.drivers.Reader]]
- * {{{
- * val indexedDataset = IndexedDataset(indexedDatasetReader.readElementsFrom(source))
- * }}}
- */
-
-object IndexedDataset {
- /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
- def apply(id2: IndexedDataset) = new IndexedDataset(id2.matrix, id2.rowIDs, id2.columnIDs)
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 0b8ded6..42e9d81 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -17,15 +17,19 @@
package org.apache.mahout.drivers
+import org.apache.mahout.common.HDFSPathSearch
import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSReadElements}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import scala.collection.immutable.HashMap
/**
- * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]].
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]].
* Reads text lines
* that contain (row id, column id, ...). The IDs are user specified strings which will be
* preserved in the
- * output. The individual elements will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]
+ * output. The individual elements will be accumulated into a matrix-like [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+ * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]
* will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
* matrices and calculate both the self similarity of the primary matrix and the row-wise
* similarity of the primary
@@ -40,17 +44,16 @@ import scala.collection.immutable.HashMap
* @note To use with a Spark cluster see the --master option, if you run out of heap space check
* the --sparkExecutorMemory option.
*/
-object ItemSimilarityDriver extends MahoutDriver {
+object ItemSimilarityDriver extends MahoutSparkDriver {
// define only the options specific to ItemSimilarity
private final val ItemSimilarityOptions = HashMap[String, Any](
"maxPrefs" -> 500,
"maxSimilaritiesPerItem" -> 100,
"appName" -> "ItemSimilarityDriver")
- private var reader1: TextDelimitedIndexedDatasetReader = _
- private var reader2: TextDelimitedIndexedDatasetReader = _
- private var writer: TextDelimitedIndexedDatasetWriter = _
private var writeSchema: Schema = _
+ private var readSchema1: Schema = _
+ private var readSchema2: Schema = _
/**
* @param args Command line args, if empty a help message is printed.
@@ -119,27 +122,27 @@ object ItemSimilarityDriver extends MahoutDriver {
// todo: the HashBiMap used in the TextDelimited Reader is hard coded into
// MahoutKryoRegistrator, it should be added to the register list here so it
- // will be only spcific to this job.
+ // will be only specific to this job.
sparkConf.set("spark.kryo.referenceTracking", "false")
- .set("spark.kryoserializer.buffer.mb", "200")
- .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+ .set("spark.kryoserializer.buffer.mb", "200")// todo: should this be left to config or an option?
+
+ if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
+ sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+ //else leave as set in Spark config
super.start(masterUrl, appName)
- val readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
+ readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
"filter" -> parser.opts("filter1").asInstanceOf[String],
"rowIDColumn" -> parser.opts("rowIDColumn").asInstanceOf[Int],
"columnIDPosition" -> parser.opts("itemIDColumn").asInstanceOf[Int],
"filterColumn" -> parser.opts("filterColumn").asInstanceOf[Int])
- reader1 = new TextDelimitedIndexedDatasetReader(readSchema1)
-
if ((parser.opts("filterColumn").asInstanceOf[Int] != -1 && parser.opts("filter2").asInstanceOf[String] != null)
|| (parser.opts("input2").asInstanceOf[String] != null && !parser.opts("input2").asInstanceOf[String].isEmpty )){
// only need to change the filter used compared to readSchema1
- val readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String])
+ readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String])
- reader2 = new TextDelimitedIndexedDatasetReader(readSchema2)
}
writeSchema = new Schema(
@@ -147,36 +150,34 @@ object ItemSimilarityDriver extends MahoutDriver {
"columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
"omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
"elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
-
- writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
-
- }
+ }
private def readIndexedDatasets: Array[IndexedDataset] = {
- val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+ val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
parser.opts("recursive").asInstanceOf[Boolean]).uris
val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) ""
- else FileSysUtils(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+ else HDFSPathSearch(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
parser.opts("recursive").asInstanceOf[Boolean]).uris
if (inFiles.isEmpty) {
Array()
} else {
- val datasetA = IndexedDataset(reader1.readElementsFrom(inFiles))
- if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetA,
- parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
+ val datasetA = indexedDatasetDFSReadElements(inFiles,readSchema1)
+ if (parser.opts("writeAllDatasets").asInstanceOf[Boolean])
+ datasetA.dfsWrite(parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions",
+ schema = writeSchema)
- // The case of readng B can be a bit tricky when the exact same row IDs don't exist for A and B
+ // The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B
// Here we assume there is one row ID space for all interactions. To do this we calculate the
// row cardinality only after reading in A and B (or potentially C...) We then adjust the
// cardinality so all match, which is required for the math to work.
// Note: this may leave blank rows with no representation in any DRM. Blank rows need to
- // be supported (and are at least on Spark) or the row cardinality fix will not work.
+ // be supported (and are at least on Spark) or the row cardinality adjustment will not work.
val datasetB = if (!inFiles2.isEmpty) {
// get cross-cooccurrence interactions from separate files
- val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
+ val datasetB = indexedDatasetDFSReadElements(inFiles2, readSchema2, existingRowIDs = datasetA.rowIDs)
datasetB
@@ -184,12 +185,12 @@ object ItemSimilarityDriver extends MahoutDriver {
&& parser.opts("filter2").asInstanceOf[String] != null) {
// get cross-cooccurrences interactions by using two filters on a single set of files
- val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles, existingRowIDs = datasetA.rowIDs))
+ val datasetB = indexedDatasetDFSReadElements(inFiles, readSchema2, existingRowIDs = datasetA.rowIDs)
datasetB
} else {
- null.asInstanceOf[IndexedDataset]
+ null.asInstanceOf[IndexedDatasetSpark]
}
if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc
// true row cardinality is the size of the row id index, which was calculated from all rows of A and B
@@ -203,8 +204,9 @@ object ItemSimilarityDriver extends MahoutDriver {
val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality)
else datasetB // this guarantees matching cardinality
- if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetB, parser.opts("output") + "../input-datasets/secondary-interactions")
-
+ if (parser.opts("writeAllDatasets").asInstanceOf[Boolean])
+ datasetB.dfsWrite(parser.opts("output").asInstanceOf[String] + "../input-datasets/secondary-interactions",
+ schema = writeSchema)
Array(returnedA, returnedB)
} else Array(datasetA)
}
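
The cardinality adjustment above is the subtle part when A and B do not contain exactly the same row IDs. A
condensed sketch of that step (the function name is illustrative), assuming B was read with
existingRowIDs = datasetA.rowIDs so its dictionary covers every row seen in either input:

    import org.apache.mahout.math.indexeddataset.IndexedDataset

    def alignRowCardinality(datasetA: IndexedDataset, datasetB: IndexedDataset): (IndexedDataset, IndexedDataset) = {
      // true row cardinality is the size of the shared row-ID dictionary
      val rowCardinality = datasetB.rowIDs.size()
      val alignedA = if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality) else datasetA
      val alignedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality) else datasetB
      (alignedA, alignedB)
    }
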
@@ -214,31 +216,13 @@ object ItemSimilarityDriver extends MahoutDriver {
start()
val indexedDatasets = readIndexedDatasets
+ val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int],
+ parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
// todo: allow more than one cross-similarity matrix?
- val indicatorMatrices = {
- if (indexedDatasets.length > 1) {
- SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
- parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int],
- Array(indexedDatasets(1).matrix))
- } else {
- SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
- parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
- }
- }
-
- // an alternative is to create a version of IndexedDataset that knows how to write itself
- val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
- indexedDatasets(0).columnIDs, writeSchema)
- selfIndicatorDataset.writeTo(dest = parser.opts("output").asInstanceOf[String] + "indicator-matrix")
-
- // todo: would be nice to support more than one cross-similarity indicator
- if (indexedDatasets.length > 1) {
-
- val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
- writer.writeTo(crossIndicatorDataset, parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix")
-
- }
+ idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "indicator-matrix", schema = writeSchema)
+ if(idss.length > 1)
+ idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix", schema = writeSchema)
stop
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
deleted file mode 100644
index 6ea7c8b..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.spark.SparkConf
-import org.apache.mahout.sparkbindings._
-
-import scala.collection.immutable
-
-/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
- * Also define a Map of options for the command line parser. The following template may help:
- * {{{
- * object SomeDriver extends MahoutDriver {
- *
- * // define only the options specific to this driver, inherit the generic ones
- * private final val SomeOptions = HashMap[String, Any](
- * "maxThings" -> 500,
- * "minThings" -> 100,
- * "appName" -> "SomeDriver")
- *
- * override def main(args: Array[String]): Unit = {
- *
- *
- * val parser = new MahoutOptionParser(programName = "shortname") {
- * head("somedriver", "Mahout 1.0-SNAPSHOT")
- *
- * // Input output options, non-driver specific
- * parseIOOptions
- *
- * // Algorithm specific options
- * // Add in the new options
- * opts = opts ++ SomeOptions
- * note("\nAlgorithm control options:")
- * opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
- * options + ("maxThings" -> x) ...
- * }
- * parser.parse(args, parser.opts) map { opts =>
- * parser.opts = opts
- * process
- * }
- * }
- *
- * override def process: Unit = {
- * start()
- * // do the work here
- * stop
- * }
- *
- * }}}
- */
-abstract class MahoutDriver {
-
-
- implicit protected var mc: DistributedContext = _
- implicit protected var sparkConf = new SparkConf()
- protected var parser: MahoutOptionParser = _
-
- var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
-
- /** Creates a Spark context to run the job inside.
- * Override to set the SparkConf values specific to the job,
- * these must be set before the context is created.
- * @param masterUrl Spark master URL
- * @param appName Name to display in Spark UI
- * */
- protected def start(masterUrl: String, appName: String) : Unit = {
- if (!_useExistingContext) {
- mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
- }
- }
-
- /** Override (optionally) for special cleanup */
- protected def stop: Unit = {
- if (!_useExistingContext) mc.close
- }
-
- /** This is where you do the work, call start first, then before exiting call stop */
- protected def process: Unit
-
- /** Parse command line and call process */
- def main(args: Array[String]): Unit
-
- def useContext(context: DistributedContext): Unit = {
- _useExistingContext = true
- mc = context
- sparkConf = mc.getConf
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
deleted file mode 100644
index ad7a76b..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mahout.drivers
-
-import scopt.OptionParser
-import scala.collection.immutable
-
-/** Companion object defines default option groups for reference in any driver that needs them */
-object MahoutOptionParser {
- // set up the various default option groups
- final val GenericOptions = immutable.HashMap[String, Any](
- "randomSeed" -> System.currentTimeMillis().toInt,
- "writeAllDatasets" -> false)
-
- final val SparkOptions = immutable.HashMap[String, Any](
- "master" -> "local",
- "sparkExecutorMem" -> "2g",
- "appName" -> "Generic Spark App, Change this.")
-
- final val FileIOOptions = immutable.HashMap[String, Any](
- "input" -> null.asInstanceOf[String],
- "input2" -> null.asInstanceOf[String],
- "output" -> null.asInstanceOf[String])
-
- final val FileDiscoveryOptions = immutable.HashMap[String, Any](
- "recursive" -> false,
- "filenamePattern" -> "^part-.*")
-
- final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
- "rowIDColumn" -> 0,
- "itemIDColumn" -> 1,
- "filterColumn" -> -1,
- "filter1" -> null.asInstanceOf[String],
- "filter2" -> null.asInstanceOf[String],
- "inDelim" -> "[,\t ]")
-
- final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
- "rowKeyDelim" -> "\t",
- "columnIdStrengthDelim" -> ":",
- "elementDelim" -> " ",
- "omitStrength" -> false)
-}
-/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
- * keep both standarized.
- * @param programName Name displayed in help message, the name by which the driver is invoked.
- * */
-class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
-
- // build options from some stardard CLI param groups
- // Note: always put the driver specific options at the last so the can override and previous options!
- var opts = Map.empty[String, Any]
-
- override def showUsageOnError = true
-
- def parseIOOptions(numInputs: Int = 1) = {
- opts = opts ++ MahoutOptionParser.FileIOOptions
- note("Input, output options")
- opt[String]('i', "input") required() action { (x, options) =>
- options + ("input" -> x)
- } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
-
- if (numInputs == 2) {
- opt[String]("input2") abbr ("i2") action { (x, options) =>
- options + ("input2" -> x)
- } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
- }
-
- opt[String]('o', "output") required() action { (x, options) =>
- // todo: check to see if HDFS allows MS-Windows backslashes locally?
- if (x.endsWith("/")) {
- options + ("output" -> x)
- } else {
- options + ("output" -> (x + "/"))
- }
- } text ("Path for output, any local or HDFS supported URI (required)")
-
- }
-
- def parseSparkOptions = {
- opts = opts ++ MahoutOptionParser.SparkOptions
- note("\nSpark config options:")
-
- opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
- options + ("master" -> x)
- }
-
- opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g") action { (x, options) =>
- options + ("sparkExecutorMem" -> x)
- }
-
- }
-
- def parseGenericOptions = {
- opts = opts ++ MahoutOptionParser.GenericOptions
- opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
- options + ("randomSeed" -> x)
- } validate { x =>
- if (x > 0) success else failure("Option --randomSeed must be > 0")
- }
-
- //output both input DRMs
- opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
- options + ("writeAllDatasets" -> true)
- }//Hidden option, though a user might want this.
- }
-
- def parseElementInputSchemaOptions = {
- // Input text file schema--not driver specific but input data specific; applies to element input,
- // not DRMs
- opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
- note("\nInput text file schema options:")
- opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
- options + ("inDelim" -> x)
- }
-
- opt[String]("filter1") abbr ("f1") action { (x, options) =>
- options + ("filter1" -> x)
- } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
-
- opt[String]("filter2") abbr ("f2") action { (x, options) =>
- options + ("filter2" -> x)
- } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
-
- opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
- options + ("rowIDColumn" -> x)
- } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
- if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
- }
-
- opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
- options + ("itemIDColumn" -> x)
- } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
- if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
- }
-
- opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
- options + ("filterColumn" -> x)
- } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
- if (x >= -1) success else failure("Option --filterColumn must be >= -1")
- }
-
- note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
-
- checkConfig { options: Map[String, Any] =>
- if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]
- || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int]
- || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int])
- failure("The row, item, and filter positions must be unique.") else success
- }
-
- //check for option consistency, probably driver specific
- checkConfig { options: Map[String, Any] =>
- if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
- && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
- && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
- failure ("If using filters they must be unique.") else success
- }
-
- }
-
- def parseFileDiscoveryOptions = {
- //File finding strategy--not driver specific
- opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
- note("\nFile discovery options:")
- opt[Unit]('r', "recursive") action { (_, options) =>
- options + ("recursive" -> true)
- } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
-
- opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
- options + ("filenamePattern" -> x)
- } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
-
- }
-
- def parseDrmFormatOptions = {
- opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
- note("\nOutput text file schema options:")
- opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
- options + ("rowKeyDelim" -> x)
- } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
-
- opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
- options + ("columnIdStrengthDelim" -> x)
- } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
-
- opt[String]("elementDelim") abbr ("td") action { (x, options) =>
- options + ("elementDelim" -> x)
- } text ("Separates vector element values in the values list (optional). Default: \" \"")
-
- opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
- options + ("omitStrength" -> true)
- } text ("Do not write the strength to the output files (optional), Default: false.")
- note("This option is used to output indexable data for creating a search engine recommender.")
-
- note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
- }
-
-}
-
-
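For reference, the option groups above compose inside a driver's main method. The sketch below is illustrative only (the object name "ParserSketch" and its printed message are invented); it uses just the helper methods defined in the deleted file above, which this refactor moves toward core.

  package org.apache.mahout.drivers

  // Sketch only, not part of this patch: compose the option groups provided by
  // MahoutOptionParser and read values from the merged Map[String, Any].
  object ParserSketch {
    def main(args: Array[String]): Unit = {
      val parser = new MahoutOptionParser(programName = "parser-sketch") {
        head("parser-sketch", "Mahout 1.0-SNAPSHOT")
        parseIOOptions()                 // --input, --output (and --input2 when numInputs = 2)
        parseSparkOptions                // --master, --sparkExecutorMem
        parseElementInputSchemaOptions   // --inDelim, --rowIDColumn, --itemIDColumn, filters
        parseFileDiscoveryOptions        // --recursive, --filenamePattern
        parseDrmFormatOptions            // --rowKeyDelim, --columnIdStrengthDelim, --elementDelim
        parseGenericOptions              // --randomSeed, --writeAllDatasets
      }
      parser.parse(args, parser.opts) map { opts =>
        // opts holds the group defaults overridden by whatever was given on the CLI
        val input = opts("input").asInstanceOf[String]
        val master = opts("master").asInstanceOf[String]
        println(s"would read $input with Spark master $master")
      }
    }
  }

Invoked as, say, "parser-sketch -i hdfs://some/input -o /tmp/out/ --master local[4]" (a hypothetical command line), any option not supplied keeps the default from the HashMaps above.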
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
new file mode 100644
index 0000000..5a4254b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.spark.SparkConf
+import org.apache.mahout.sparkbindings._
+
+/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
+ * Also define a Map of options for the command line parser. The following template may help:
+ * {{{
+ * object SomeDriver extends MahoutSparkDriver {
+ *
+ * // define only the options specific to this driver, inherit the generic ones
+ * private final val SomeOptions = HashMap[String, Any](
+ * "maxThings" -> 500,
+ * "minThings" -> 100,
+ * "appName" -> "SomeDriver")
+ *
+ * override def main(args: Array[String]): Unit = {
+ *
+ *
+ * val parser = new MahoutOptionParser(programName = "shortname") {
+ * head("somedriver", "Mahout 1.0-SNAPSHOT")
+ *
+ * // Input output options, non-driver specific
+ *     parseIOOptions()
+ *
+ * // Algorithm specific options
+ * // Add in the new options
+ * opts = opts ++ SomeOptions
+ * note("\nAlgorithm control options:")
+ * opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
+ * options + ("maxThings" -> x) ...
+ * }
+ *   }
+ *   parser.parse(args, parser.opts) map { opts =>
+ * parser.opts = opts
+ * process
+ * }
+ * }
+ *
+ * override def process: Unit = {
+ * start()
+ * // do the work here
+ * stop
+ * }
+ *
+ * }
+ * }}}
+ */
+abstract class MahoutSparkDriver extends MahoutDriver {
+
+
+ implicit protected var sparkConf = new SparkConf()
+
+ /** Creates a Spark context to run the job inside.
+ * Override to set any SparkConf values specific to the job;
+ * these must be set before the context is created.
+ * @param masterUrl Spark master URL
+ * @param appName Name to display in the Spark UI
+ * */
+ protected def start(masterUrl: String, appName: String): Unit = {
+ if (!_useExistingContext) {
+ mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
+ }
+ }
+
+ def useContext(context: DistributedContext): Unit = {
+ _useExistingContext = true
+ mc = context
+ sparkConf = mc.getConf
+ }
+}
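A minimal sketch of a driver built on this new base class follows. It is not part of the patch: the names ExampleDriver, maxThings, and exampleOpts are invented, and it assumes the inherited process/stop lifecycle shown in the scaladoc template above plus the MahoutOptionParser helpers from this package.

  package org.apache.mahout.drivers

  import scala.collection.immutable.HashMap

  // Illustrative only: parse options, create the Spark context with start(), do the
  // work in process, shut down with stop. All names here are invented for the sketch.
  object ExampleDriver extends MahoutSparkDriver {

    private final val ExampleOptions = HashMap[String, Any](
      "maxThings" -> 500,
      "appName" -> "ExampleDriver")

    private var exampleOpts: Map[String, Any] = Map.empty

    override def main(args: Array[String]): Unit = {
      val parser = new MahoutOptionParser(programName = "exampledriver") {
        head("exampledriver", "Mahout 1.0-SNAPSHOT")
        parseIOOptions()
        parseSparkOptions
        // driver-specific options go last so they override the group defaults
        opts = opts ++ ExampleOptions
        note("\nAlgorithm control options:")
        opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
          options + ("maxThings" -> x)
        }
      }
      parser.parse(args, parser.opts) map { opts =>
        exampleOpts = opts
        process
      }
    }

    override def process: Unit = {
      // start() builds the implicit Spark context from sparkConf unless an existing
      // context was supplied via useContext (useful when embedding the driver in tests)
      start(exampleOpts("master").asInstanceOf[String],
        exampleOpts("appName").asInstanceOf[String])
      // read the input, run the algorithm, and write the output here
      stop
    }
  }

When embedding rather than running from the CLI, calling useContext(existingContext) before process reuses that context and its SparkConf instead of creating a new one.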
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
deleted file mode 100644
index 6351e45..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import com.google.common.collect.{HashBiMap, BiMap}
-import org.apache.mahout.math.drm.DistributedContext
-
-/** Reader trait is abstract in the sense that the elementReader and drmReader functions must be defined by an extending trait, which also defines the type to be read.
- * @tparam T type of object read.
- */
-trait Reader[T]{
-
- val mc: DistributedContext
- val readSchema: Schema
-
- protected def elementReader(
- mc: DistributedContext,
- readSchema: Schema,
- source: String,
- existingRowIDs: BiMap[String, Int]): T
-
- protected def drmReader(
- mc: DistributedContext,
- readSchema: Schema,
- source: String,
- existingRowIDs: BiMap[String, Int]): T
-
- def readElementsFrom(
- source: String,
- existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
- elementReader(mc, readSchema, source, existingRowIDs)
-
- def readDRMFrom(
- source: String,
- existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
- drmReader(mc, readSchema, source, existingRowIDs)
-}
-
-/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
- * @tparam T type of object to write.
- */
-trait Writer[T]{
-
- val mc: DistributedContext
- val sort: Boolean
- val writeSchema: Schema
-
- protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
-
- def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
-}