You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2014/10/14 23:46:00 UTC
[2/2] git commit: NOJIRA refactor IndexedDataset and CLI drivers into
core and engine specific parts, closes apache/mahout#59
NOJIRA refactor IndexedDataset and CLI drivers into core and engine specific parts, closes apache/mahout#59
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/666d314f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/666d314f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/666d314f
Branch: refs/heads/master
Commit: 666d314fb8d9f466a5496a143d73d55036aea619
Parents: 4e6577d
Author: pferrel <pa...@occamsmachete.com>
Authored: Tue Oct 14 14:45:32 2014 -0700
Committer: pferrel <pa...@occamsmachete.com>
Committed: Tue Oct 14 14:45:32 2014 -0700
----------------------------------------------------------------------
.../apache/mahout/h2obindings/H2OEngine.scala | 44 +++
.../h2obindings/drm/CheckpointedDrmH2O.scala | 13 +
math-scala/pom.xml | 6 +
.../apache/mahout/drivers/MahoutDriver.scala | 43 +++
.../mahout/drivers/MahoutOptionParser.scala | 220 +++++++++++++++
.../mahout/math/cf/SimilarityAnalysis.scala | 56 +++-
.../mahout/math/drm/CheckpointedDrm.scala | 3 +
.../mahout/math/drm/DistributedEngine.scala | 25 ++
.../org/apache/mahout/math/drm/package.scala | 26 +-
.../math/indexeddataset/IndexedDataset.scala | 64 +++++
.../math/indexeddataset/ReaderWriter.scala | 68 +++++
.../mahout/math/indexeddataset/Schema.scala | 102 +++++++
.../apache/mahout/common/HDFSPathSearch.scala | 78 ++++++
.../apache/mahout/drivers/FileSysUtils.scala | 76 ------
.../apache/mahout/drivers/IndexedDataset.scala | 80 ------
.../mahout/drivers/ItemSimilarityDriver.scala | 90 +++----
.../apache/mahout/drivers/MahoutDriver.scala | 104 -------
.../mahout/drivers/MahoutOptionParser.scala | 214 ---------------
.../mahout/drivers/MahoutSparkDriver.scala | 87 ++++++
.../apache/mahout/drivers/ReaderWriter.scala | 66 -----
.../mahout/drivers/RowSimilarityDriver.scala | 41 +--
.../org/apache/mahout/drivers/Schema.scala | 98 -------
.../drivers/TextDelimitedReaderWriter.scala | 68 ++---
.../mahout/sparkbindings/SparkEngine.scala | 38 ++-
.../drm/CheckpointedDrmSpark.scala | 12 +
.../indexeddataset/IndexedDatasetSpark.scala | 53 ++++
.../apache/mahout/sparkbindings/package.scala | 4 +-
.../mahout/cf/CooccurrenceAnalysisSuite.scala | 267 ------------------
.../mahout/cf/SimilarityAnalysisSuite.scala | 269 +++++++++++++++++++
29 files changed, 1273 insertions(+), 1042 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 28214c6..99bc3ba 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -17,6 +17,9 @@
package org.apache.mahout.h2obindings
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.indexeddataset.{IndexedDataset, Schema, DefaultIndexedDatasetReadSchema}
+
import scala.reflect._
import org.apache.mahout.math._
import org.apache.mahout.math.drm._
@@ -112,4 +115,45 @@ object H2OEngine extends DistributedEngine {
}
implicit def cp2cph2o[K:ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmH2O[K] = drm.asInstanceOf[CheckpointedDrmH2O[K]]
+
+ /** Stub IndexedDataset for H2O, not implemented: declared abstract, it only carries the matrix and both ID dictionaries. */
+ abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int])
+ extends IndexedDataset {}
+
+ /**
+  * Stub for reading an IndexedDatasetH2O from default text delimited files; not supported on H2O yet.
+  * @todo unimplemented
+  * @param src a comma separated list of URIs to read from
+  * @param schema how the text file is formatted
+  * @param existingRowIDs row ID dictionary supplied by the caller; ignored by this stub
+  * @return never returns normally, always throws UnsupportedOperationException
+  */
+ def indexedDatasetDFSRead(src: String,
+     schema: Schema = DefaultIndexedDatasetReadSchema,
+     existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+     (implicit sc: DistributedContext):
+   IndexedDatasetH2O = {
+   // should log a warning when this is built but no logger here, can an H2O contributor help with this
+   println("Warning: unimplemented indexedDatasetDFSRead." )
+   throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read.")
+ }
+
+ /**
+  * Stub for reading an IndexedDatasetH2O element by element from text delimited files; not supported on H2O yet.
+  * @todo unimplemented
+  * @param src a comma separated list of URIs to read from
+  * @param schema how the text file is formatted
+  * @param existingRowIDs row ID dictionary supplied by the caller; ignored by this stub
+  * @return never returns normally, always throws UnsupportedOperationException
+  */
+ def indexedDatasetDFSReadElements(src: String,
+     schema: Schema = DefaultIndexedDatasetReadSchema,
+     existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+     (implicit sc: DistributedContext):
+   IndexedDatasetH2O = {
+   // should log a warning when this is built but no logger here, can an H2O contributor help with this
+   println("Warning: unimplemented indexedDatasetDFSReadElements." )
+   throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read by elements.")
+ }
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
index c06455a..6d7628e 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
@@ -53,4 +53,17 @@ class CheckpointedDrmH2O[K: ClassTag](
def canHaveMissingRows: Boolean = false
protected[mahout] def partitioningTag: Long = h2odrm.frame.anyVec.group.hashCode
+
+ /** Stub: needed so that IndexedDataset can live in core, but since drmWrap is not in H2O the real work is
+  * left for someone else. Always throws UnsupportedOperationException, so nothing is ever returned. */
+ override def newRowCardinality(n: Int): CheckpointedDrm[K] = {
+   /* this is the Spark impl
+   assert(n > -1)
+   assert( n >= nrow)
+   val newCheckpointedDrm = drmWrap[K](rdd, n, ncol)
+   newCheckpointedDrm
+   */
+   throw new UnsupportedOperationException("CheckpointedDrmH2O#newRowCardinality is not implemented.")
+ }
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index b2fea6b..66309d6 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -172,6 +172,12 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.github.scopt</groupId>
+ <artifactId>scopt_2.10</artifactId>
+ <version>3.2.0</version>
+ </dependency>
<!-- scala stuff -->
<dependency>
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
new file mode 100644
index 0000000..8c1f8cf
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Base class for a Mahout CLI driver; a platform specific version extends it and supplies process and main.
+ */
+abstract class MahoutDriver {
+
+  // distributed context closed by stop; starts out unset and nothing in this class assigns it
+  implicit protected var mc: DistributedContext = _
+  // option parser; starts out unset and nothing in this class assigns it
+  protected var parser: MahoutOptionParser = _
+
+  var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
+
+  /** Closes the context unless an existing one is being reused. Override (optionally) for special cleanup */
+  protected def stop: Unit =
+    if (!_useExistingContext) mc.close
+
+  /** This is where you do the work, call start first, then before exiting call stop */
+  protected def process: Unit
+
+  /** Parse command line and call process */
+  def main(args: Array[String]): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
new file mode 100644
index 0000000..c6968b8
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import scopt.OptionParser
+
+import scala.collection.immutable
+
+/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
+ * keep both standardized.
+ * @param programName Name displayed in help message, the name by which the driver is invoked.
+ * @note not all options are platform neutral so other platforms can add option parsing here if desired.
+ * */
+class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
+
+ // build options from some standard CLI param groups
+ // Note: always put the driver specific options last so they can override any previous options!
+ var opts = Map.empty[String, Any]
+
+ override def showUsageOnError = true
+
+ /** Registers the required --input and --output options, plus --input2 when numInputs == 2; merges FileIOOptions into opts */
+ def parseIOOptions(numInputs: Int = 1) = {
+   opts = opts ++ MahoutOptionParser.FileIOOptions
+   note("Input, output options")
+   opt[String]('i', "input").required().action { (path, options) =>
+     options + ("input" -> path)
+   }.text("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
+
+   // the secondary input is only registered when exactly two inputs are requested
+   if (numInputs == 2) {
+     opt[String]("input2").abbr("i2").action { (path, options) =>
+       options + ("input2" -> path)
+     }.text("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
+   }
+
+   opt[String]('o', "output").required().action { (path, options) =>
+     // todo: check to see if HDFS allows MS-Windows backslashes locally?
+     // the stored output path always ends with "/"
+     val normalized = if (path.endsWith("/")) path else path + "/"
+     options + ("output" -> normalized)
+   }.text("Path for output, any local or HDFS supported URI (required)")
+
+ }
+
+ /** Registers --master and --sparkExecutorMem and merges the SparkOptions defaults into opts */
+ def parseSparkOptions = {
+   opts = opts ++ MahoutOptionParser.SparkOptions
+   note("\nSpark config options:")
+
+   opt[String]("master").abbr("ma").text("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"").action { (url, options) =>
+     options + ("master" -> url)
+   }
+
+   opt[String]("sparkExecutorMem").abbr("sem").text("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies").action { (mem, options) =>
+     options + ("sparkExecutorMem" -> mem)
+   }
+ }
+
+ /** Registers --randomSeed and the hidden --writeAllDatasets flag; merges the GenericOptions defaults into opts */
+ def parseGenericOptions = {
+   opts = opts ++ MahoutOptionParser.GenericOptions
+   // a seed given on the command line must be strictly positive
+   opt[Int]("randomSeed").abbr("rs").action { (seed, options) =>
+     options + ("randomSeed" -> seed)
+   }.validate { seed => if (seed > 0) success else failure("Option --randomSeed must be > 0") }
+
+   // hidden option, though a user might want this: output both input DRMs
+   opt[Unit]("writeAllDatasets").hidden().action { (_, options) =>
+     options + ("writeAllDatasets" -> true)
+   }
+ }
+
+ /** Input text file schema--not driver specific but input data specific, elements input, not drms.
+  * Registers the delimiter, filter and column position options, then checks columns and filters for uniqueness. */
+ def parseElementInputSchemaOptions: Unit = {
+   opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
+   note("\nInput text file schema options:")
+   opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t ]\"") action { (x, options) =>
+     options + ("inDelim" -> x)
+   }
+
+   opt[String]("filter1") abbr ("f1") action { (x, options) =>
+     options + ("filter1" -> x)
+   } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
+
+   opt[String]("filter2") abbr ("f2") action { (x, options) =>
+     options + ("filter2" -> x)
+   } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
+
+   opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
+     options + ("rowIDColumn" -> x)
+   } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
+     if (x >= 0) success else failure("Option --rowIDColumn must be >= 0")
+   }
+
+   opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
+     options + ("itemIDColumn" -> x)
+   } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
+     if (x >= 0) success else failure("Option --itemIDColumn must be >= 0")
+   }
+
+   opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
+     options + ("filterColumn" -> x)
+   } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
+     if (x >= -1) success else failure("Option --filterColumn must be >= -1")
+   }
+
+   note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
+
+   //check for column consistency: no two of the three positions may coincide
+   checkConfig { options: Map[String, Any] =>
+     if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]
+       || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int]
+       || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int])
+       failure("The row, item, and filter positions must be unique.") else success
+   }
+
+   //check for filter consistency: only fails when both filters are set and equal
+   checkConfig { options: Map[String, Any] =>
+     if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
+       && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
+       && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
+       failure ("If using filters they must be unique.") else success
+   }
+
+ }
+
+ /** File finding strategy--not driver specific: registers --recursive and --filenamePattern, merges their defaults into opts */
+ def parseFileDiscoveryOptions = {
+   opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
+   note("\nFile discovery options:")
+   opt[Unit]('r', "recursive").action { (_, options) =>
+     options + ("recursive" -> true)
+   }.text("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
+
+   opt[String]("filenamePattern").abbr("fp").action { (pattern, options) =>
+     options + ("filenamePattern" -> pattern)
+   }.text("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
+
+ }
+
+ /** Registers the output text file schema options and merges the TextDelimitedDRMOptions defaults into opts */
+ def parseDrmFormatOptions = {
+   opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
+   note("\nOutput text file schema options:")
+   opt[String]("rowKeyDelim").abbr("rd").action { (delim, options) =>
+     options + ("rowKeyDelim" -> delim)
+   }.text("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
+
+   opt[String]("columnIdStrengthDelim").abbr("cd").action { (delim, options) =>
+     options + ("columnIdStrengthDelim" -> delim)
+   }.text("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
+
+   opt[String]("elementDelim").abbr("td").action { (delim, options) =>
+     options + ("elementDelim" -> delim)
+   }.text("Separates vector element values in the values list (optional). Default: \" \"")
+
+   opt[Unit]("omitStrength").abbr("os").action { (_, options) =>
+     options + ("omitStrength" -> true)
+   }.text("Do not write the strength to the output files (optional), Default: false.")
+   note("This option is used to output indexable data for creating a search engine recommender.")
+   note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
+ }
+
+}
+
+/** Companion object defines default option groups for reference in any driver that needs them.
+ * @note not all options are platform neutral so other platforms can add default options here if desired */
+object MahoutOptionParser {
+
+ // default values per option group; each parse* method of the class merges its matching group into opts
+ final val GenericOptions = immutable.HashMap[String, Any](
+ "randomSeed" -> System.currentTimeMillis().toInt, // Long millis truncated to Int, so the value may be negative
+ "writeAllDatasets" -> false)
+
+ final val SparkOptions = immutable.HashMap[String, Any](
+ "master" -> "local",
+ "sparkExecutorMem" -> "", // presumably empty means "use the Spark config value" -- TODO confirm in the Spark driver
+ "appName" -> "Generic Spark App, Change this.")
+
+ final val FileIOOptions = immutable.HashMap[String, Any](
+ "input" -> null.asInstanceOf[String], // no default, --input is declared required
+ "input2" -> null.asInstanceOf[String],
+ "output" -> null.asInstanceOf[String]) // no default, --output is declared required
+
+ final val FileDiscoveryOptions = immutable.HashMap[String, Any](
+ "recursive" -> false,
+ "filenamePattern" -> "^part-.*")
+
+ final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
+ "rowIDColumn" -> 0,
+ "itemIDColumn" -> 1,
+ "filterColumn" -> -1, // -1 stands for no filter column, per the --filterColumn help text
+ "filter1" -> null.asInstanceOf[String], // null is treated as "no filter" by the filter consistency check
+ "filter2" -> null.asInstanceOf[String],
+ "inDelim" -> "[,\t ]") // character class: comma, tab or space
+
+ final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ",
+ "omitStrength" -> false)
+}
+
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
index 90d7559..5718304 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -18,6 +18,7 @@
package org.apache.mahout.math.cf
import org.apache.mahout.math._
+import org.apache.mahout.math.indexeddataset.IndexedDataset
import scalabindings._
import RLikeOps._
import drm._
@@ -49,7 +50,7 @@ object SimilarityAnalysis extends Serializable {
* @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
* @param maxNumInteractions max number of interactions after downsampling, default: 500
* @return
- * */
+ */
def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
@@ -99,13 +100,39 @@ object SimilarityAnalysis extends Serializable {
indicatorMatrices
}
+ /** Item (column-wise) similarity via the log-likelihood ratio on A'A, A'B, A'C, ... for IndexedDatasets.
+  * Delegates to cooccurrences on the wrapped matrices and re-attaches the ID dictionaries to every
+  * indicator and cross-indicator matrix that comes back.
+  * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
+  * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+  * @param maxInterestingItemsPerThing max similarities per items
+  * @param maxNumInteractions max number of input items per item
+  * @return one IndexedDataset per matrix returned by cooccurrences, in the same order
+  */
+ def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset],
+     randomSeed: Int = 0xdeadbeef,
+     maxInterestingItemsPerThing: Int = 50,
+     maxNumInteractions: Int = 500):
+   List[IndexedDataset] = {
+   val matrices = indexedDatasets.map(_.matrix.asInstanceOf[DrmLike[Int]])
+   val primaryMatrix = matrices(0)
+   val secondaryMatrices = matrices.drop(1)
+   val indicators = cooccurrences(primaryMatrix, randomSeed, maxInterestingItemsPerThing,
+     maxNumInteractions, secondaryMatrices)
+   val primary = indexedDatasets(0)
+   // rows of every result are labelled with the primary's column IDs, columns with those of the input at the same index
+   indicators.zipWithIndex.map { case (drm, i) =>
+     primary.create(drm, primary.columnIDs, indexedDatasets(i).columnIDs)
+   }
+ }
+
/** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows
* @param drmARaw Primary interaction matrix
* @param randomSeed when kept to a constant will make repeatable downsampling
* @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
* @param maxNumInteractions max number of interactions after downsampling, default: 500
* @return
- * */
+ */
def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50,
maxNumInteractions: Int = 500): DrmLike[Int] = {
@@ -130,10 +157,27 @@ object SimilarityAnalysis extends Serializable {
drmSimilaritiesAAt
}
- /**
- * Compute loglikelihood ratio
- * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
- **/
+ /** Row-wise similarity via the log-likelihood ratio on AA' for an IndexedDataset.
+  * Runs rowSimilarity on the wrapped matrix and labels both rows and columns of the result with the row IDs.
+  * @param indexedDataset compare each row to every other
+  * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
+  * @param maxInterestingSimilaritiesPerRow max elements returned in each row
+  * @param maxObservationsPerRow max number of input elements to use
+  * @return IndexedDataset around the similarity matrix, using the input's rowIDs as both dictionaries
+  */
+ def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
+     maxInterestingSimilaritiesPerRow: Int = 50,
+     maxObservationsPerRow: Int = 500):
+   IndexedDataset = {
+   val similarities = rowSimilarity(indexedDataset.matrix, randomSeed, maxInterestingSimilaritiesPerRow, maxObservationsPerRow)
+   val rowDictionary = indexedDataset.rowIDs
+   indexedDataset.create(similarities, rowDictionary, rowDictionary)
+ }
+
+ /**
+ * Compute loglikelihood ratio
+ * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+ */
def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
numInteractionsWithAandB: Long, numInteractions: Long) = {
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 082e5b9..7f97481 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -41,4 +41,7 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
def keyClassTag: ClassTag[K]
+ /** changes the number of rows without touching the underlying data */
+ def newRowCardinality(n: Int): CheckpointedDrm[K]
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index eaf5aeb..dd5b101 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -17,6 +17,9 @@
package org.apache.mahout.math.drm
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetReadSchema, Schema, IndexedDataset}
+
import scala.reflect.ClassTag
import logical._
import org.apache.mahout.math._
@@ -85,6 +88,28 @@ trait DistributedEngine {
/** Creates empty DRM with non-trivial height */
def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
(implicit sc: DistributedContext): CheckpointedDrm[Long]
+ /** Load IndexedDataset from text delimited format; declared without a body here, so each engine implements it.
+ * @param src comma delimited URIs to read from
+ * @param schema defines format of file(s), DefaultIndexedDatasetReadSchema unless given
+ * @param existingRowIDs String to Int row ID dictionary passed in by the caller, an empty HashBiMap unless given
+ */
+ def indexedDatasetDFSRead(src: String,
+ schema: Schema = DefaultIndexedDatasetReadSchema,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+ (implicit sc: DistributedContext):
+ IndexedDataset
+
+ /** Load IndexedDataset from text delimited format, one element per line; each engine implements it.
+ * @param src comma delimited URIs to read from
+ * @param schema defines format of file(s), DefaultIndexedDatasetElementReadSchema unless given
+ * @param existingRowIDs String to Int row ID dictionary passed in by the caller, an empty HashBiMap unless given
+ */
+ def indexedDatasetDFSReadElements(src: String,
+ schema: Schema = DefaultIndexedDatasetElementReadSchema,
+ existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+ (implicit sc: DistributedContext):
+ IndexedDataset
+
}
object DistributedEngine {
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index b787ec0..3afbecb 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -17,10 +17,13 @@
package org.apache.mahout.math
-import scala.reflect.ClassTag
+import com.google.common.collect.{HashBiMap, BiMap}
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.indexeddataset.{IndexedDataset, DefaultIndexedDatasetReadSchema, Schema}
+import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.decompositions.{DSSVD, DSPCA, DQR}
+
+import scala.reflect.ClassTag
package object drm {
@@ -114,3 +117,20 @@ package object drm {
}
}
+
+package object indexeddataset {
+  import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetElementReadSchema
+  /** Load IndexedDataset from text delimited files by delegating to the implicit context */
+  def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit ctx: DistributedContext):
+    IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs)
+  /** Load IndexedDataset one element per line; defaults to the element read schema, as DistributedEngine declares */
+  def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit ctx: DistributedContext):
+    IndexedDataset = ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs)
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
new file mode 100644
index 0000000..c7eb2cb
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import com.google.common.collect.BiMap
+import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
+import org.apache.mahout.math.indexeddataset
+
+/**
+ * Pairs a [[org.apache.mahout.math.drm.CheckpointedDrm]] with two [[com.google.common.collect.BiMap]]s holding the
+ * ID/label translation dictionaries, one for rows and one for columns.
+ * A user specified String label or ID can thereby be stored and mapped to and from the Int ID
+ * used internal to Mahout core code.
+ * Extending classes supply the matrix, both dictionaries, dfsWrite and the create factory.
+ *
+ * @todo Often no need for both or perhaps either dictionary, so save resources by allowing
+ * to be not created when not needed.
+ */
+
+trait IndexedDataset {
+  val matrix: CheckpointedDrm[Int]    // the wrapped matrix, keyed by Int
+  val rowIDs: BiMap[String,Int]       // row label <-> Int ID
+  val columnIDs: BiMap[String,Int]    // column label <-> Int ID
+
+  /**
+   * Write a text delimited file(s) with the row and column IDs from dictionaries.
+   * @param dest destination to write to
+   * @param schema [[Schema]] to write with, presumably describing the delimited text layout
+   */
+  def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit
+
+  /** Factory method, creates the extending class from a matrix and its two dictionaries */
+  def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]):
+    IndexedDataset
+
+  /**
+   * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
+   * No changes are made to the underlying drm.
+   * @param n number to use for new row cardinality, should be larger than current
+   * @return a dataset built by create from the re-sized matrix and the unchanged dictionaries
+   * @note should be done before any optimizer actions are performed on the matrix or you'll get unpredictable
+   * results.
+   */
+  def newRowCardinality(n: Int): IndexedDataset =
+    // n is validated in matrix
+    create(matrix.newRowCardinality(n), rowIDs, columnIDs)
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
new file mode 100644
index 0000000..cf429f5
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import com.google.common.collect.{BiMap, HashBiMap}
+import org.apache.mahout.math.drm.DistributedContext
+
+/** Reader trait is abstract in the sense that the elementReader and drmReader functions must be defined by an
+ * extending trait, which also defines the type to be read. The public read methods only delegate to them,
+ * passing along this instance's context and read schema.
+ * @tparam T type of object to read.
+ */
+trait Reader[T]{
+
+  val mc: DistributedContext
+  val readSchema: Schema
+
+  protected def elementReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int]): T
+
+  protected def drmReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int]): T
+
+  def readElementsFrom(
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = {
+    elementReader(mc, readSchema, source, existingRowIDs)
+  }
+
+  def readDRMFrom(
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = {
+    drmReader(mc, readSchema, source, existingRowIDs)
+  }
+}
+
+/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait,
+ * which also defines the type to be written.
+ * @tparam T type of object to write.
+ */
+trait Writer[T]{
+
+  val mc: DistributedContext
+  val sort: Boolean
+  val writeSchema: Schema
+
+  /** Performs the actual write; supplied by the extending trait. */
+  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
+
+  /** Writes collection to dest by delegating to writer with this instance's mc, writeSchema and sort.
+   * The result type is declared explicitly so the public signature does not depend on inference.
+   */
+  def writeTo(collection: T, dest: String): Unit = writer(mc, writeSchema, dest, collection, sort)
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
new file mode 100644
index 0000000..557b419
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.indexeddataset
+
+import scala.collection.mutable.HashMap
+
+/** Syntactic sugar for mutable.HashMap[String, Any]
+ *
+ * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+ */
+class Schema(params: (String, Any)*) extends HashMap[String, Any] {
+  // note: this requires a mutable HashMap, do we care?
+  this ++= params
+
+  /** Constructor for copying an existing Schema
+   *
+   * @param schemaToClone the Schema whose mappings are added to the newly created, initially empty one
+   */
+  def this(schemaToClone: Schema) = {
+    this()
+    this ++= schemaToClone
+  }
+}
+
+/** These can be used to keep the text in and out fairly standard to Mahout, where an application specific
+ * format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+ * , which can be used to create a Mahout DRM for DSL ops.
+ */
+
+
+/** Simple default Schema for typical text delimited element file input
+ * This tells the reader to input elements of the default form (rowID<comma, tab, or space>columnID
+ * <comma, tab, or space>here may be other ignored text...)
+ */
+final object DefaultIndexedDatasetElementReadSchema extends Schema(
+ "delim" -> "[,\t ]", //comma, tab or space
+ "filter" -> "",
+ "rowIDColumn" -> 0,
+ "columnIDPosition" -> 1,
+ "filterColumn" -> -1)
+
+/** Default Schema for text delimited drm file output
+ * This tells the writer to write a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] of the default form:
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...)
+ */
+final object DefaultIndexedDatasetWriteSchema extends Schema(
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ",
+ "omitScore" -> false)
+
+/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file input
+ * This tells the reader to input text lines of the form:
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...)
+ */
+final object DefaultIndexedDatasetReadSchema extends Schema(
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ")
+
+/** Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where
+ * the score of any element is ignored,
+ * all non-zeros are replaced with 1.
+ * This tells the reader to input DRM lines of the form
+ * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
+ * Alternatively the format can be
+ * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+ * output format for [[IndexedDatasetWriteBooleanSchema]]
+ */
+final object IndexedDatasetReadBooleanSchema extends Schema(
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ",
+ "omitScore" -> true)
+
+/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file write where
+ * the score of an element is omitted.
+ * The presence of an element means the score = 1, the absence means a score of 0.
+ * This tells the writer to output [[org.apache.mahout.math.indexeddataset.IndexedDataset]] lines of the form
+ * (rowID<tab>columnID1<space>columnID2...)
+ */
+final object IndexedDatasetWriteBooleanSchema extends Schema(
+ "rowKeyDelim" -> "\t",
+ "columnIdStrengthDelim" -> ":",
+ "elementDelim" -> " ",
+ "omitScore" -> true)
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
new file mode 100644
index 0000000..42bf697
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
+/**
+ * Returns a [[java.lang.String]], which is comma delimited list of URIs discovered based on parameters
+ * in the constructor.
+ * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
+ *
+ * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
+ * @param filePattern regex that must match the entire filename to have the file returned
+ * @param recursive true traverses the filesystem recursively, default = false
+ */
+
+case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
+
+  val conf = new Configuration()
+  val fs = FileSystem.get(conf)
+
+  /** Returns a string of comma delimited URIs matching the filePattern.
+   * When pattern matching, dirs are never returned, only traversed. When filePattern is empty, pathURI is
+   * returned exactly as given and the filesystem is not consulted. */
+  def uris: String = {
+    if (filePattern.isEmpty) {
+      pathURI
+    } else {
+      // every findFiles call appends its comma terminated matches to what the earlier URIs produced
+      val files = pathURI.split(",").foldLeft("") { (found, uri) => findFiles(uri, filePattern, found) }
+      files.stripSuffix(",") // drop the last comma
+    }
+  }
+
+  /** Find matching files in the dir, recursively call self when another directory is found.
+   * Only files are matched, directories are traversed but never return a match. Zero length files are
+   * skipped. A path that is itself a file is appended as given, without being checked against filePattern.
+   *
+   * @param dir directory to list, or the path of a single file
+   * @param filePattern regex that must match the entire file name
+   * @param files comma terminated URIs accumulated so far
+   * @return files with one comma terminated URI appended for every file found
+   */
+  private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
+    val seed = fs.getFileStatus(new Path(dir))
+    var f: String = files
+
+    if (seed.isDir) {
+      val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
+      for (fileStatus <- fileStatuses) {
+        if (fileStatus.getPath().getName().matches(filePattern)
+          && !fileStatus.isDir) {
+          // found a file
+          if (fileStatus.getLen() != 0) {
+            // file is not empty
+            f = f + fileStatus.getPath.toUri.toString + ","
+          }
+        } else if (fileStatus.isDir && recursive) {
+          f = findFiles(fileStatus.getPath.toString, filePattern, f)
+        }
+      }
+    } else {
+      // was a filename not a dir: append it like any other match. Plainly assigning dir here would throw
+      // away everything accumulated from earlier URIs and leave the entry without a trailing delimiter,
+      // fusing it with whatever is appended next.
+      f = f + dir + ","
+    }
+    f
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
deleted file mode 100644
index f48e9ed..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
-
-/**
- * Returns a [[java.lang.String]]comma delimited list of URIs discovered based on parameters in the constructor.
- * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]]
- *
- * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
- * @param filePattern regex that must match the entire filename to have the file returned
- * @param recursive true traverses the filesystem recursively, default = false
- */
-
-case class FileSysUtils(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
-
- val conf = new Configuration()
- val fs = FileSystem.get(conf)
-
-/** Returns a string of comma delimited URIs matching the filePattern
- * When pattern matching dirs are never returned, only traversed. */
- def uris :String = {
- if (!filePattern.isEmpty){ // have file pattern so
- val pathURIs = pathURI.split(",")
- var files = ""
- for ( uri <- pathURIs ){
- files = findFiles(uri, filePattern, files)
- }
- if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
- files
- }else{
- pathURI
- }
- }
-
-/** Find matching files in the dir, recursively call self when another directory is found
- * Only files are matched, directories are traversed but never return a match */
- private def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = {
- val seed = fs.getFileStatus(new Path(dir))
- var f :String = files
-
- if (seed.isDir) {
- val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
- for (fileStatus <- fileStatuses) {
- if (fileStatus.getPath().getName().matches(filePattern)
- && !fileStatus.isDir) {
- // found a file
- if (fileStatus.getLen() != 0) {
- // file is not empty
- f = f + fileStatus.getPath.toUri.toString + ","
- }
- } else if (fileStatus.isDir && recursive) {
- f = findFiles(fileStatus.getPath.toString, filePattern, f)
- }
- }
- }else{ f = dir }// was a filename not dir
- f
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
deleted file mode 100644
index 99f98f5..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import com.google.common.collect.BiMap
-import org.apache.mahout.math.drm.{DrmLike, CheckpointedDrm}
-import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark
-import org.apache.mahout.sparkbindings._
-
-/**
- * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries.
- * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so
- * a user specified label or ID can be stored and mapped to and from the [[scala.Int]] ordinal ID
- * used internal to Mahout Core code.
- *
- * Example: For a transpose job the [[org.apache.mahout.drivers.IndexedDataset#matrix]]: [[org.apache.mahout.sparkbindings.drm.DrmLike]] is passed into the DSL code
- * that transposes the values, then a resulting [[org.apache.mahout.drivers.IndexedDataset]] is created from the transposed DrmLike object with swapped dictionaries (since the rows and columns are transposed). The new
- * [[org.apache.mahout.drivers.IndexedDataset]] is returned.
- *
- * @param matrix DrmLike[Int], representing the distributed matrix storing the actual data.
- * @param rowIDs BiMap[String, Int] storing a bidirectional mapping of external String ID to
- * and from the ordinal Mahout Int ID. This one holds row labels
- * @param columnIDs BiMap[String, Int] storing a bidirectional mapping of external String
- * ID to and from the ordinal Mahout Int ID. This one holds column labels
- * @todo Often no need for both or perhaps either dictionary, so save resources by allowing
- * to be not created when not needed.
- */
-
-case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) {
-
- // we must allow the row dimension to be adjusted in the case where the data read in is incomplete and we
- // learn this afterwards
-
- /**
- * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value.
- * No physical changes are made to the underlying drm.
- * @param n number to use for row carnindality, should be larger than current
- * @note should be done before any BLAS optimizer actions are performed on the matrix or you'll get unpredictable
- * results.
- */
- def newRowCardinality(n: Int): IndexedDataset = {
- assert(n > -1)
- assert( n >= matrix.nrow)
- val drmRdd = matrix.asInstanceOf[CheckpointedDrmSpark[Int]].rdd
- val ncol = matrix.ncol
- val newMatrix = drmWrap[Int](drmRdd, n, ncol)
- new IndexedDataset(newMatrix, rowIDs, columnIDs)
- }
-
-}
-
-/**
- * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary
- * constructor for
- * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like
- * [[org.apache.mahout.drivers.Reader]]
- * {{{
- * val indexedDataset = IndexedDataset(indexedDatasetReader.readElementsFrom(source))
- * }}}
- */
-
-object IndexedDataset {
- /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
- def apply(id2: IndexedDataset) = new IndexedDataset(id2.matrix, id2.rowIDs, id2.columnIDs)
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 0b8ded6..42e9d81 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -17,15 +17,19 @@
package org.apache.mahout.drivers
+import org.apache.mahout.common.HDFSPathSearch
import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSReadElements}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import scala.collection.immutable.HashMap
/**
- * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]].
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]].
* Reads text lines
* that contain (row id, column id, ...). The IDs are user specified strings which will be
* preserved in the
- * output. The individual elements will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]
+ * output. The individual elements will be accumulated into a matrix like [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
+ * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]
* will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
* matrices and calculate both the self similarity of the primary matrix and the row-wise
* similarity of the primary
@@ -40,17 +44,16 @@ import scala.collection.immutable.HashMap
* @note To use with a Spark cluster see the --master option, if you run out of heap space check
* the --sparkExecutorMemory option.
*/
-object ItemSimilarityDriver extends MahoutDriver {
+object ItemSimilarityDriver extends MahoutSparkDriver {
// define only the options specific to ItemSimilarity
private final val ItemSimilarityOptions = HashMap[String, Any](
"maxPrefs" -> 500,
"maxSimilaritiesPerItem" -> 100,
"appName" -> "ItemSimilarityDriver")
- private var reader1: TextDelimitedIndexedDatasetReader = _
- private var reader2: TextDelimitedIndexedDatasetReader = _
- private var writer: TextDelimitedIndexedDatasetWriter = _
private var writeSchema: Schema = _
+ private var readSchema1: Schema = _
+ private var readSchema2: Schema = _
/**
* @param args Command line args, if empty a help message is printed.
@@ -119,27 +122,27 @@ object ItemSimilarityDriver extends MahoutDriver {
// todo: the HashBiMap used in the TextDelimited Reader is hard coded into
// MahoutKryoRegistrator, it should be added to the register list here so it
- // will be only spcific to this job.
+ // will be only specific to this job.
sparkConf.set("spark.kryo.referenceTracking", "false")
- .set("spark.kryoserializer.buffer.mb", "200")
- .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+ .set("spark.kryoserializer.buffer.mb", "200")// todo: should this be left to config or an option?
+
+ if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
+ sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
+ //else leave as set in Spark config
super.start(masterUrl, appName)
- val readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
+ readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String],
"filter" -> parser.opts("filter1").asInstanceOf[String],
"rowIDColumn" -> parser.opts("rowIDColumn").asInstanceOf[Int],
"columnIDPosition" -> parser.opts("itemIDColumn").asInstanceOf[Int],
"filterColumn" -> parser.opts("filterColumn").asInstanceOf[Int])
- reader1 = new TextDelimitedIndexedDatasetReader(readSchema1)
-
if ((parser.opts("filterColumn").asInstanceOf[Int] != -1 && parser.opts("filter2").asInstanceOf[String] != null)
|| (parser.opts("input2").asInstanceOf[String] != null && !parser.opts("input2").asInstanceOf[String].isEmpty )){
// only need to change the filter used compared to readSchema1
- val readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String])
+ readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String])
- reader2 = new TextDelimitedIndexedDatasetReader(readSchema2)
}
writeSchema = new Schema(
@@ -147,36 +150,34 @@ object ItemSimilarityDriver extends MahoutDriver {
"columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String],
"omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean],
"elementDelim" -> parser.opts("elementDelim").asInstanceOf[String])
-
- writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
-
- }
+ }
private def readIndexedDatasets: Array[IndexedDataset] = {
- val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+ val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
parser.opts("recursive").asInstanceOf[Boolean]).uris
val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) ""
- else FileSysUtils(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
+ else HDFSPathSearch(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String],
parser.opts("recursive").asInstanceOf[Boolean]).uris
if (inFiles.isEmpty) {
Array()
} else {
- val datasetA = IndexedDataset(reader1.readElementsFrom(inFiles))
- if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetA,
- parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions")
+ val datasetA = indexedDatasetDFSReadElements(inFiles,readSchema1)
+ if (parser.opts("writeAllDatasets").asInstanceOf[Boolean])
+ datasetA.dfsWrite(parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions",
+ schema = writeSchema)
- // The case of readng B can be a bit tricky when the exact same row IDs don't exist for A and B
+ // The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B
// Here we assume there is one row ID space for all interactions. To do this we calculate the
// row cardinality only after reading in A and B (or potentially C...) We then adjust the
// cardinality so all match, which is required for the math to work.
// Note: this may leave blank rows with no representation in any DRM. Blank rows need to
- // be supported (and are at least on Spark) or the row cardinality fix will not work.
+ // be supported (and are at least on Spark) or the row cardinality adjustment will not work.
val datasetB = if (!inFiles2.isEmpty) {
// get cross-cooccurrence interactions from separate files
- val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles2, existingRowIDs = datasetA.rowIDs))
+ val datasetB = indexedDatasetDFSReadElements(inFiles2, readSchema2, existingRowIDs = datasetA.rowIDs)
datasetB
@@ -184,12 +185,12 @@ object ItemSimilarityDriver extends MahoutDriver {
&& parser.opts("filter2").asInstanceOf[String] != null) {
// get cross-cooccurrences interactions by using two filters on a single set of files
- val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles, existingRowIDs = datasetA.rowIDs))
+ val datasetB = indexedDatasetDFSReadElements(inFiles, readSchema2, existingRowIDs = datasetA.rowIDs)
datasetB
} else {
- null.asInstanceOf[IndexedDataset]
+ null.asInstanceOf[IndexedDatasetSpark]
}
if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc
// true row cardinality is the size of the row id index, which was calculated from all rows of A and B
@@ -203,8 +204,9 @@ object ItemSimilarityDriver extends MahoutDriver {
val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality)
else datasetB // this guarantees matching cardinality
- if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetB, parser.opts("output") + "../input-datasets/secondary-interactions")
-
+ if (parser.opts("writeAllDatasets").asInstanceOf[Boolean])
+ datasetB.dfsWrite(parser.opts("output").asInstanceOf[String] + "../input-datasets/secondary-interactions",
+ schema = writeSchema)
Array(returnedA, returnedB)
} else Array(datasetA)
}
@@ -214,31 +216,13 @@ object ItemSimilarityDriver extends MahoutDriver {
start()
val indexedDatasets = readIndexedDatasets
+ val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int],
+ parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
// todo: allow more than one cross-similarity matrix?
- val indicatorMatrices = {
- if (indexedDatasets.length > 1) {
- SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
- parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int],
- Array(indexedDatasets(1).matrix))
- } else {
- SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int],
- parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int])
- }
- }
-
- // an alternative is to create a version of IndexedDataset that knows how to write itself
- val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs,
- indexedDatasets(0).columnIDs, writeSchema)
- selfIndicatorDataset.writeTo(dest = parser.opts("output").asInstanceOf[String] + "indicator-matrix")
-
- // todo: would be nice to support more than one cross-similarity indicator
- if (indexedDatasets.length > 1) {
-
- val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
- writer.writeTo(crossIndicatorDataset, parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix")
-
- }
+ idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "indicator-matrix", schema = writeSchema)
+ if(idss.length > 1)
+ idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix", schema = writeSchema)
stop
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
deleted file mode 100644
index 6ea7c8b..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.spark.SparkConf
-import org.apache.mahout.sparkbindings._
-
-import scala.collection.immutable
-
-/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main.
- * Also define a Map of options for the command line parser. The following template may help:
- * {{{
- * object SomeDriver extends MahoutDriver {
- *
- * // define only the options specific to this driver, inherit the generic ones
- * private final val SomeOptions = HashMap[String, Any](
- * "maxThings" -> 500,
- * "minThings" -> 100,
- * "appName" -> "SomeDriver")
- *
- * override def main(args: Array[String]): Unit = {
- *
- *
- * val parser = new MahoutOptionParser(programName = "shortname") {
- * head("somedriver", "Mahout 1.0-SNAPSHOT")
- *
- * // Input output options, non-driver specific
- * parseIOOptions
- *
- * // Algorithm specific options
- * // Add in the new options
- * opts = opts ++ SomeOptions
- * note("\nAlgorithm control options:")
- * opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
- * options + ("maxThings" -> x) ...
- * }
- * parser.parse(args, parser.opts) map { opts =>
- * parser.opts = opts
- * process
- * }
- * }
- *
- * override def process: Unit = {
- * start()
- * // do the work here
- * stop
- * }
- *
- * }}}
- */
-abstract class MahoutDriver {
-
-
- implicit protected var mc: DistributedContext = _
- implicit protected var sparkConf = new SparkConf()
- protected var parser: MahoutOptionParser = _
-
- var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite
-
- /** Creates a Spark context to run the job inside.
- * Override to set the SparkConf values specific to the job,
- * these must be set before the context is created.
- * @param masterUrl Spark master URL
- * @param appName Name to display in Spark UI
- * */
- protected def start(masterUrl: String, appName: String) : Unit = {
- if (!_useExistingContext) {
- mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
- }
- }
-
- /** Override (optionally) for special cleanup */
- protected def stop: Unit = {
- if (!_useExistingContext) mc.close
- }
-
- /** This is where you do the work, call start first, then before exiting call stop */
- protected def process: Unit
-
- /** Parse command line and call process */
- def main(args: Array[String]): Unit
-
- def useContext(context: DistributedContext): Unit = {
- _useExistingContext = true
- mc = context
- sparkConf = mc.getConf
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
deleted file mode 100644
index ad7a76b..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mahout.drivers
-
-import scopt.OptionParser
-import scala.collection.immutable
-
-/** Companion object defines default option groups for reference in any driver that needs them */
-object MahoutOptionParser {
- // set up the various default option groups
- final val GenericOptions = immutable.HashMap[String, Any](
- "randomSeed" -> System.currentTimeMillis().toInt,
- "writeAllDatasets" -> false)
-
- final val SparkOptions = immutable.HashMap[String, Any](
- "master" -> "local",
- "sparkExecutorMem" -> "2g",
- "appName" -> "Generic Spark App, Change this.")
-
- final val FileIOOptions = immutable.HashMap[String, Any](
- "input" -> null.asInstanceOf[String],
- "input2" -> null.asInstanceOf[String],
- "output" -> null.asInstanceOf[String])
-
- final val FileDiscoveryOptions = immutable.HashMap[String, Any](
- "recursive" -> false,
- "filenamePattern" -> "^part-.*")
-
- final val TextDelimitedElementsOptions = immutable.HashMap[String, Any](
- "rowIDColumn" -> 0,
- "itemIDColumn" -> 1,
- "filterColumn" -> -1,
- "filter1" -> null.asInstanceOf[String],
- "filter2" -> null.asInstanceOf[String],
- "inDelim" -> "[,\t ]")
-
- final val TextDelimitedDRMOptions = immutable.HashMap[String, Any](
- "rowKeyDelim" -> "\t",
- "columnIdStrengthDelim" -> ":",
- "elementDelim" -> " ",
- "omitStrength" -> false)
-}
-/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
- * keep both standarized.
- * @param programName Name displayed in help message, the name by which the driver is invoked.
- * */
-class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
-
- // build options from some stardard CLI param groups
- // Note: always put the driver specific options at the last so the can override and previous options!
- var opts = Map.empty[String, Any]
-
- override def showUsageOnError = true
-
- def parseIOOptions(numInputs: Int = 1) = {
- opts = opts ++ MahoutOptionParser.FileIOOptions
- note("Input, output options")
- opt[String]('i', "input") required() action { (x, options) =>
- options + ("input" -> x)
- } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)")
-
- if (numInputs == 2) {
- opt[String]("input2") abbr ("i2") action { (x, options) =>
- options + ("input2" -> x)
- } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.")
- }
-
- opt[String]('o', "output") required() action { (x, options) =>
- // todo: check to see if HDFS allows MS-Windows backslashes locally?
- if (x.endsWith("/")) {
- options + ("output" -> x)
- } else {
- options + ("output" -> (x + "/"))
- }
- } text ("Path for output, any local or HDFS supported URI (required)")
-
- }
-
- def parseSparkOptions = {
- opts = opts ++ MahoutOptionParser.SparkOptions
- note("\nSpark config options:")
-
- opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
- options + ("master" -> x)
- }
-
- opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g") action { (x, options) =>
- options + ("sparkExecutorMem" -> x)
- }
-
- }
-
- def parseGenericOptions = {
- opts = opts ++ MahoutOptionParser.GenericOptions
- opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
- options + ("randomSeed" -> x)
- } validate { x =>
- if (x > 0) success else failure("Option --randomSeed must be > 0")
- }
-
- //output both input DRMs
- opt[Unit]("writeAllDatasets") hidden() action { (_, options) =>
- options + ("writeAllDatasets" -> true)
- }//Hidden option, though a user might want this.
- }
-
- def parseElementInputSchemaOptions{
- //Input text file schema--not driver specific but input data specific, elements input,
- // not drms
- opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
- note("\nInput text file schema options:")
- opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
- options + ("inDelim" -> x)
- }
-
- opt[String]("filter1") abbr ("f1") action { (x, options) =>
- options + ("filter1" -> x)
- } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used")
-
- opt[String]("filter2") abbr ("f2") action { (x, options) =>
- options + ("filter2" -> x)
- } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected")
-
- opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) =>
- options + ("rowIDColumn" -> x)
- } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x =>
- if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
- }
-
- opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) =>
- options + ("itemIDColumn" -> x)
- } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x =>
- if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
- }
-
- opt[Int]("filterColumn") abbr ("fc") action { (x, options) =>
- options + ("filterColumn" -> x)
- } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x =>
- if (x >= -1) success else failure("Option --filterColNum must be >= -1")
- }
-
- note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used")
-
- checkConfig { options: Map[String, Any] =>
- if (options("filterColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int]
- || options("filterColumn").asInstanceOf[Int] == options("rowIDColumn").asInstanceOf[Int]
- || options("rowIDColumn").asInstanceOf[Int] == options("itemIDColumn").asInstanceOf[Int])
- failure("The row, item, and filter positions must be unique.") else success
- }
-
- //check for option consistency, probably driver specific
- checkConfig { options: Map[String, Any] =>
- if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String]
- && options("filter2").asInstanceOf[String] != null.asInstanceOf[String]
- && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String])
- failure ("If using filters they must be unique.") else success
- }
-
- }
-
- def parseFileDiscoveryOptions = {
- //File finding strategy--not driver specific
- opts = opts ++ MahoutOptionParser.FileDiscoveryOptions
- note("\nFile discovery options:")
- opt[Unit]('r', "recursive") action { (_, options) =>
- options + ("recursive" -> true)
- } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false")
-
- opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
- options + ("filenamePattern" -> x)
- } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory")
-
- }
-
- def parseDrmFormatOptions = {
- opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions
- note("\nOutput text file schema options:")
- opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
- options + ("rowKeyDelim" -> x)
- } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"")
-
- opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) =>
- options + ("columnIdStrengthDelim" -> x)
- } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"")
-
- opt[String]("elementDelim") abbr ("td") action { (x, options) =>
- options + ("elementDelim" -> x)
- } text ("Separates vector element values in the values list (optional). Default: \" \"")
-
- opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
- options + ("omitStrength" -> true)
- } text ("Do not write the strength to the output files (optional), Default: false.")
- note("This option is used to output indexable data for creating a search engine recommender.")
-
- note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"")
- }
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
new file mode 100644
index 0000000..5a4254b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.spark.SparkConf
+import org.apache.mahout.sparkbindings._
+
+/** Extend this class to create a Spark-backed Mahout CLI driver. Minimally you must override process and main.
+ * Also define a Map of options for the command line parser. The following template may help:
+ * {{{
+ * object SomeDriver extends MahoutSparkDriver {
+ *
+ * // define only the options specific to this driver, inherit the generic ones
+ * private final val SomeOptions = HashMap[String, Any](
+ * "maxThings" -> 500,
+ * "minThings" -> 100,
+ * "appName" -> "SomeDriver")
+ *
+ * override def main(args: Array[String]): Unit = {
+ *
+ *
+ * val parser = new MahoutOptionParser(programName = "shortname") {
+ * head("somedriver", "Mahout 1.0-SNAPSHOT")
+ *
+ * // Input output options, non-driver specific
+ * parseIOOptions
+ *
+ * // Algorithm specific options
+ * // Add in the new options
+ * opts = opts ++ SomeOptions
+ * note("\nAlgorithm control options:")
+ * opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
+ * options + ("maxThings" -> x) ...
+ * }
+ * parser.parse(args, parser.opts) map { opts =>
+ * parser.opts = opts
+ * process
+ * }
+ * }
+ *
+ * override def process: Unit = {
+ * start()
+ * // do the work here
+ * stop
+ * }
+ *
+ * }}}
+ */
+abstract class MahoutSparkDriver extends MahoutDriver {
+
+
+ implicit protected var sparkConf = new SparkConf()
+
+ /** Creates a Spark context to run the job inside, unless one was already supplied through useContext.
+ * Override to set the SparkConf values specific to the job;
+ * these must be set before the context is created.
+ * @param masterUrl Spark master URL
+ * @param appName Name to display in Spark UI
+ * */
+ protected def start(masterUrl: String, appName: String) : Unit = {
+ if (!_useExistingContext) {
+ mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
+ }
+ }
+
+ def useContext(context: DistributedContext): Unit = {
+ _useExistingContext = true
+ mc = context
+ sparkConf = mc.getConf
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
deleted file mode 100644
index 6351e45..0000000
--- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.drivers
-
-import com.google.common.collect.{HashBiMap, BiMap}
-import org.apache.mahout.math.drm.DistributedContext
-
-/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait, which also defines the type to be read.
- * @tparam T type of object read.
- */
-trait Reader[T]{
-
- val mc: DistributedContext
- val readSchema: Schema
-
- protected def elementReader(
- mc: DistributedContext,
- readSchema: Schema,
- source: String,
- existingRowIDs: BiMap[String, Int]): T
-
- protected def drmReader(
- mc: DistributedContext,
- readSchema: Schema,
- source: String,
- existingRowIDs: BiMap[String, Int]): T
-
- def readElementsFrom(
- source: String,
- existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
- elementReader(mc, readSchema, source, existingRowIDs)
-
- def readDRMFrom(
- source: String,
- existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T =
- drmReader(mc, readSchema, source, existingRowIDs)
-}
-
-/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
- * @tparam T type of object to write.
- */
-trait Writer[T]{
-
- val mc: DistributedContext
- val sort: Boolean
- val writeSchema: Schema
-
- protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit
-
- def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort)
-}