Posted to commits@mahout.apache.org by pa...@apache.org on 2014/12/12 17:11:56 UTC

mahout git commit: refactor more engine-specific parts of the CLI and drivers and add -D:key=value for changing the SparkConf of a driver

Repository: mahout
Updated Branches:
  refs/heads/master 6530e8144 -> ae1808be0


refactor more engine-specific parts of the CLI and drivers and add -D:key=value for changing the SparkConf of a driver


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ae1808be
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ae1808be
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ae1808be

Branch: refs/heads/master
Commit: ae1808be0e76c15544e75ec54aae3fc720f65364
Parents: 6530e81
Author: pferrel <pa...@occamsmachete.com>
Authored: Thu Dec 11 16:26:57 2014 -0800
Committer: pferrel <pa...@occamsmachete.com>
Committed: Thu Dec 11 16:26:57 2014 -0800

----------------------------------------------------------------------
 .../mahout/drivers/MahoutOptionParser.scala     | 23 ++---------
 .../mahout/drivers/ItemSimilarityDriver.scala   | 15 +++----
 .../mahout/drivers/MahoutSparkDriver.scala      |  3 ++
 .../drivers/MahoutSparkOptionParser.scala       | 43 ++++++++++++++++++++
 .../mahout/drivers/RowSimilarityDriver.scala    | 20 +++------
 .../drivers/TextDelimitedReaderWriter.scala     | 26 ++++++------
 .../drivers/ItemSimilarityDriverSuite.scala     |  1 -
 7 files changed, 74 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 046bc87..479a8d0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -23,12 +23,13 @@ import scala.collection.immutable
 /** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to
   * keep both standardized.
   * @param programName Name displayed in help message, the name by which the driver is invoked.
-  * @note not all options are platform neutral so other platforms can add option parsing here if desired.
+  * @note options are engine neutral by convention. See the engine-specific extending class
+  *       to add Spark or other engine options.
   * */
 class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) {
 
   // build options from some stardard CLI param groups
-  // Note: always put the driver specific options at the last so the can override and previous options!
+  // Note: always put the driver-specific options last so they can override any previous options!
   var opts = Map.empty[String, Any]
 
   override def showUsageOnError = true
@@ -47,7 +48,6 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
     }
 
     opt[String]('o', "output") required() action { (x, options) =>
-      // todo: check to see if HDFS allows MS-Windows backslashes locally?
       if (x.endsWith("/")) {
         options + ("output" -> x)
       } else {
@@ -57,21 +57,6 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
 
   }
 
-  def parseSparkOptions = {
-    opts = opts ++ MahoutOptionParser.SparkOptions
-    opts = opts + ("appName" -> programName)
-    note("\nSpark config options:")
-
-    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
-      options + ("master" -> x)
-    }
-
-    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
-      options + ("sparkExecutorMem" -> x)
-    }
-
-  }
-
   def parseGenericOptions = {
     opts = opts ++ MahoutOptionParser.GenericOptions
     opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
@@ -91,7 +76,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A
     // not drms
     opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions
     note("\nInput text file schema options:")
-    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) =>
+    opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action { (x, options) =>
       options + ("inDelim" -> x)
     }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index 42e9d81..01a18c9 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -24,12 +24,12 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scala.collection.immutable.HashMap
 
 /**
- * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis#cooccurrencesIDSs]].
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]].
  * Reads text lines
  * that contain (row id, column id, ...). The IDs are user specified strings which will be
  * preserved in the
  * output. The individual elements will be accumulated into a matrix like [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
- * and [[org.apache.mahout.cf.SimilarityAnalysis#cooccurrencesIDSs]]
+ * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]
  * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two
  * matrices and calculate both the self similarity of the primary matrix and the row-wise
  * similarity of the primary
@@ -42,7 +42,8 @@ import scala.collection.immutable.HashMap
  * you can specify only the input and output file and directory--all else will default to the correct values.
  * Each output line will contain the Item ID and similar items sorted by LLR strength descending.
  * @note To use with a Spark cluster see the --master option, if you run out of heap space check
- *       the --sparkExecutorMemory option.
+ *       the --sparkExecutorMem option. Other [[org.apache.spark.SparkConf]] key-value pairs can be set with
+ *       the -D:k=v option.
  */
 object ItemSimilarityDriver extends MahoutSparkDriver {
   // define only the options specific to ItemSimilarity
@@ -60,7 +61,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
    */
   override def main(args: Array[String]): Unit = {
 
-    parser = new MahoutOptionParser(programName = "spark-itemsimilarity") {
+    parser = new MahoutSparkOptionParser(programName = "spark-itemsimilarity") {
       head("spark-itemsimilarity", "Mahout 1.0")
 
       //Input output options, non-driver specific
@@ -120,12 +121,6 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
       appName: String = parser.opts("appName").asInstanceOf[String]):
     Unit = {
 
-    // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
-    // MahoutKryoRegistrator, it should be added to the register list here so it
-    // will be only specific to this job.
-    sparkConf.set("spark.kryo.referenceTracking", "false")
-      .set("spark.kryoserializer.buffer.mb", "200")// todo: should this be left to config or an option?
-
     if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
       sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
     //else leave as set in Spark config
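
For reference, a hedged invocation sketch for this driver showing the new option; the paths and the chosen SparkConf key are illustrative only, and any SparkConf property can be overridden the same way:

    ItemSimilarityDriver.main(Array(
      "--input", "hdfs://some/input/",
      "--output", "hdfs://some/output/",
      "--master", "local[4]",
      "-D:spark.kryoserializer.buffer.mb=400" // overrides the 200 MB default set in MahoutSparkDriver.start()
    ))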

http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index 5a4254b..e6299fd 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -74,6 +74,9 @@ abstract class MahoutSparkDriver extends MahoutDriver {
     * @param appName  Name to display in Spark UI
     * */
   protected def start(masterUrl: String, appName: String) : Unit = {
+    sparkConf.set("spark.kryo.referenceTracking", "false")
+      .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
+
     if (!_useExistingContext) {
       mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf)
     }

http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
new file mode 100644
index 0000000..a46d2ee
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import org.apache.spark.SparkConf
+
+class MahoutSparkOptionParser(programName: String) extends MahoutOptionParser(programName: String){
+
+  def parseSparkOptions(implicit sparkConf: SparkConf) = {
+    opts = opts ++ MahoutOptionParser.SparkOptions
+    opts = opts + ("appName" -> programName)
+    note("\nSpark config options:")
+
+    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
+      options + ("master" -> x)
+    }
+
+    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) =>
+      options + ("sparkExecutorMem" -> x)
+    }
+
+    opt[(String, String)]("define") abbr ("D") unbounded() foreach { case (k, v) =>
+      sparkConf.set(k, v)
+    } validate { x =>
+      if (x._2 != "") success else failure("Value <sparkConfValue> must be non-blank")
+    } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before creating this job's Spark context (optional)")
+
+  }
+}
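
The intended wiring, sketched below, is what the drivers above now do: pass the driver's SparkConf when building the Spark option group, and each -D:key=value pair is written onto that conf while the arguments are parsed, before start() creates the Mahout Spark context. The standalone setup and the chosen SparkConf key are illustrative only:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    val parser = new MahoutSparkOptionParser(programName = "spark-itemsimilarity") {
      parseSparkOptions(conf) // adds --master, --sparkExecutorMem and the -D:key=value option
    }
    val args = Array("--master", "local[4]", "-D:spark.default.parallelism=16")
    parser.parse(args, parser.opts) foreach { opts =>
      // the foreach action ran during parsing, so the -D pair is already on the conf
      assert(conf.get("spark.default.parallelism") == "16")
    }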

http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index f8abef3..9b44b95 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -24,11 +24,11 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scala.collection.immutable.HashMap
 
 /**
- * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarityIDSs( )]].
+ * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]].
  * Reads a text delimited file containing rows of a [[org.apache.mahout.math.indexeddataset.IndexedDataset]]
  * with domain specific IDS of the form
  * (row id, column id: strength, ...). The IDs will be preserved in the
- * output. The rows define a matrix and [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarityIDSs( )]]
+ * output. The rows define a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]]
  * will be used to calculate row-wise similarity using log-likelihood
  * The options allow control of the input schema, file discovery, output schema, and control of
  * algorithm parameters.
@@ -53,7 +53,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
    */
   override def main(args: Array[String]): Unit = {
 
-    parser = new MahoutOptionParser(programName = "spark-rowsimilarity") {
+    parser = new MahoutSparkOptionParser(programName = "spark-rowsimilarity") {
       head("spark-rowsimilarity", "Mahout 1.0")
 
       //Input output options, non-driver specific
@@ -107,18 +107,8 @@ object RowSimilarityDriver extends MahoutSparkDriver {
   }
 
   override def start(masterUrl: String = parser.opts("master").asInstanceOf[String],
-                     appName: String = parser.opts("appName").asInstanceOf[String]):
-  Unit = {
-
-    // todo: the HashBiMap used in the TextDelimited Reader is hard coded into
-    // MahoutKryoRegistrator, it should be added to the register list here so it
-    // will be only specific to this job.
-    sparkConf.set("spark.kryo.referenceTracking", "false")
-      .set("spark.kryoserializer.buffer.mb", "200")// todo: should we take this out?
-
-    if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")
-      sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
-    //else leave as set in Spark config
+      appName: String = parser.opts("appName").asInstanceOf[String]):
+    Unit = {
 
     super.start(masterUrl, appName)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index ba8f7d1..895bd01 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -76,9 +76,6 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
       val rowIDs = interactions.map { case (rowID, _) => rowID }.distinct().collect()
       val columnIDs = interactions.map { case (_, columnID) => columnID }.distinct().collect()
 
-      val numRows = rowIDs.size
-      val numColumns = columnIDs.size
-
       // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
       // broadcast them for access in distributed processes, so they are not recalculated in every task.
       val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
@@ -87,6 +84,9 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
       val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
       val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
 
+      val ncol = columnIDDictionary.size()
+      val nrow = rowIDDictionary.size()
+
       val indexedInteractions =
         interactions.map { case (rowID, columnID) =>
           val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
@@ -96,7 +96,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
         }
         // group by IDs to form row vectors
         .groupByKey().map { case (rowIndex, columnIndexes) =>
-          val row = new RandomAccessSparseVector(numColumns)
+          val row = new RandomAccessSparseVector(ncol)
           for (columnIndex <- columnIndexes) {
             row.setQuick(columnIndex, 1.0)
           }
@@ -105,7 +105,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
         .asInstanceOf[DrmRdd[Int]]
 
       // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
-      val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
+      val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol)
 
       new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary)
 
@@ -162,9 +162,6 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
         colIDs
       }.distinct().collect()
 
-      val numRows = rowIDs.size
-      val numColumns = columnIDs.size
-
       // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
       // broadcast them for access in distributed processes, so they are not recalculated in every task.
       val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
@@ -173,12 +170,15 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
       val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
       val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
 
+      val ncol = columnIDDictionary.size()
+      val nrow = rowIDDictionary.size()
+
       val indexedInteractions =
         interactions.map { case (rowID, columns) =>
           val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
 
           val elements = columns.split(elementDelim)
-          val row = new RandomAccessSparseVector(numColumns)
+          val row = new RandomAccessSparseVector(ncol)
           for (element <- elements) {
             val id = element.split(columnIdStrengthDelim)(0)
             val columnID = columnIDDictionary_bcast.value.get(id).get
@@ -193,7 +193,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
         .asInstanceOf[DrmRdd[Int]]
 
       // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed
-      val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns)
+      val drmInteractions = drmWrap[Int](indexedInteractions, nrow, ncol)
 
       new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary)
 
@@ -213,8 +213,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
   private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
     var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index'
     for (entry <- entries) {
-      if (!dictionary.contains(entry)) dictionary.put(entry, index)
-      index += 1
+      if (!dictionary.contains(entry)){
+        dictionary.put(entry, index)
+        index += 1
+      } // entries are supposed to be distinct, so the dictionary should never already contain one, but in practice duplicates do occur
     }
     dictionary
   }
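
The duplicate guard added above matters because nrow and ncol are now taken from the dictionary sizes rather than the raw ID arrays. A self-contained illustration of the fixed logic, using a plain mutable Map in place of the BiMap the reader actually uses:

    import scala.collection.mutable

    def asOrderedDictionary(dictionary: mutable.Map[String, Int] = mutable.LinkedHashMap[String, Int](),
                            entries: Array[String]): mutable.Map[String, Int] = {
      var index = dictionary.size // appended entries continue the existing Mahout id sequence
      for (entry <- entries) {
        if (!dictionary.contains(entry)) { // only advance the index when a new entry is added
          dictionary.put(entry, index)
          index += 1
        }
      }
      dictionary
    }

    val dict = asOrderedDictionary(entries = Array("u1", "u2", "u2", "u3"))
    // dict == Map("u1" -> 0, "u2" -> 1, "u3" -> 2): ids stay contiguous, so dict.size
    // is a safe row/column count to hand to drmWrap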

http://git-wip-us.apache.org/repos/asf/mahout/blob/ae1808be/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index 7919e88..76c3553 100644
--- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -661,7 +661,6 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite {
 
     assert(biggerIDSA.matrix.nrow == 5)
 
-    val bp = 0
   }
 
   test("ItemSimilarityDriver cross similarity two separate items spaces, missing rows in B") {