You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2014/12/12 17:11:56 UTC
mahout git commit: refactor more engine specific parts of CLI and
drivers and adding -D:key=value for changing the sparkconf of a driver
Repository: mahout
Updated Branches:
refs/heads/master 6530e8144 -> ae1808be0
refactor more engine specific parts of CLI and drivers and adding -D:key=value for changing the sparkconf of a driver
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ae1808be
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ae1808be
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ae1808be
Branch: refs/heads/master
Commit: ae1808be0e76c15544e75ec54aae3fc720f65364
Parents: 6530e81
Author: pferrel <pa...@occamsmachete.com>
Authored: Thu Dec 11 16:26:57 2014 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Thu Dec 11 16:26:57 2014 -0800
----------------------------------------------------------------------
.../mahout/drivers/MahoutOptionParser.scala | 23 ++---------
.../mahout/drivers/ItemSimilarityDriver.scala | 15 +++----
.../mahout/drivers/MahoutSparkDriver.scala | 3 ++
.../drivers/MahoutSparkOptionParser.scala | 43 ++++++++++++++++++++
.../mahout/drivers/RowSimilarityDriver.scala | 20 +++------
.../drivers/TextDelimitedReaderWriter.scala | 26 ++++++------
.../drivers/ItemSimilarityDriverSuite.scala | 1 -
7 files changed, 74 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 046bc87..479a8d0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -23,12 +23,13 @@ import scala.collection.immutable
/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
* keep both standarized.
* @param programName Name displayed in help message, the name by which the driver is invoked.
- * @note not all options are platform neutral so other platforms can add option parsing here if desired.
+ * @note options are engine neutral by convention. See the engine specific extending class (e.g.
+ * MahoutSparkOptionParser) to add Spark or other engine options.
* */
class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
// build options from some stardard CLI param groups
- // Note: always put the driver specific options at the last so the can override and previous options!
+ // Note: always put the driver specific options last so they can override any previous options!
var opts = Map.empty[String, Any]
override def showUsageOnError = true
@@ -47,7 +48,6 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
}
opt[String]('o', "output") required() action { (x, options) =>
- // todo: check to see if HDFS allows MS-Windows backslashes locally?
if (x.endsWith("/")) {
options + ("output" -> x)
} else {
@@ -57,21 +57,6 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
}
- def parseSparkOptions = {
- opts = opts ++ MahoutOptionParser.SparkOptions
- opts = opts + ("appName" -> programName)
- note("\nSpark config options:")
-
- opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
- options + ("master" -> x)
- }
-
- opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
- options + ("sparkExecutorMem" -> x)
- }
-
- }
-
def parseGenericOptions = {
opts = opts ++ MahoutOptionParser.GenericOptions
opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
@@ -91,7 +76,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
// not drms
opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
note("\nInput text file schema options:")
- opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
+ opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action { (x, options) =>
options + ("inDelim" -> x)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 42e9d81..01a18c9 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -24,12 +24,12 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import scala.collection.immutable.HashMap
/**
- * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis#cooccurrencesIDSs]].
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]].
* Reads text lines
* that contain (row id, column id, ...). The IDs are user specified strings which will be
* preserved in the
* output. The individual elements will be accumulated into a matrix like [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * and [[org.apache.mahout.cf.SimilarityAnalysis#cooccurrencesIDSs]]
+ * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]
* will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
* matrices and calculate both the self similarity of the primary matrix and the row-wise
* similarity of the primary
@@ -42,7 +42,8 @@ import scala.collection.immutable.HashMap
* you can specify only the input and output file and directory--all else will default to the correct values.
* Each output line will contain the Item ID and similar items sorted by LLR strength descending.
* @note To use with a Spark cluster see the --master option, if you run out of heap space check
- * the --sparkExecutorMemory option.
+ * the --sparkExecutorMem option. Other [[org.apache.spark.SparkConf]] key value pairs can be set with the -D:k=v
+ * option.
*/
object ItemSimilarityDriver extends MahoutSparkDriver {
// define only the options specific to ItemSimilarity
@@ -60,7 +61,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
*/
override def main(args: Array[String]): Unit = {
- parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
+ parser = new MahoutSparkOptionParser(programName = "spark-itemsimilarity") {
head("spark-itemsimilarity", "Mahout 1.0")
//Input output options, non-driver specific
@@ -120,12 +121,6 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
appName: String = parser.opts("appName").asInstanceOf[String]):
Unit = {
- // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
- // MahoutKryoRegistrator, it should be added to the register list here so it
- // will be only specific to this job.
- sparkConf.set("spark.kryo.referenceTracking", "false")
- .set("spark.kryoserializer.buffer.mb", "200")// todo: should this be left to config or an option?
-
if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
//else leave as set in Spark config
http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index 5a4254b..e6299fd 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -74,6 +74,9 @@ abstract class MahoutSparkDriver extends MahoutDriver {
* @param appName Name to display in Spark UI
* */
protected def start(masterUrl: String, appName: String) : Unit = {
+ sparkConf.set("spark.kryo.referenceTracking", "false")
+ .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
+
if (!_useExistingContext) {
mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
new file mode 100644
index 0000000..a46d2ee
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import org.apache.spark.SparkConf
+
+class MahoutSparkOptionParser(programName: String) extends MahoutOptionParser(programName: String){
+
+ def parseSparkOptions(implicit sparkConf: SparkConf) = {
+ opts = opts ++ MahoutOptionParser.SparkOptions
+ opts = opts + ("appName" -> programName)
+ note("\nSpark config options:")
+
+ opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+ options + ("master" -> x)
+ }
+
+ opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
+ options + ("sparkExecutorMem" -> x)
+ }
+
+ opt[(String, String)]("define") abbr ("D") unbounded() foreach { case (k, v) =>
+ sparkConf.set(k, v)
+ } validate { x =>
+ if (x._2 != "") success else failure("Value <sparkConfValue> must be non-blank")
+ } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before creating this job's Spark context (optional)")
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index f8abef3..9b44b95 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -24,11 +24,11 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import scala.collection.immutable.HashMap
/**
- * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarityIDSs( )]].
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]].
* Reads a text delimited file containing rows of a [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
* with domain specific IDS of the form
* (row id, column id: strength, ...). The IDs will be preserved in the
- * output. The rows define a matrix and [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarityIDSs( )]]
+ * output. The rows define a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]]
* will be used to calculate row-wise similarity using log-likelihood
* The options allow control of the input schema, file discovery, output schema, and control of
* algorithm parameters.
@@ -53,7 +53,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
*/
override def main(args: Array[String]): Unit = {
- parser = new MahoutOptionParser(programName = "spark-rowsimilarity") {
+ parser = new MahoutSparkOptionParser(programName = "spark-rowsimilarity") {
head("spark-rowsimilarity", "Mahout 1.0")
//Input output options, non-driver specific
@@ -107,18 +107,8 @@ object RowSimilarityDriver extends MahoutSparkDriver {
}
override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
- appName: String = parser.opts("appName").asInstanceOf[String]):
- Unit = {
-
- // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
- // MahoutKryoRegistrator, it should be added to the register list here so it
- // will be only specific to this job.
- sparkConf.set("spark.kryo.referenceTracking", "false")
- .set("spark.kryoserializer.buffer.mb", "200")// todo: should we take this out?
-
- if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
- sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
- //else leave as set in Spark config
+ appName: String = parser.opts("appName").asInstanceOf[String]):
+ Unit = {
super.start(masterUrl, appName)
http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index ba8f7d1..895bd01 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -76,9 +76,6 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
val rowIDs = interactions.map { case (rowID, _) => rowID }.distinct().collect()
val columnIDs = interactions.map { case (_, columnID) => columnID }.distinct().collect()
- val numRows = rowIDs.size
- val numColumns = columnIDs.size
-
// create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
// broadcast them for access in distributed processes, so they are not recalculated in every task.
val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
@@ -87,6 +84,9 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
+ val ncol = columnIDDictionary.size()
+ val nrow = rowIDDictionary.size()
+
val indexedInteractions =
interactions.map { case (rowID, columnID) =>
val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
@@ -96,7 +96,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
}
// group by IDs to form row vectors
.groupByKey().map { case (rowIndex, columnIndexes) =>
- val row = new RandomAccessSparseVector(numColumns)
+ val row = new RandomAccessSparseVector(ncol)
for (columnIndex <- columnIndexes) {
row.setQuick(columnIndex, 1.0)
}
@@ -105,7 +105,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
.asInstanceOf[DrmRdd[Int]]
// wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
- val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
+ val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol)
new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary)
@@ -162,9 +162,6 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
colIDs
}.distinct().collect()
- val numRows = rowIDs.size
- val numColumns = columnIDs.size
-
// create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
// broadcast them for access in distributed processes, so they are not recalculated in every task.
val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
@@ -173,12 +170,15 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
+ val ncol = columnIDDictionary.size()
+ val nrow = rowIDDictionary.size()
+
val indexedInteractions =
interactions.map { case (rowID, columns) =>
val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
val elements = columns.split(elementDelim)
- val row = new RandomAccessSparseVector(numColumns)
+ val row = new RandomAccessSparseVector(ncol)
for (element <- elements) {
val id = element.split(columnIdStrengthDelim)(0)
val columnID = columnIDDictionary_bcast.value.get(id).get
@@ -193,7 +193,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
.asInstanceOf[DrmRdd[Int]]
// wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
- val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
+ val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol)
new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary)
@@ -213,8 +213,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index'
for (entry <- entries) {
- if (!dictionary.contains(entry)) dictionary.put(entry, index)
- index += 1
+ if (!dictionary.contains(entry)){
+ dictionary.put(entry, index)
+ index += 1
+ }// the dictionary should never contain an entry since they are supposed to be distinct but for some reason they do
}
dictionary
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index 7919e88..76c3553 100644
--- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -661,7 +661,6 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
assert(biggerIDSA.matrix.nrow == 5)
- val bp = 0
}
test("ItemSimilarityDriver cross similarity two separate items spaces, missing rows in B") {