You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/06/17 07:59:14 UTC

svn commit: r1493648 - in /mahout/trunk: ./ core/src/main/java/org/apache/mahout/clustering/lda/cvb/ core/src/main/java/org/apache/mahout/common/ core/src/test/java/org/apache/mahout/clustering/lda/cvb/

Author: ssc
Date: Mon Jun 17 05:59:13 2013
New Revision: 1493648

URL: http://svn.apache.org/r1493648
Log:
MAHOUT-1262 Cleanup LDA code

Modified:
    mahout/trunk/CHANGELOG
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0DocInferenceMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CachingCVB0Mapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java

Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Mon Jun 17 05:59:13 2013
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 0.8 - unreleased
 
+  MAHOUT-1262: Cleanup LDA code (ssc)
+
   MAHOUT-1255: Fix for weights in Multinomial sometimes overflowing in BallKMeans (dfilimon)
 
   MAHOUT-1254: Final round of cleanup for StreamingKMeans (dfilimon)

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0DocInferenceMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0DocInferenceMapper.java?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0DocInferenceMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0DocInferenceMapper.java Mon Jun 17 05:59:13 2013
@@ -27,18 +27,21 @@ import java.io.IOException;
 
 public class CVB0DocInferenceMapper extends CachingCVB0Mapper {
 
+  private final VectorWritable topics = new VectorWritable();
+
   @Override
   public void map(IntWritable docId, VectorWritable doc, Context context)
     throws IOException, InterruptedException {
     int numTopics = getNumTopics();
-    Vector docTopics = new DenseVector(new double[numTopics]).assign(1.0 / numTopics);
+    Vector docTopics = new DenseVector(numTopics).assign(1.0 / numTopics);
     Matrix docModel = new SparseRowMatrix(numTopics, doc.get().size());
     int maxIters = getMaxIters();
     ModelTrainer modelTrainer = getModelTrainer();
     for (int i = 0; i < maxIters; i++) {
       modelTrainer.getReadModel().trainDocTopicModel(doc.get(), docTopics, docModel);
     }
-    context.write(docId, new VectorWritable(docTopics));
+    topics.set(docTopics);
+    context.write(docId, topics);
   }
 
   @Override

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java Mon Jun 17 05:59:13 2013
@@ -30,9 +30,7 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.AbstractJob;
@@ -121,36 +119,49 @@ public class CVB0Driver extends Abstract
   public static final String BACKFILL_PERPLEXITY = "backfill_perplexity";
   private static final String MODEL_PATHS = "mahout.lda.cvb.modelPath";
 
+  private static final double DEFAULT_CONVERGENCE_DELTA = 0;
+  private static final double DEFAULT_DOC_TOPIC_SMOOTHING = 0.0001;
+  private static final double DEFAULT_TERM_TOPIC_SMOOTHING = 0.0001;
+  private static final int DEFAULT_ITERATION_BLOCK_SIZE = 10;
+  private static final double DEFAULT_TEST_SET_FRACTION = 0;
+  private static final int DEFAULT_NUM_TRAIN_THREADS = 4;
+  private static final int DEFAULT_NUM_UPDATE_THREADS = 1;
+  private static final int DEFAULT_MAX_ITERATIONS_PER_DOC = 10;
+  private static final int DEFAULT_NUM_REDUCE_TASKS = 10;
+
   @Override
   public int run(String[] args) throws Exception {
     addInputOption();
     addOutputOption();
     addOption(DefaultOptionCreator.maxIterationsOption().create());
-    addOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION, "cd", "The convergence delta value", "0");
+    addOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION, "cd", "The convergence delta value",
+              String.valueOf(DEFAULT_CONVERGENCE_DELTA));
     addOption(DefaultOptionCreator.overwriteOption().create());
 
     addOption(NUM_TOPICS, "k", "Number of topics to learn", true);
     addOption(NUM_TERMS, "nt", "Vocabulary size", false);
-    addOption(DOC_TOPIC_SMOOTHING, "a", "Smoothing for document/topic distribution", "0.0001");
-    addOption(TERM_TOPIC_SMOOTHING, "e", "Smoothing for topic/term distribution", "0.0001");
-    addOption(DICTIONARY, "dict", "Path to term-dictionary file(s) (glob expression supported)",
-        false);
-    addOption(DOC_TOPIC_OUTPUT, "dt", "Output path for the training doc/topic distribution",
-        false);
-    addOption(MODEL_TEMP_DIR, "mt", "Path to intermediate model path (useful for restarting)",
-        false);
-    addOption(ITERATION_BLOCK_SIZE, "block", "Number of iterations per perplexity check", "10");
+    addOption(DOC_TOPIC_SMOOTHING, "a", "Smoothing for document/topic distribution",
+              String.valueOf(DEFAULT_DOC_TOPIC_SMOOTHING));
+    addOption(TERM_TOPIC_SMOOTHING, "e", "Smoothing for topic/term distribution",
+              String.valueOf(DEFAULT_TERM_TOPIC_SMOOTHING));
+    addOption(DICTIONARY, "dict", "Path to term-dictionary file(s) (glob expression supported)", false);
+    addOption(DOC_TOPIC_OUTPUT, "dt", "Output path for the training doc/topic distribution", false);
+    addOption(MODEL_TEMP_DIR, "mt", "Path to intermediate model path (useful for restarting)", false);
+    addOption(ITERATION_BLOCK_SIZE, "block", "Number of iterations per perplexity check",
+              String.valueOf(DEFAULT_ITERATION_BLOCK_SIZE));
     addOption(RANDOM_SEED, "seed", "Random seed", false);
-    addOption(TEST_SET_FRACTION, "tf", "Fraction of data to hold out for testing", "0");
-    addOption(NUM_TRAIN_THREADS, "ntt", "number of threads per mapper to train with", "4");
+    addOption(TEST_SET_FRACTION, "tf", "Fraction of data to hold out for testing",
+              String.valueOf(DEFAULT_TEST_SET_FRACTION));
+    addOption(NUM_TRAIN_THREADS, "ntt", "number of threads per mapper to train with",
+              String.valueOf(DEFAULT_NUM_TRAIN_THREADS));
     addOption(NUM_UPDATE_THREADS, "nut", "number of threads per mapper to update the model with",
-        "1");
-    addOption(MAX_ITERATIONS_PER_DOC, "mipd",
-        "max number of iterations per doc for p(topic|doc) learning", "10");
-    addOption(NUM_REDUCE_TASKS, null,
-        "number of reducers to use during model estimation", "10");
-    addOption(buildOption(BACKFILL_PERPLEXITY, null,
-        "enable backfilling of missing perplexity values", false, false, null));
+              String.valueOf(DEFAULT_NUM_UPDATE_THREADS));
+    addOption(MAX_ITERATIONS_PER_DOC, "mipd", "max number of iterations per doc for p(topic|doc) learning",
+              String.valueOf(DEFAULT_MAX_ITERATIONS_PER_DOC));
+    addOption(NUM_REDUCE_TASKS, null, "number of reducers to use during model estimation",
+              String.valueOf(DEFAULT_NUM_REDUCE_TASKS));
+    addOption(buildOption(BACKFILL_PERPLEXITY, null, "enable backfilling of missing perplexity values", false, false,
+              null));
 
     if (parseArguments(args) == null) {
       return -1;
@@ -204,27 +215,30 @@ public class CVB0Driver extends Abstract
     return maxTermId + 1;
   }
 
-  public static int run(Configuration conf,
-                        Path inputPath,
-                        Path topicModelOutputPath,
-                        int numTopics,
-                        int numTerms,
-                        double alpha,
-                        double eta,
-                        int maxIterations,
-                        int iterationBlockSize,
-                        double convergenceDelta,
-                        Path dictionaryPath,
-                        Path docTopicOutputPath,
-                        Path topicModelStateTempPath,
-                        long randomSeed,
-                        float testFraction,
-                        int numTrainThreads,
-                        int numUpdateThreads,
-                        int maxItersPerDoc,
-                        int numReduceTasks,
-                        boolean backfillPerplexity)
+  public int run(Configuration conf,
+                 Path inputPath,
+                 Path topicModelOutputPath,
+                 int numTopics,
+                 int numTerms,
+                 double alpha,
+                 double eta,
+                 int maxIterations,
+                 int iterationBlockSize,
+                 double convergenceDelta,
+                 Path dictionaryPath,
+                 Path docTopicOutputPath,
+                 Path topicModelStateTempPath,
+                 long randomSeed,
+                 float testFraction,
+                 int numTrainThreads,
+                 int numUpdateThreads,
+                 int maxItersPerDoc,
+                 int numReduceTasks,
+                 boolean backfillPerplexity)
     throws ClassNotFoundException, IOException, InterruptedException {
+
+    setConf(conf);
+
     // verify arguments
     Preconditions.checkArgument(testFraction >= 0.0 && testFraction <= 1.0,
         "Expected 'testFraction' value in range [0, 1] but found value '%s'", testFraction);
@@ -342,23 +356,18 @@ public class CVB0Driver extends Abstract
     return Math.abs(perplexities.get(sz - 1) - perplexities.get(sz - 2)) / perplexities.get(0);
   }
 
-  private static double calculatePerplexity(Configuration conf, Path corpusPath, Path modelPath, int iteration)
+  private double calculatePerplexity(Configuration conf, Path corpusPath, Path modelPath, int iteration)
     throws IOException, ClassNotFoundException, InterruptedException {
     String jobName = "Calculating perplexity for " + modelPath;
     log.info("About to run: {}", jobName);
-    Job job = new Job(conf, jobName);
-    job.setJarByClass(CachingCVB0PerplexityMapper.class);
-    job.setMapperClass(CachingCVB0PerplexityMapper.class);
+
+    Path outputPath = perplexityPath(modelPath.getParent(), iteration);
+    Job job = prepareJob(corpusPath, outputPath, CachingCVB0PerplexityMapper.class, DoubleWritable.class,
+        DoubleWritable.class, DualDoubleSumReducer.class, DoubleWritable.class, DoubleWritable.class);
+
+    job.setJobName(jobName);
     job.setCombinerClass(DualDoubleSumReducer.class);
-    job.setReducerClass(DualDoubleSumReducer.class);
     job.setNumReduceTasks(1);
-    job.setOutputKeyClass(DoubleWritable.class);
-    job.setOutputValueClass(DoubleWritable.class);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    FileInputFormat.addInputPath(job, corpusPath);
-    Path outputPath = perplexityPath(modelPath.getParent(), iteration);
-    FileOutputFormat.setOutputPath(job, outputPath);
     setModelPaths(job, modelPath);
     HadoopUtil.delete(conf, outputPath);
     if (!job.waitForCompletion(true)) {
@@ -422,35 +431,25 @@ public class CVB0Driver extends Abstract
     return perplexity / modelWeight;
   }
 
-  private static Job writeTopicModel(Configuration conf, Path modelInput, Path output)
+  private Job writeTopicModel(Configuration conf, Path modelInput, Path output)
     throws IOException, InterruptedException, ClassNotFoundException {
     String jobName = String.format("Writing final topic/term distributions from %s to %s", modelInput, output);
     log.info("About to run: {}", jobName);
-    Job job = new Job(conf, jobName);
-    job.setJarByClass(CVB0Driver.class);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setMapperClass(CVB0TopicTermVectorNormalizerMapper.class);
-    job.setNumReduceTasks(0);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(VectorWritable.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    FileInputFormat.addInputPath(job, modelInput);
-    FileOutputFormat.setOutputPath(job, output);
+
+    Job job = prepareJob(modelInput, output, SequenceFileInputFormat.class, CVB0TopicTermVectorNormalizerMapper.class,
+        IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class, jobName);
     job.submit();
     return job;
   }
 
-  private static Job writeDocTopicInference(Configuration conf, Path corpus, Path modelInput, Path output)
+  private Job writeDocTopicInference(Configuration conf, Path corpus, Path modelInput, Path output)
     throws IOException, ClassNotFoundException, InterruptedException {
     String jobName = String.format("Writing final document/topic inference from %s to %s", corpus, output);
     log.info("About to run: {}", jobName);
-    Job job = new Job(conf, jobName);
-    job.setMapperClass(CVB0DocInferenceMapper.class);
-    job.setNumReduceTasks(0);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(VectorWritable.class);
+
+    Job job = prepareJob(corpus, outputPath, SequenceFileInputFormat.class, CVB0DocInferenceMapper.class,
+        IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class, jobName);
+
     FileSystem fs = FileSystem.get(corpus.toUri(), conf);
     if (modelInput != null && fs.exists(modelInput)) {
       FileStatus[] statuses = fs.listStatus(modelInput, PathFilters.partFilter());
@@ -461,9 +460,6 @@ public class CVB0Driver extends Abstract
       DistributedCache.setCacheFiles(modelUris, conf);
       setModelPaths(job, modelInput);
     }
-    FileInputFormat.addInputPath(job, corpus);
-    FileOutputFormat.setOutputPath(job, output);
-    job.setJarByClass(CVB0Driver.class);
     job.submit();
     return job;
   }
@@ -472,10 +468,6 @@ public class CVB0Driver extends Abstract
     return new Path(topicModelStateTempPath, "model-" + iterationNumber);
   }
 
-  public static Path stage1OutputPath(Path topicModelStateTempPath, int iterationNumber) {
-    return new Path(topicModelStateTempPath, "tmp-" + iterationNumber);
-  }
-
   public static Path perplexityPath(Path topicModelStateTempPath, int iterationNumber) {
     return new Path(topicModelStateTempPath, "perplexity-" + iterationNumber);
   }
@@ -493,24 +485,17 @@ public class CVB0Driver extends Abstract
     return iterationNumber - 1;
   }
 
-  public static void runIteration(Configuration conf, Path corpusInput, Path modelInput, Path modelOutput,
-                                  int iterationNumber, int maxIterations, int numReduceTasks)
+  public void runIteration(Configuration conf, Path corpusInput, Path modelInput, Path modelOutput,
+                           int iterationNumber, int maxIterations, int numReduceTasks)
     throws IOException, ClassNotFoundException, InterruptedException {
     String jobName = String.format("Iteration %d of %d, input path: %s",
         iterationNumber, maxIterations, modelInput);
     log.info("About to run: {}", jobName);
-    Job job = new Job(conf, jobName);
-    job.setJarByClass(CVB0Driver.class);
-    job.setMapperClass(CachingCVB0Mapper.class);
+    Job job = prepareJob(corpusInput, modelOutput, CachingCVB0Mapper.class, IntWritable.class, VectorWritable.class,
+        VectorSumReducer.class, IntWritable.class, VectorWritable.class);
     job.setCombinerClass(VectorSumReducer.class);
-    job.setReducerClass(VectorSumReducer.class);
     job.setNumReduceTasks(numReduceTasks);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(VectorWritable.class);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    FileInputFormat.addInputPath(job, corpusInput);
-    FileOutputFormat.setOutputPath(job, modelOutput);
+    job.setJobName(jobName);
     setModelPaths(job, modelInput);
     HadoopUtil.delete(conf, modelOutput);
     if (!job.waitForCompletion(true)) {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CachingCVB0Mapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CachingCVB0Mapper.java?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CachingCVB0Mapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CachingCVB0Mapper.java Mon Jun 17 05:59:13 2013
@@ -60,7 +60,7 @@ public class CachingCVB0Mapper
   private ModelTrainer modelTrainer;
   private int maxIters;
   private int numTopics;
-  
+
   protected ModelTrainer getModelTrainer() {
     return modelTrainer;
   }
@@ -112,7 +112,7 @@ public class CachingCVB0Mapper
   public void map(IntWritable docId, VectorWritable document, Context context)
     throws IOException, InterruptedException {
     /* where to get docTopics? */
-    Vector topicVector = new DenseVector(new double[numTopics]).assign(1.0 / numTopics);
+    Vector topicVector = new DenseVector(numTopics).assign(1.0 / numTopics);
     modelTrainer.train(document.get(), topicVector, true, maxIters);
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Mon Jun 17 05:59:13 2013
@@ -155,10 +155,6 @@ public abstract class AbstractJob extend
   protected Path getTempPath(String directory) {
     return new Path(tempPath, directory);
   }
-
-  protected Path getCombinedTempPath(String directory1, String directory2) {
-    return new Path(new Path(tempPath, directory1) + "," + new Path(tempPath, directory2));
-  }
   
   @Override
   public Configuration getConf() {

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java Mon Jun 17 05:59:13 2013
@@ -104,9 +104,10 @@ public final class TestCVBModelTrainer e
     while (numTestTopics < numGeneratingTopics + 2) {
       Path topicModelStateTempPath = getTestTempDirPath("topicTemp" + numTestTopics);
       Configuration conf = getConfiguration();
-      CVB0Driver.run(conf, sampleCorpusPath, null, numTestTopics, numTerms,
-                     ALPHA, ETA, numIterations, 1, 0, null, null, topicModelStateTempPath, 1234, 0.2f, 2,
-                     1, 3, 1, false);
+      CVB0Driver cvb0Driver = new CVB0Driver();
+      cvb0Driver.run(conf, sampleCorpusPath, null, numTestTopics, numTerms,
+          ALPHA, ETA, numIterations, 1, 0, null, null, topicModelStateTempPath, 1234, 0.2f, 2,
+          1, 3, 1, false);
       perplexities.add(lowestPerplexity(conf, topicModelStateTempPath));
       numTestTopics++;
     }