You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/06/17 07:59:14 UTC
svn commit: r1493648 - in /mahout/trunk: ./
core/src/main/java/org/apache/mahout/clustering/lda/cvb/
core/src/main/java/org/apache/mahout/common/
core/src/test/java/org/apache/mahout/clustering/lda/cvb/
Author: ssc
Date: Mon Jun 17 05:59:13 2013
New Revision: 1493648
URL: http://svn.apache.org/r1493648
Log:
MAHOUT-1262 Cleanup LDA code
Modified:
mahout/trunk/CHANGELOG
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0DocInferenceMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CachingCVB0Mapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Mon Jun 17 05:59:13 2013
@@ -2,6 +2,8 @@ Mahout Change Log
Release 0.8 - unreleased
+ MAHOUT-1262: Cleanup LDA code (ssc)
+
MAHOUT-1255: Fix for weights in Multinomial sometimes overflowing in BallKMeans (dfilimon)
MAHOUT-1254: Final round of cleanup for StreamingKMeans (dfilimon)
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0DocInferenceMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0DocInferenceMapper.java?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0DocInferenceMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0DocInferenceMapper.java Mon Jun 17 05:59:13 2013
@@ -27,18 +27,21 @@ import java.io.IOException;
public class CVB0DocInferenceMapper extends CachingCVB0Mapper {
+ private final VectorWritable topics = new VectorWritable();
+
@Override
public void map(IntWritable docId, VectorWritable doc, Context context)
throws IOException, InterruptedException {
int numTopics = getNumTopics();
- Vector docTopics = new DenseVector(new double[numTopics]).assign(1.0 / numTopics);
+ Vector docTopics = new DenseVector(numTopics).assign(1.0 / numTopics);
Matrix docModel = new SparseRowMatrix(numTopics, doc.get().size());
int maxIters = getMaxIters();
ModelTrainer modelTrainer = getModelTrainer();
for (int i = 0; i < maxIters; i++) {
modelTrainer.getReadModel().trainDocTopicModel(doc.get(), docTopics, docModel);
}
- context.write(docId, new VectorWritable(docTopics));
+ topics.set(docTopics);
+ context.write(docId, topics);
}
@Override
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java Mon Jun 17 05:59:13 2013
@@ -30,9 +30,7 @@ import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
@@ -121,36 +119,49 @@ public class CVB0Driver extends Abstract
public static final String BACKFILL_PERPLEXITY = "backfill_perplexity";
private static final String MODEL_PATHS = "mahout.lda.cvb.modelPath";
+ private static final double DEFAULT_CONVERGENCE_DELTA = 0;
+ private static final double DEFAULT_DOC_TOPIC_SMOOTHING = 0.0001;
+ private static final double DEFAULT_TERM_TOPIC_SMOOTHING = 0.0001;
+ private static final int DEFAULT_ITERATION_BLOCK_SIZE = 10;
+ private static final double DEFAULT_TEST_SET_FRACTION = 0;
+ private static final int DEFAULT_NUM_TRAIN_THREADS = 4;
+ private static final int DEFAULT_NUM_UPDATE_THREADS = 1;
+ private static final int DEFAULT_MAX_ITERATIONS_PER_DOC = 10;
+ private static final int DEFAULT_NUM_REDUCE_TASKS = 10;
+
@Override
public int run(String[] args) throws Exception {
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.maxIterationsOption().create());
- addOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION, "cd", "The convergence delta value", "0");
+ addOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION, "cd", "The convergence delta value",
+ String.valueOf(DEFAULT_CONVERGENCE_DELTA));
addOption(DefaultOptionCreator.overwriteOption().create());
addOption(NUM_TOPICS, "k", "Number of topics to learn", true);
addOption(NUM_TERMS, "nt", "Vocabulary size", false);
- addOption(DOC_TOPIC_SMOOTHING, "a", "Smoothing for document/topic distribution", "0.0001");
- addOption(TERM_TOPIC_SMOOTHING, "e", "Smoothing for topic/term distribution", "0.0001");
- addOption(DICTIONARY, "dict", "Path to term-dictionary file(s) (glob expression supported)",
- false);
- addOption(DOC_TOPIC_OUTPUT, "dt", "Output path for the training doc/topic distribution",
- false);
- addOption(MODEL_TEMP_DIR, "mt", "Path to intermediate model path (useful for restarting)",
- false);
- addOption(ITERATION_BLOCK_SIZE, "block", "Number of iterations per perplexity check", "10");
+ addOption(DOC_TOPIC_SMOOTHING, "a", "Smoothing for document/topic distribution",
+ String.valueOf(DEFAULT_DOC_TOPIC_SMOOTHING));
+ addOption(TERM_TOPIC_SMOOTHING, "e", "Smoothing for topic/term distribution",
+ String.valueOf(DEFAULT_TERM_TOPIC_SMOOTHING));
+ addOption(DICTIONARY, "dict", "Path to term-dictionary file(s) (glob expression supported)", false);
+ addOption(DOC_TOPIC_OUTPUT, "dt", "Output path for the training doc/topic distribution", false);
+ addOption(MODEL_TEMP_DIR, "mt", "Path to intermediate model path (useful for restarting)", false);
+ addOption(ITERATION_BLOCK_SIZE, "block", "Number of iterations per perplexity check",
+ String.valueOf(DEFAULT_ITERATION_BLOCK_SIZE));
addOption(RANDOM_SEED, "seed", "Random seed", false);
- addOption(TEST_SET_FRACTION, "tf", "Fraction of data to hold out for testing", "0");
- addOption(NUM_TRAIN_THREADS, "ntt", "number of threads per mapper to train with", "4");
+ addOption(TEST_SET_FRACTION, "tf", "Fraction of data to hold out for testing",
+ String.valueOf(DEFAULT_TEST_SET_FRACTION));
+ addOption(NUM_TRAIN_THREADS, "ntt", "number of threads per mapper to train with",
+ String.valueOf(DEFAULT_NUM_TRAIN_THREADS));
addOption(NUM_UPDATE_THREADS, "nut", "number of threads per mapper to update the model with",
- "1");
- addOption(MAX_ITERATIONS_PER_DOC, "mipd",
- "max number of iterations per doc for p(topic|doc) learning", "10");
- addOption(NUM_REDUCE_TASKS, null,
- "number of reducers to use during model estimation", "10");
- addOption(buildOption(BACKFILL_PERPLEXITY, null,
- "enable backfilling of missing perplexity values", false, false, null));
+ String.valueOf(DEFAULT_NUM_UPDATE_THREADS));
+ addOption(MAX_ITERATIONS_PER_DOC, "mipd", "max number of iterations per doc for p(topic|doc) learning",
+ String.valueOf(DEFAULT_MAX_ITERATIONS_PER_DOC));
+ addOption(NUM_REDUCE_TASKS, null, "number of reducers to use during model estimation",
+ String.valueOf(DEFAULT_NUM_REDUCE_TASKS));
+ addOption(buildOption(BACKFILL_PERPLEXITY, null, "enable backfilling of missing perplexity values", false, false,
+ null));
if (parseArguments(args) == null) {
return -1;
@@ -204,27 +215,30 @@ public class CVB0Driver extends Abstract
return maxTermId + 1;
}
- public static int run(Configuration conf,
- Path inputPath,
- Path topicModelOutputPath,
- int numTopics,
- int numTerms,
- double alpha,
- double eta,
- int maxIterations,
- int iterationBlockSize,
- double convergenceDelta,
- Path dictionaryPath,
- Path docTopicOutputPath,
- Path topicModelStateTempPath,
- long randomSeed,
- float testFraction,
- int numTrainThreads,
- int numUpdateThreads,
- int maxItersPerDoc,
- int numReduceTasks,
- boolean backfillPerplexity)
+ public int run(Configuration conf,
+ Path inputPath,
+ Path topicModelOutputPath,
+ int numTopics,
+ int numTerms,
+ double alpha,
+ double eta,
+ int maxIterations,
+ int iterationBlockSize,
+ double convergenceDelta,
+ Path dictionaryPath,
+ Path docTopicOutputPath,
+ Path topicModelStateTempPath,
+ long randomSeed,
+ float testFraction,
+ int numTrainThreads,
+ int numUpdateThreads,
+ int maxItersPerDoc,
+ int numReduceTasks,
+ boolean backfillPerplexity)
throws ClassNotFoundException, IOException, InterruptedException {
+
+ setConf(conf);
+
// verify arguments
Preconditions.checkArgument(testFraction >= 0.0 && testFraction <= 1.0,
"Expected 'testFraction' value in range [0, 1] but found value '%s'", testFraction);
@@ -342,23 +356,18 @@ public class CVB0Driver extends Abstract
return Math.abs(perplexities.get(sz - 1) - perplexities.get(sz - 2)) / perplexities.get(0);
}
- private static double calculatePerplexity(Configuration conf, Path corpusPath, Path modelPath, int iteration)
+ private double calculatePerplexity(Configuration conf, Path corpusPath, Path modelPath, int iteration)
throws IOException, ClassNotFoundException, InterruptedException {
String jobName = "Calculating perplexity for " + modelPath;
log.info("About to run: {}", jobName);
- Job job = new Job(conf, jobName);
- job.setJarByClass(CachingCVB0PerplexityMapper.class);
- job.setMapperClass(CachingCVB0PerplexityMapper.class);
+
+ Path outputPath = perplexityPath(modelPath.getParent(), iteration);
+ Job job = prepareJob(corpusPath, outputPath, CachingCVB0PerplexityMapper.class, DoubleWritable.class,
+ DoubleWritable.class, DualDoubleSumReducer.class, DoubleWritable.class, DoubleWritable.class);
+
+ job.setJobName(jobName);
job.setCombinerClass(DualDoubleSumReducer.class);
- job.setReducerClass(DualDoubleSumReducer.class);
job.setNumReduceTasks(1);
- job.setOutputKeyClass(DoubleWritable.class);
- job.setOutputValueClass(DoubleWritable.class);
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- FileInputFormat.addInputPath(job, corpusPath);
- Path outputPath = perplexityPath(modelPath.getParent(), iteration);
- FileOutputFormat.setOutputPath(job, outputPath);
setModelPaths(job, modelPath);
HadoopUtil.delete(conf, outputPath);
if (!job.waitForCompletion(true)) {
@@ -422,35 +431,25 @@ public class CVB0Driver extends Abstract
return perplexity / modelWeight;
}
- private static Job writeTopicModel(Configuration conf, Path modelInput, Path output)
+ private Job writeTopicModel(Configuration conf, Path modelInput, Path output)
throws IOException, InterruptedException, ClassNotFoundException {
String jobName = String.format("Writing final topic/term distributions from %s to %s", modelInput, output);
log.info("About to run: {}", jobName);
- Job job = new Job(conf, jobName);
- job.setJarByClass(CVB0Driver.class);
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(CVB0TopicTermVectorNormalizerMapper.class);
- job.setNumReduceTasks(0);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(VectorWritable.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- FileInputFormat.addInputPath(job, modelInput);
- FileOutputFormat.setOutputPath(job, output);
+
+ Job job = prepareJob(modelInput, output, SequenceFileInputFormat.class, CVB0TopicTermVectorNormalizerMapper.class,
+ IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class, jobName);
job.submit();
return job;
}
- private static Job writeDocTopicInference(Configuration conf, Path corpus, Path modelInput, Path output)
+ private Job writeDocTopicInference(Configuration conf, Path corpus, Path modelInput, Path output)
throws IOException, ClassNotFoundException, InterruptedException {
String jobName = String.format("Writing final document/topic inference from %s to %s", corpus, output);
log.info("About to run: {}", jobName);
- Job job = new Job(conf, jobName);
- job.setMapperClass(CVB0DocInferenceMapper.class);
- job.setNumReduceTasks(0);
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(VectorWritable.class);
+
+ Job job = prepareJob(corpus, output, SequenceFileInputFormat.class, CVB0DocInferenceMapper.class,
+ IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class, jobName);
+
FileSystem fs = FileSystem.get(corpus.toUri(), conf);
if (modelInput != null && fs.exists(modelInput)) {
FileStatus[] statuses = fs.listStatus(modelInput, PathFilters.partFilter());
@@ -461,9 +460,6 @@ public class CVB0Driver extends Abstract
DistributedCache.setCacheFiles(modelUris, conf);
setModelPaths(job, modelInput);
}
- FileInputFormat.addInputPath(job, corpus);
- FileOutputFormat.setOutputPath(job, output);
- job.setJarByClass(CVB0Driver.class);
job.submit();
return job;
}
@@ -472,10 +468,6 @@ public class CVB0Driver extends Abstract
return new Path(topicModelStateTempPath, "model-" + iterationNumber);
}
- public static Path stage1OutputPath(Path topicModelStateTempPath, int iterationNumber) {
- return new Path(topicModelStateTempPath, "tmp-" + iterationNumber);
- }
-
public static Path perplexityPath(Path topicModelStateTempPath, int iterationNumber) {
return new Path(topicModelStateTempPath, "perplexity-" + iterationNumber);
}
@@ -493,24 +485,17 @@ public class CVB0Driver extends Abstract
return iterationNumber - 1;
}
- public static void runIteration(Configuration conf, Path corpusInput, Path modelInput, Path modelOutput,
- int iterationNumber, int maxIterations, int numReduceTasks)
+ public void runIteration(Configuration conf, Path corpusInput, Path modelInput, Path modelOutput,
+ int iterationNumber, int maxIterations, int numReduceTasks)
throws IOException, ClassNotFoundException, InterruptedException {
String jobName = String.format("Iteration %d of %d, input path: %s",
iterationNumber, maxIterations, modelInput);
log.info("About to run: {}", jobName);
- Job job = new Job(conf, jobName);
- job.setJarByClass(CVB0Driver.class);
- job.setMapperClass(CachingCVB0Mapper.class);
+ Job job = prepareJob(corpusInput, modelOutput, CachingCVB0Mapper.class, IntWritable.class, VectorWritable.class,
+ VectorSumReducer.class, IntWritable.class, VectorWritable.class);
job.setCombinerClass(VectorSumReducer.class);
- job.setReducerClass(VectorSumReducer.class);
job.setNumReduceTasks(numReduceTasks);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(VectorWritable.class);
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- FileInputFormat.addInputPath(job, corpusInput);
- FileOutputFormat.setOutputPath(job, modelOutput);
+ job.setJobName(jobName);
setModelPaths(job, modelInput);
HadoopUtil.delete(conf, modelOutput);
if (!job.waitForCompletion(true)) {
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CachingCVB0Mapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CachingCVB0Mapper.java?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CachingCVB0Mapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/cvb/CachingCVB0Mapper.java Mon Jun 17 05:59:13 2013
@@ -60,7 +60,7 @@ public class CachingCVB0Mapper
private ModelTrainer modelTrainer;
private int maxIters;
private int numTopics;
-
+
protected ModelTrainer getModelTrainer() {
return modelTrainer;
}
@@ -112,7 +112,7 @@ public class CachingCVB0Mapper
public void map(IntWritable docId, VectorWritable document, Context context)
throws IOException, InterruptedException {
/* where to get docTopics? */
- Vector topicVector = new DenseVector(new double[numTopics]).assign(1.0 / numTopics);
+ Vector topicVector = new DenseVector(numTopics).assign(1.0 / numTopics);
modelTrainer.train(document.get(), topicVector, true, maxIters);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Mon Jun 17 05:59:13 2013
@@ -155,10 +155,6 @@ public abstract class AbstractJob extend
protected Path getTempPath(String directory) {
return new Path(tempPath, directory);
}
-
- protected Path getCombinedTempPath(String directory1, String directory2) {
- return new Path(new Path(tempPath, directory1) + "," + new Path(tempPath, directory2));
- }
@Override
public Configuration getConf() {
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java?rev=1493648&r1=1493647&r2=1493648&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java Mon Jun 17 05:59:13 2013
@@ -104,9 +104,10 @@ public final class TestCVBModelTrainer e
while (numTestTopics < numGeneratingTopics + 2) {
Path topicModelStateTempPath = getTestTempDirPath("topicTemp" + numTestTopics);
Configuration conf = getConfiguration();
- CVB0Driver.run(conf, sampleCorpusPath, null, numTestTopics, numTerms,
- ALPHA, ETA, numIterations, 1, 0, null, null, topicModelStateTempPath, 1234, 0.2f, 2,
- 1, 3, 1, false);
+ CVB0Driver cvb0Driver = new CVB0Driver();
+ cvb0Driver.run(conf, sampleCorpusPath, null, numTestTopics, numTerms,
+ ALPHA, ETA, numIterations, 1, 0, null, null, topicModelStateTempPath, 1234, 0.2f, 2,
+ 1, 3, 1, false);
perplexities.add(lowestPerplexity(conf, topicModelStateTempPath));
numTestTopics++;
}