Posted to commits@mahout.apache.org by je...@apache.org on 2010/07/07 20:46:20 UTC
svn commit: r961476 [2/2] - in /mahout/trunk:
core/src/test/java/org/apache/mahout/common/
utils/src/main/java/org/apache/mahout/clustering/cdbw/
utils/src/main/java/org/apache/mahout/utils/
utils/src/main/java/org/apache/mahout/utils/clustering/ utils...
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java Wed Jul 7 18:46:19 2010
@@ -22,7 +22,6 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
@@ -34,13 +33,12 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.StringTuple;
import org.apache.mahout.math.VectorWritable;
@@ -113,6 +111,8 @@ public final class DictionaryVectorizer
* recommend you use a split size of around 400-500MB so that two simultaneous reducers can create
* partial vectors without thrashing the system due to increased swapping
* @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
public static void createTermFrequencyVectors(Path input,
Path output,
@@ -122,7 +122,7 @@ public final class DictionaryVectorizer
float minLLRValue,
int numReducers,
int chunkSizeInMegabytes,
- boolean sequentialAccess) throws IOException {
+ boolean sequentialAccess) throws IOException, InterruptedException, ClassNotFoundException {
if (chunkSizeInMegabytes < MIN_CHUNKSIZE) {
chunkSizeInMegabytes = MIN_CHUNKSIZE;
} else if (chunkSizeInMegabytes > MAX_CHUNKSIZE) { // 10GB
@@ -249,6 +249,8 @@ public final class DictionaryVectorizer
* @param numReducers
* the desired number of reducer tasks
* @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
private static void makePartialVectors(Path input,
int maxNGramSize,
@@ -256,72 +258,73 @@ public final class DictionaryVectorizer
Path output,
int dimension,
boolean sequentialAccess,
- int numReducers) throws IOException {
+ int numReducers) throws IOException, InterruptedException, ClassNotFoundException {
- Configurable client = new JobClient();
- JobConf conf = new JobConf(DictionaryVectorizer.class);
+ Configuration conf = new Configuration();
+ // this conf parameter needs to be set enable serialisation of conf values
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
- // this conf parameter needs to be set enable serialisation of conf values
-
- conf.setJobName("DictionaryVectorizer::MakePartialVectors: input-folder: " + input
- + ", dictionary-file: " + dictionaryFilePath.toString());
conf.setInt(PartialVectorMerger.DIMENSION, dimension);
conf.setBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, sequentialAccess);
- conf.setInt(MAX_NGRAMS, maxNGramSize);
-
- conf.setMapOutputKeyClass(Text.class);
- conf.setMapOutputValueClass(StringTuple.class);
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(VectorWritable.class);
+ conf.setInt(MAX_NGRAMS, maxNGramSize);
DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);
- FileInputFormat.setInputPaths(conf, input);
- FileOutputFormat.setOutputPath(conf, output);
+ Job job = new Job(conf);
+ job.setJobName("DictionaryVectorizer::MakePartialVectors: input-folder: " + input
+ + ", dictionary-file: " + dictionaryFilePath.toString());
- conf.setMapperClass(IdentityMapper.class);
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setReducerClass(TFPartialVectorReducer.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
- conf.setNumReduceTasks(numReducers);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(StringTuple.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(VectorWritable.class);
+ FileInputFormat.setInputPaths(job, input);
+
+ FileOutputFormat.setOutputPath(job, output);
+
+ job.setMapperClass(Mapper.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setReducerClass(TFPartialVectorReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setNumReduceTasks(numReducers);
HadoopUtil.overwriteOutput(output);
- client.setConf(conf);
- JobClient.runJob(conf);
+ job.waitForCompletion(true);
}
/**
* Count the frequencies of words in parallel using Map/Reduce. The input documents have to be in
* {@link SequenceFile} format
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
- private static void startWordCounting(Path input, Path output, int minSupport) throws IOException {
+ private static void startWordCounting(Path input, Path output, int minSupport) throws IOException, InterruptedException, ClassNotFoundException {
- Configurable client = new JobClient();
- JobConf conf = new JobConf(DictionaryVectorizer.class);
+ Configuration conf = new Configuration();
+ // this conf parameter needs to be set enable serialisation of conf values
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ "org.apache.hadoop.io.serializer.WritableSerialization");
- // this conf parameter needs to be set enable serialisation of conf values
-
- conf.setJobName("DictionaryVectorizer::WordCount: input-folder: " + input.toString());
conf.setInt(MIN_SUPPORT, minSupport);
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(LongWritable.class);
+ Job job = new Job(conf);
+
+ job.setJobName("DictionaryVectorizer::WordCount: input-folder: " + input.toString());
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(LongWritable.class);
- FileInputFormat.setInputPaths(conf, input);
- FileOutputFormat.setOutputPath(conf, output);
+ FileInputFormat.setInputPaths(job, input);
+ FileOutputFormat.setOutputPath(job, output);
- conf.setMapperClass(TermCountMapper.class);
+ job.setMapperClass(TermCountMapper.class);
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setCombinerClass(TermCountReducer.class);
- conf.setReducerClass(TermCountReducer.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setCombinerClass(TermCountReducer.class);
+ job.setReducerClass(TermCountReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
HadoopUtil.overwriteOutput(output);
- client.setConf(conf);
- JobClient.runJob(conf);
+ job.waitForCompletion(true);
}
}
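
[Note on the change above] Both driver methods in DictionaryVectorizer follow the same JobConf/JobClient to Configuration/Job migration. Below is a minimal, self-contained sketch of that new-API setup; the class name, job name, paths and output types are placeholders, and the plain org.apache.hadoop.mapreduce.Mapper stands in for the removed IdentityMapper.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public final class NewApiJobSketch {

  private NewApiJobSketch() {
  }

  public static void runJob(Path input, Path output)
      throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    // needed so that conf values can be serialised, as in the patch above
    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
        + "org.apache.hadoop.io.serializer.WritableSerialization");

    Job job = new Job(conf);                      // replaces new JobConf(...)
    job.setJobName("NewApiJobSketch: input-folder: " + input);

    job.setMapperClass(Mapper.class);             // identity mapper in the new API
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);

    job.waitForCompletion(true);                  // replaces client.setConf(conf); JobClient.runJob(conf)
  }
}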
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java Wed Jul 7 18:46:19 2010
@@ -20,15 +20,14 @@ package org.apache.mahout.utils.vectors.
import java.io.IOException;
import java.nio.charset.Charset;
-import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.lucene.analysis.Analyzer;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.StringTuple;
@@ -68,31 +67,31 @@ public final class DocumentProcessor {
* @param analyzerClass
* The Lucene {@link Analyzer} for tokenizing the UTF-8 text
* @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
public static void tokenizeDocuments(Path input, Class<? extends Analyzer> analyzerClass,
- Path output) throws IOException {
-
- Configurable client = new JobClient();
- JobConf conf = new JobConf(DocumentProcessor.class);
- conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
- + "org.apache.hadoop.io.serializer.WritableSerialization");
+ Path output) throws IOException, InterruptedException, ClassNotFoundException {
+ Configuration conf = new Configuration();
// this conf parameter needs to be set enable serialisation of conf values
-
+ conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
conf.set(ANALYZER_CLASS, analyzerClass.getName());
- conf.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: " + input);
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(StringTuple.class);
- FileInputFormat.setInputPaths(conf, input);
- FileOutputFormat.setOutputPath(conf, output);
+ Job job = new Job(conf);
+ job.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: " + input);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(StringTuple.class);
+ FileInputFormat.setInputPaths(job, input);
+ FileOutputFormat.setOutputPath(job, output);
- conf.setMapperClass(SequenceFileTokenizerMapper.class);
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setNumReduceTasks(0);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
+ job.setMapperClass(SequenceFileTokenizerMapper.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setNumReduceTasks(0);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
HadoopUtil.overwriteOutput(output);
- client.setConf(conf);
- JobClient.runJob(conf);
+ job.waitForCompletion(true);
}
}
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java Wed Jul 7 18:46:19 2010
@@ -21,11 +21,7 @@ import java.io.IOException;
import java.io.StringReader;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.TermAttribute;
@@ -36,13 +32,15 @@ import org.apache.mahout.utils.vectors.t
/**
* Tokenizes a text document and outputs tokens in a StringTuple
*/
-public class SequenceFileTokenizerMapper extends MapReduceBase implements Mapper<Text,Text,Text,StringTuple> {
-
+public class SequenceFileTokenizerMapper extends Mapper<Text, Text, Text, StringTuple> {
+
private Analyzer analyzer;
-
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapreduce.Mapper.Context)
+ */
@Override
- public void map(Text key, Text value,
- OutputCollector<Text,StringTuple> output, Reporter reporter) throws IOException {
+ protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
TokenStream stream = analyzer.tokenStream(key.toString(), new StringReader(value.toString()));
TermAttribute termAtt = stream.addAttribute(TermAttribute.class);
StringTuple document = new StringTuple();
@@ -51,16 +49,19 @@ public class SequenceFileTokenizerMapper
document.add(new String(termAtt.termBuffer(), 0, termAtt.termLength()));
}
}
- output.collect(key, document);
+ context.write(key, document);
}
-
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
+ */
@Override
- public void configure(JobConf job) {
- super.configure(job);
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
try {
ClassLoader ccl = Thread.currentThread().getContextClassLoader();
Class<?> cl = ccl
- .loadClass(job.get(DocumentProcessor.ANALYZER_CLASS, DefaultAnalyzer.class.getName()));
+ .loadClass(context.getConfiguration().get(DocumentProcessor.ANALYZER_CLASS, DefaultAnalyzer.class.getName()));
analyzer = (Analyzer) cl.newInstance();
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
@@ -70,5 +71,4 @@ public class SequenceFileTokenizerMapper
throw new IllegalStateException(e);
}
}
-
}
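
[Note on the change above] The mapper now follows the standard new-API shape: configuration is read in setup(Context) instead of configure(JobConf), and records are emitted with context.write() instead of OutputCollector.collect(). A small illustrative mapper in the same shape; the class name, config key and transformation are made up for the example.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UpperCaseMapper extends Mapper<Text, Text, Text, Text> {

  private boolean lowerCase;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    // replaces configure(JobConf): parameters now come from context.getConfiguration()
    lowerCase = context.getConfiguration().getBoolean("example.lowercase", false);
  }

  @Override
  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    String text = value.toString();
    // replaces output.collect(key, ...)
    context.write(key, new Text(lowerCase ? text.toLowerCase() : text.toUpperCase()));
  }
}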
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java Wed Jul 7 18:46:19 2010
@@ -21,17 +21,14 @@ import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.lucene.analysis.shingle.ShingleFilter;
import org.apache.lucene.analysis.tokenattributes.TermAttribute;
import org.apache.mahout.common.StringTuple;
@@ -48,32 +45,32 @@ import org.apache.mahout.utils.vectors.t
/**
* Converts a document in to a sparse vector
*/
-public class TFPartialVectorReducer extends MapReduceBase implements
- Reducer<Text,StringTuple,Text,VectorWritable> {
+public class TFPartialVectorReducer extends Reducer<Text, StringTuple, Text, VectorWritable> {
private final OpenObjectIntHashMap<String> dictionary = new OpenObjectIntHashMap<String>();
private int dimension;
+
private boolean sequentialAccess;
-
+
private int maxNGramSize = 1;
-
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
+ */
@Override
- public void reduce(Text key,
- Iterator<StringTuple> values,
- OutputCollector<Text,VectorWritable> output,
- Reporter reporter) throws IOException {
- if (values.hasNext() == false) {
+ protected void reduce(Text key, Iterable<StringTuple> values, Context context) throws IOException, InterruptedException {
+ Iterator<StringTuple> it = values.iterator();
+ if (it.hasNext() == false) {
return;
}
- StringTuple value = values.next();
-
+ StringTuple value = it.next();
+
Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial size
-
+
if (maxNGramSize >= 2) {
- ShingleFilter sf = new ShingleFilter(new IteratorTokenStream(value.getEntries().iterator()),
- maxNGramSize);
-
+ ShingleFilter sf = new ShingleFilter(new IteratorTokenStream(value.getEntries().iterator()), maxNGramSize);
+
do {
String term = ((TermAttribute) sf.getAttribute(TermAttribute.class)).term();
if (term.length() > 0) { // ngram
@@ -83,7 +80,7 @@ public class TFPartialVectorReducer exte
}
}
} while (sf.incrementToken());
-
+
sf.end();
sf.close();
} else {
@@ -102,29 +99,33 @@ public class TFPartialVectorReducer exte
// if the vector has no nonZero entries (nothing in the dictionary), let's not waste space sending it to disk.
if (vector.getNumNondefaultElements() > 0) {
VectorWritable vectorWritable = new VectorWritable(new NamedVector(vector, key.toString()));
- output.collect(key, vectorWritable);
+ context.write(key, vectorWritable);
} else {
- reporter.incrCounter("TFParticalVectorReducer", "emptyVectorCount", 1);
+ context.getCounter("TFParticalVectorReducer", "emptyVectorCount").increment(1);
}
}
-
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+ */
@Override
- public void configure(JobConf job) {
- super.configure(job);
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
try {
- dimension = job.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
- sequentialAccess = job.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
- maxNGramSize = job.getInt(DictionaryVectorizer.MAX_NGRAMS, maxNGramSize);
- URI[] localFiles = DistributedCache.getCacheFiles(job);
+ dimension = conf.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
+ sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
+ maxNGramSize = conf.getInt(DictionaryVectorizer.MAX_NGRAMS, maxNGramSize);
+ URI[] localFiles = DistributedCache.getCacheFiles(conf);
if (localFiles == null || localFiles.length < 1) {
throw new IllegalArgumentException("missing paths from the DistributedCache");
}
Path dictionaryFile = new Path(localFiles[0].getPath());
- FileSystem fs = dictionaryFile.getFileSystem(job);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile, job);
+ FileSystem fs = dictionaryFile.getFileSystem(conf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile, conf);
Text key = new Text();
IntWritable value = new IntWritable();
-
+
// key is word value is id
while (reader.next(key, value)) {
dictionary.put(key.toString(), value.get());
@@ -133,5 +134,5 @@ public class TFPartialVectorReducer exte
throw new IllegalStateException(e);
}
}
-
+
}
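
[Note on the change above] The reducer combines two new-API idioms: reading DistributedCache files via context.getConfiguration() in setup(), and consuming values as an Iterable. A simplified sketch of that pattern follows; the dictionary type and the reduce logic are placeholders, not the Mahout implementation.

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DictionaryLookupReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private final Map<String, Integer> dictionary = new HashMap<String, Integer>();

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    // replaces configure(JobConf): the cached file list comes from the Configuration
    Configuration conf = context.getConfiguration();
    URI[] cached = DistributedCache.getCacheFiles(conf);
    if (cached == null || cached.length < 1) {
      throw new IllegalArgumentException("missing paths from the DistributedCache");
    }
    Path dictionaryFile = new Path(cached[0].getPath());
    FileSystem fs = dictionaryFile.getFileSystem(conf);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile, conf);
    Text key = new Text();
    IntWritable value = new IntWritable();
    while (reader.next(key, value)) {   // key is word, value is id
      dictionary.put(key.toString(), value.get());
    }
    reader.close();
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // the new API hands values over as an Iterable rather than an Iterator
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    if (dictionary.containsKey(key.toString())) {
      context.write(key, new IntWritable(sum));
    }
  }
}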
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java Wed Jul 7 18:46:19 2010
@@ -21,10 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.common.StringTuple;
import org.apache.mahout.math.function.ObjectLongProcedure;
import org.apache.mahout.math.map.OpenObjectLongHashMap;
@@ -32,12 +29,13 @@ import org.apache.mahout.math.map.OpenOb
/**
* TextVectorizer Term Count Mapper. Tokenizes a text document and outputs the count of the words
*/
-public class TermCountMapper extends MapReduceBase implements Mapper<Text,StringTuple,Text,LongWritable> {
+public class TermCountMapper extends Mapper<Text, StringTuple, Text, LongWritable> {
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapreduce.Mapper.Context)
+ */
@Override
- public void map(Text key,
- StringTuple value,
- final OutputCollector<Text,LongWritable> output,
- final Reporter reporter) throws IOException {
+ protected void map(Text key, StringTuple value, final Context context) throws IOException, InterruptedException {
OpenObjectLongHashMap<String> wordCount = new OpenObjectLongHashMap<String>();
for (String word : value.getEntries()) {
if (wordCount.containsKey(word)) {
@@ -50,10 +48,12 @@ public class TermCountMapper extends Map
@Override
public boolean apply(String first, long second) {
try {
- output.collect(new Text(first), new LongWritable(second));
+ context.write(new Text(first), new LongWritable(second));
} catch (IOException e) {
- reporter.incrCounter("Exception", "Output IO Exception", 1);
- }
+ context.getCounter("Exception", "Output IO Exception").increment(1);
+ } catch (InterruptedException e) {
+ context.getCounter("Exception", "Interrupted Exception").increment(1);
+ }
return true;
}
});
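
[Note on the change above] Counter updates move from Reporter.incrCounter(group, name, amount) to context.getCounter(group, name).increment(amount), as in the catch blocks above. A minimal example; the mapper, counter group and counter name are illustrative only.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LengthMapper extends Mapper<Text, Text, Text, LongWritable> {

  @Override
  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    if (value.getLength() == 0) {
      // dynamic (group, name) counter, replacing reporter.incrCounter("Example", "emptyValueCount", 1)
      context.getCounter("Example", "emptyValueCount").increment(1);
      return;
    }
    context.write(key, new LongWritable(value.getLength()));
  }
}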
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java Wed Jul 7 18:46:19 2010
@@ -22,38 +22,38 @@ import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.utils.vectors.text.DictionaryVectorizer;
/**
* Can also be used as a local Combiner. This accumulates all the words and the weights and sums them up.
*/
-public class TermCountReducer extends MapReduceBase implements Reducer<Text,LongWritable,Text,LongWritable> {
-
+public class TermCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
+
private int minSupport;
-
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
+ */
@Override
- public void reduce(Text key,
- Iterator<LongWritable> values,
- OutputCollector<Text,LongWritable> output,
- Reporter reporter) throws IOException {
+ protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
- while (values.hasNext()) {
- sum += values.next().get();
+ Iterator<LongWritable> it = values.iterator();
+ while (it.hasNext()) {
+ sum += it.next().get();
}
if (sum >= minSupport) {
- output.collect(key, new LongWritable(sum));
+ context.write(key, new LongWritable(sum));
}
}
-
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+ */
@Override
- public void configure(JobConf job) {
- super.configure(job);
- minSupport = job.getInt(DictionaryVectorizer.MIN_SUPPORT,
- DictionaryVectorizer.DEFAULT_MIN_SUPPORT);
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ minSupport = context.getConfiguration().getInt(DictionaryVectorizer.MIN_SUPPORT, DictionaryVectorizer.DEFAULT_MIN_SUPPORT);
}
+
}
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java Wed Jul 7 18:46:19 2010
@@ -23,36 +23,33 @@ import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.Vector.Element;
/**
* TextVectorizer Document Frequency Count Mapper. Outputs 1 for each feature
*
*/
-public class TermDocumentCountMapper extends MapReduceBase implements
- Mapper<WritableComparable<?>,VectorWritable,IntWritable,LongWritable> {
-
+public class TermDocumentCountMapper extends Mapper<WritableComparable<?>, VectorWritable, IntWritable, LongWritable> {
+
private static final LongWritable ONE = new LongWritable(1);
+
private static final IntWritable TOTAL_COUNT = new IntWritable(-1);
-
- @Override
- public void map(WritableComparable<?> key,
- VectorWritable value,
- OutputCollector<IntWritable,LongWritable> output,
- Reporter reporter) throws IOException {
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapreduce.Mapper.Context)
+ */
+ protected void map(WritableComparable<?> key, VectorWritable value, Context context)
+ throws IOException, InterruptedException {
Vector vector = value.get();
Iterator<Element> it = vector.iterateNonZero();
-
+
while (it.hasNext()) {
Element e = it.next();
- output.collect(new IntWritable(e.index()), ONE);
+ context.write(new IntWritable(e.index()), ONE);
}
- output.collect(TOTAL_COUNT, ONE);
+ context.write(TOTAL_COUNT, ONE);
}
}
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java Wed Jul 7 18:46:19 2010
@@ -22,26 +22,24 @@ import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
/**
* Can also be used as a local Combiner. This accumulates all the features and the weights and sums them up.
*/
-public class TermDocumentCountReducer extends MapReduceBase implements
- Reducer<IntWritable,LongWritable,IntWritable,LongWritable> {
-
+public class TermDocumentCountReducer extends Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
+ */
@Override
- public void reduce(IntWritable key,
- Iterator<LongWritable> values,
- OutputCollector<IntWritable,LongWritable> output,
- Reporter reporter) throws IOException {
+ protected void reduce(IntWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
- while (values.hasNext()) {
- sum += values.next().get();
+ Iterator<LongWritable> it = values.iterator();
+ while (it.hasNext()) {
+ sum += it.next().get();
}
- output.collect(key, new LongWritable(sum));
+ context.write(key, new LongWritable(sum));
}
+
}
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java Wed Jul 7 18:46:19 2010
@@ -22,7 +22,6 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
@@ -32,13 +31,12 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.math.VectorWritable;
@@ -55,40 +53,40 @@ import org.apache.mahout.utils.vectors.t
*
*/
public final class TFIDFConverter {
-
+
public static final String VECTOR_COUNT = "vector.count";
-
+
public static final String FEATURE_COUNT = "feature.count";
-
+
public static final String MIN_DF = "min.df";
-
+
public static final String MAX_DF_PERCENTAGE = "max.df.percentage";
-
+
public static final String TFIDF_OUTPUT_FOLDER = "tfidf";
-
+
private static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "tfidf-vectors";
-
+
private static final String FREQUENCY_FILE = "frequency.file-";
-
+
private static final int MAX_CHUNKSIZE = 10000;
-
+
private static final int MIN_CHUNKSIZE = 100;
-
+
private static final String OUTPUT_FILES_PATTERN = "part-*";
-
+
private static final int SEQUENCEFILE_BYTE_OVERHEAD = 45;
-
+
private static final String VECTOR_OUTPUT_FOLDER = "partial-vectors-";
-
+
private static final String WORDCOUNT_OUTPUT_FOLDER = "df-count";
-
+
/**
* Cannot be initialized. Use the static functions
*/
private TFIDFConverter() {
}
-
+
/**
* Create Term Frequency-Inverse Document Frequency (Tf-Idf) Vectors from the input set of vectors in
* {@link SequenceFile} format. This job uses a fixed limit on the maximum memory used by the feature chunk
@@ -115,6 +113,8 @@ public final class TFIDFConverter {
* will typically produce a single output file containing tf-idf vectors for a subset of the
* documents in the corpus.
* @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
public static void processTfIdf(Path input,
Path output,
@@ -122,48 +122,57 @@ public final class TFIDFConverter {
int minDf,
int maxDFPercent,
float normPower,
- boolean sequentialAccessOutput,
- int numReducers) throws IOException {
+ boolean sequentialAccessOutput,
+ int numReducers) throws IOException, InterruptedException, ClassNotFoundException {
if (chunkSizeInMegabytes < MIN_CHUNKSIZE) {
chunkSizeInMegabytes = MIN_CHUNKSIZE;
} else if (chunkSizeInMegabytes > MAX_CHUNKSIZE) { // 10GB
chunkSizeInMegabytes = MAX_CHUNKSIZE;
}
-
+
if (normPower != PartialVectorMerger.NO_NORMALIZING && normPower < 0) {
throw new IllegalArgumentException("normPower must either be -1 or >= 0");
}
-
+
if (minDf < 1) {
minDf = 1;
}
if (maxDFPercent < 0 || maxDFPercent > 100) {
maxDFPercent = 99;
}
-
+
Path wordCountPath = new Path(output, WORDCOUNT_OUTPUT_FOLDER);
-
+
startDFCounting(input, wordCountPath);
- Pair<Long[],List<Path>> datasetFeatures = createDictionaryChunks(wordCountPath, output,
- chunkSizeInMegabytes);
-
+ Pair<Long[], List<Path>> datasetFeatures = createDictionaryChunks(wordCountPath, output, chunkSizeInMegabytes);
+
int partialVectorIndex = 0;
List<Path> partialVectorPaths = new ArrayList<Path>();
List<Path> dictionaryChunks = datasetFeatures.getSecond();
for (Path dictionaryChunk : dictionaryChunks) {
Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);
partialVectorPaths.add(partialVectorOutputPath);
- makePartialVectors(input, datasetFeatures.getFirst()[0], datasetFeatures.getFirst()[1],
- minDf, maxDFPercent, dictionaryChunk, partialVectorOutputPath, sequentialAccessOutput);
+ makePartialVectors(input,
+ datasetFeatures.getFirst()[0],
+ datasetFeatures.getFirst()[1],
+ minDf,
+ maxDFPercent,
+ dictionaryChunk,
+ partialVectorOutputPath,
+ sequentialAccessOutput);
}
-
+
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(partialVectorPaths.get(0).toUri(), conf);
-
+
Path outputDir = new Path(output, DOCUMENT_VECTOR_OUTPUT_FOLDER);
if (dictionaryChunks.size() > 1) {
- PartialVectorMerger.mergePartialVectors(partialVectorPaths, outputDir, normPower,
- datasetFeatures.getFirst()[0].intValue(), sequentialAccessOutput, numReducers);
+ PartialVectorMerger.mergePartialVectors(partialVectorPaths,
+ outputDir,
+ normPower,
+ datasetFeatures.getFirst()[0].intValue(),
+ sequentialAccessOutput,
+ numReducers);
HadoopUtil.deletePaths(partialVectorPaths, fs);
} else {
Path singlePartialVectorOutputPath = partialVectorPaths.get(0);
@@ -171,7 +180,7 @@ public final class TFIDFConverter {
fs.rename(singlePartialVectorOutputPath, outputDir);
}
}
-
+
/**
* Read the document frequency List which is built at the end of the DF Count Job. This will use constant
* memory and will run at the speed of your disk read
@@ -180,25 +189,24 @@ public final class TFIDFConverter {
* @param dictionaryPathBase
* @throws IOException
*/
- private static Pair<Long[],List<Path>> createDictionaryChunks(Path featureCountPath,
- Path dictionaryPathBase,
- int chunkSizeInMegabytes) throws IOException {
+ private static Pair<Long[], List<Path>> createDictionaryChunks(Path featureCountPath,
+ Path dictionaryPathBase,
+ int chunkSizeInMegabytes) throws IOException {
List<Path> chunkPaths = new ArrayList<Path>();
-
+
IntWritable key = new IntWritable();
LongWritable value = new LongWritable();
Configuration conf = new Configuration();
-
+
FileSystem fs = FileSystem.get(featureCountPath.toUri(), conf);
FileStatus[] outputFiles = fs.globStatus(new Path(featureCountPath, OUTPUT_FILES_PATTERN));
-
+
long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
int chunkIndex = 0;
Path chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);
chunkPaths.add(chunkPath);
- SequenceFile.Writer freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class,
- LongWritable.class);
-
+ SequenceFile.Writer freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);
+
long currentChunkSize = 0;
long featureCount = 0;
long vectorCount = Long.MAX_VALUE;
@@ -210,14 +218,14 @@ public final class TFIDFConverter {
if (currentChunkSize > chunkSizeLimit) {
freqWriter.close();
chunkIndex++;
-
+
chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);
chunkPaths.add(chunkPath);
-
+
freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);
currentChunkSize = 0;
}
-
+
int fieldSize = SEQUENCEFILE_BYTE_OVERHEAD + Integer.SIZE / 8 + Long.SIZE / 8;
currentChunkSize += fieldSize;
if (key.get() >= 0) {
@@ -226,15 +234,15 @@ public final class TFIDFConverter {
vectorCount = value.get();
}
featureCount = Math.max(key.get(), featureCount);
-
+
}
}
featureCount++;
freqWriter.close();
- Long[] counts = {featureCount, vectorCount};
- return new Pair<Long[],List<Path>>(counts, chunkPaths);
+ Long[] counts = { featureCount, vectorCount };
+ return new Pair<Long[], List<Path>>(counts, chunkPaths);
}
-
+
/**
* Create a partial tfidf vector using a chunk of features from the input vectors. The input vectors has to
* be in the {@link SequenceFile} format
@@ -255,6 +263,8 @@ public final class TFIDFConverter {
* @param output
* output directory were the partial vectors have to be created
* @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
private static void makePartialVectors(Path input,
Long featureCount,
@@ -263,68 +273,67 @@ public final class TFIDFConverter {
int maxDFPercent,
Path dictionaryFilePath,
Path output,
- boolean sequentialAccess) throws IOException {
-
- Configurable client = new JobClient();
- JobConf conf = new JobConf(TFIDFConverter.class);
- conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
- + "org.apache.hadoop.io.serializer.WritableSerialization");
+ boolean sequentialAccess) throws IOException, InterruptedException, ClassNotFoundException {
+
+ Configuration conf = new Configuration();
// this conf parameter needs to be set enable serialisation of conf values
-
- conf.setJobName(": MakePartialVectors: input-folder: " + input + ", dictionary-file: "
- + dictionaryFilePath.toString());
+ conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
conf.setLong(FEATURE_COUNT, featureCount);
conf.setLong(VECTOR_COUNT, vectorCount);
conf.setInt(MIN_DF, minDf);
conf.setInt(MAX_DF_PERCENTAGE, maxDFPercent);
conf.setBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, sequentialAccess);
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(VectorWritable.class);
- DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);
- FileInputFormat.setInputPaths(conf, input);
-
- FileOutputFormat.setOutputPath(conf, output);
-
- conf.setMapperClass(IdentityMapper.class);
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setReducerClass(TFIDFPartialVectorReducer.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
+ DistributedCache.setCacheFiles(new URI[] { dictionaryFilePath.toUri() }, conf);
+
+ Job job = new Job(conf);
+ job.setJobName(": MakePartialVectors: input-folder: " + input + ", dictionary-file: " + dictionaryFilePath.toString());
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(VectorWritable.class);
+ FileInputFormat.setInputPaths(job, input);
+
+ FileOutputFormat.setOutputPath(job, output);
+
+ job.setMapperClass(Mapper.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setReducerClass(TFIDFPartialVectorReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
HadoopUtil.overwriteOutput(output);
-
- client.setConf(conf);
- JobClient.runJob(conf);
+
+ job.waitForCompletion(true);
}
-
+
/**
* Count the document frequencies of features in parallel using Map/Reduce. The input documents have to be
* in {@link SequenceFile} format
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
- private static void startDFCounting(Path input, Path output) throws IOException {
-
- Configurable client = new JobClient();
- JobConf conf = new JobConf(TFIDFConverter.class);
- conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
- + "org.apache.hadoop.io.serializer.WritableSerialization");
+ private static void startDFCounting(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException {
+
+ Configuration conf = new Configuration();
// this conf parameter needs to be set enable serialisation of conf values
+ conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
- conf.setJobName("VectorTfIdf Document Frequency Count running over input: " + input.toString());
- conf.setOutputKeyClass(IntWritable.class);
- conf.setOutputValueClass(LongWritable.class);
-
- FileInputFormat.setInputPaths(conf, input);
- FileOutputFormat.setOutputPath(conf, output);
-
- conf.setMapperClass(TermDocumentCountMapper.class);
-
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setCombinerClass(TermDocumentCountReducer.class);
- conf.setReducerClass(TermDocumentCountReducer.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
-
+ Job job = new Job(conf);
+ job.setJobName("VectorTfIdf Document Frequency Count running over input: " + input.toString());
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(LongWritable.class);
+
+ FileInputFormat.setInputPaths(job, input);
+ FileOutputFormat.setOutputPath(job, output);
+
+ job.setMapperClass(TermDocumentCountMapper.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setCombinerClass(TermDocumentCountReducer.class);
+ job.setReducerClass(TermDocumentCountReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
HadoopUtil.overwriteOutput(output);
- client.setConf(conf);
- JobClient.runJob(conf);
+ job.waitForCompletion(true);
}
}
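
[Note on the change above] createDictionaryChunks rolls SequenceFile output over to a new chunk once an estimated byte budget is exceeded. A simplified sketch of that chunking loop follows; the 45-byte record overhead and file-name prefix follow the patch, while the data source and driver method are hypothetical.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public final class ChunkWriterSketch {

  private static final int SEQUENCEFILE_BYTE_OVERHEAD = 45;

  private ChunkWriterSketch() {
  }

  /** Writes (feature id, count) pairs into SequenceFile chunks of roughly chunkSizeInMegabytes each. */
  public static List<Path> writeChunks(Path dictionaryPathBase, long[] counts, int chunkSizeInMegabytes)
      throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(dictionaryPathBase.toUri(), conf);
    long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;

    List<Path> chunkPaths = new ArrayList<Path>();
    int chunkIndex = 0;
    Path chunkPath = new Path(dictionaryPathBase, "frequency.file-" + chunkIndex);
    chunkPaths.add(chunkPath);
    SequenceFile.Writer freqWriter =
        new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);

    long currentChunkSize = 0;
    for (int feature = 0; feature < counts.length; feature++) {
      if (currentChunkSize > chunkSizeLimit) {
        // roll over to the next chunk once the estimated size exceeds the limit
        freqWriter.close();
        chunkIndex++;
        chunkPath = new Path(dictionaryPathBase, "frequency.file-" + chunkIndex);
        chunkPaths.add(chunkPath);
        freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);
        currentChunkSize = 0;
      }
      // estimated on-disk size of one record: overhead plus one int key and one long value
      currentChunkSize += SEQUENCEFILE_BYTE_OVERHEAD + Integer.SIZE / 8 + Long.SIZE / 8;
      freqWriter.append(new IntWritable(feature), new LongWritable(counts[feature]));
    }
    freqWriter.close();
    return chunkPaths;
  }
}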
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java Wed Jul 7 18:46:19 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,11 +29,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.SequentialAccessSparseVector;
@@ -46,30 +43,38 @@ import org.apache.mahout.utils.vectors.c
/**
* Converts a document in to a sparse vector
*/
-public class TFIDFPartialVectorReducer extends MapReduceBase implements
- Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
-
+public class TFIDFPartialVectorReducer extends
+ Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
private final OpenIntLongHashMap dictionary = new OpenIntLongHashMap();
+
private final TFIDF tfidf = new TFIDF();
+
private int minDf = 1;
+
private int maxDfPercent = 99;
+
private long vectorCount = 1;
+
private long featureCount;
+
private boolean sequentialAccess;
-
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
+ */
@Override
- public void reduce(WritableComparable<?> key,
- Iterator<VectorWritable> values,
- OutputCollector<WritableComparable<?>,VectorWritable> output,
- Reporter reporter) throws IOException {
- if (!values.hasNext()) {
+ protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context) throws IOException,
+ InterruptedException {
+ Iterator<VectorWritable> it = values.iterator();
+ if (!it.hasNext()) {
return;
}
- Vector value = values.next().get();
- Iterator<Element> it = value.iterateNonZero();
+ Vector value = it.next().get();
+ Iterator<Element> it1 = value.iterateNonZero();
Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());
- while (it.hasNext()) {
- Element e = it.next();
+ while (it1.hasNext()) {
+ Element e = it1.next();
if (!dictionary.containsKey(e.index())) {
continue;
}
@@ -80,38 +85,40 @@ public class TFIDFPartialVectorReducer e
if (df < minDf) {
df = minDf;
}
- vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount,
- (int) vectorCount));
+ vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));
}
if (sequentialAccess) {
vector = new SequentialAccessSparseVector(vector);
}
VectorWritable vectorWritable = new VectorWritable(new NamedVector(vector, key.toString()));
- output.collect(key, vectorWritable);
+ context.write(key, vectorWritable);
}
-
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+ */
@Override
- public void configure(JobConf job) {
- super.configure(job);
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
try {
-
- URI[] localFiles = DistributedCache.getCacheFiles(job);
+ Configuration conf = context.getConfiguration();
+ URI[] localFiles = DistributedCache.getCacheFiles(conf);
if (localFiles == null || localFiles.length < 1) {
throw new IllegalArgumentException("missing paths from the DistributedCache");
}
-
- vectorCount = job.getLong(TFIDFConverter.VECTOR_COUNT, 1);
- featureCount = job.getLong(TFIDFConverter.FEATURE_COUNT, 1);
- minDf = job.getInt(TFIDFConverter.MIN_DF, 1);
- maxDfPercent = job.getInt(TFIDFConverter.MAX_DF_PERCENTAGE, 99);
- sequentialAccess = job.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
-
+
+ vectorCount = conf.getLong(TFIDFConverter.VECTOR_COUNT, 1);
+ featureCount = conf.getLong(TFIDFConverter.FEATURE_COUNT, 1);
+ minDf = conf.getInt(TFIDFConverter.MIN_DF, 1);
+ maxDfPercent = conf.getInt(TFIDFConverter.MAX_DF_PERCENTAGE, 99);
+ sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
+
Path dictionaryFile = new Path(localFiles[0].getPath());
- FileSystem fs = dictionaryFile.getFileSystem(job);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile, job);
+ FileSystem fs = dictionaryFile.getFileSystem(conf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile, conf);
IntWritable key = new IntWritable();
LongWritable value = new LongWritable();
-
+
// key is feature, value is the document frequency
while (reader.next(key, value)) {
dictionary.put(key.get(), value.get());
@@ -120,4 +127,5 @@ public class TFIDFPartialVectorReducer e
throw new IllegalStateException(e);
}
}
+
}
Modified: mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapperTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapperTest.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapperTest.java (original)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapperTest.java Wed Jul 7 18:46:19 2010
@@ -17,39 +17,32 @@
package org.apache.mahout.utils.nlp.collocations.llr;
+import static org.junit.Assert.assertEquals;
+
import java.io.Reader;
import java.util.Collections;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.util.Version;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.DummyRecordWriter;
import org.apache.mahout.common.StringTuple;
import org.apache.mahout.utils.nlp.collocations.llr.Gram.Type;
-import org.easymock.EasyMock;
-import org.junit.Before;
import org.junit.Test;
/**
* Test for CollocMapper
*/
-@SuppressWarnings("deprecation")
public class CollocMapperTest {
private OutputCollector<GramKey,Gram> collector;
- private Reporter reporter;
-
- @Before
- @SuppressWarnings("unchecked")
- public void setUp() {
- collector = EasyMock.createMock(OutputCollector.class);
- reporter = EasyMock.createMock(Reporter.class);
- }
-
@Test
public void testCollectNgrams() throws Exception {
@@ -88,22 +81,23 @@ public class CollocMapperTest {
GramKey subgramKey = new GramKey(subgram, new byte[0]);
GramKey subgramNgramKey = new GramKey(subgram, ngram.getBytes());
+ collector = new DummyOutputCollector<GramKey, Gram>();
collector.collect(subgramKey, subgram);
collector.collect(subgramNgramKey, ngram);
}
- reporter.incrCounter(CollocMapper.Count.NGRAM_TOTAL, 7);
- EasyMock.replay(reporter, collector);
-
- JobConf conf = new JobConf();
+ Configuration conf = new Configuration();
conf.set(CollocMapper.MAX_SHINGLE_SIZE, "2");
- CollocMapper c = new CollocMapper();
- c.configure(conf);
+ CollocMapper mapper = new CollocMapper();
+ DummyRecordWriter<GramKey, Gram> writer = new DummyRecordWriter<GramKey, Gram>();
+ Mapper<Text, StringTuple, GramKey, Gram>.Context context = DummyRecordWriter.build(mapper, conf, writer);
+ mapper.setup(context);
- c.map(key, inputTuple, collector, reporter);
+ mapper.map(key, inputTuple, context);
- EasyMock.verify(reporter, collector);
+ Counter counter = (Counter) context.getCounter(CollocMapper.Count.NGRAM_TOTAL);
+ assertEquals("counter", 7, counter.getValue());
}
@Test
@@ -145,7 +139,7 @@ public class CollocMapperTest {
frequency = 2;
}
-
+ collector = new DummyOutputCollector<GramKey, Gram>();
if (p == Gram.Type.UNIGRAM) {
Gram unigram = new Gram(v[1], frequency, Gram.Type.UNIGRAM);
@@ -163,19 +157,19 @@ public class CollocMapperTest {
}
}
- reporter.incrCounter(CollocMapper.Count.NGRAM_TOTAL, 7);
- EasyMock.replay(reporter, collector);
-
- JobConf conf = new JobConf();
+ Configuration conf = new Configuration();
conf.set(CollocMapper.MAX_SHINGLE_SIZE, "2");
conf.setBoolean(CollocDriver.EMIT_UNIGRAMS, true);
- CollocMapper c = new CollocMapper();
- c.configure(conf);
+ CollocMapper mapper = new CollocMapper();
+ DummyRecordWriter<GramKey, Gram> writer = new DummyRecordWriter<GramKey, Gram>();
+ Mapper<Text, StringTuple, GramKey, Gram>.Context context = DummyRecordWriter.build(mapper, conf, writer);
+ mapper.setup(context);
- c.map(key, inputTuple, collector, reporter);
+ mapper.map(key, inputTuple, context);
- EasyMock.verify(reporter, collector);
+ Counter counter = (Counter) context.getCounter(CollocMapper.Count.NGRAM_TOTAL);
+ assertEquals("counter", 7, counter.getValue());
}
/** A lucene 2.9 standard analyzer with no stopwords. */
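
[Note on the change above] The rewritten test drives the new-API mapper through Mahout's DummyRecordWriter helper instead of EasyMock: build() wraps an in-memory writer in a Mapper.Context, so outputs and counters can be asserted directly. A self-contained sketch of the same pattern with a trivial mapper; only DummyRecordWriter and its build()/getKeys()/getValue() calls come from the diff, everything else is illustrative.

import static org.junit.Assert.assertEquals;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.common.DummyRecordWriter;
import org.junit.Test;

public class EchoMapperTest {

  /** A trivial new-API mapper that just forwards its input. */
  static class EchoMapper extends Mapper<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void map(Text key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  @Test
  public void testEcho() throws Exception {
    Configuration conf = new Configuration();
    EchoMapper mapper = new EchoMapper();
    DummyRecordWriter<Text, LongWritable> writer = new DummyRecordWriter<Text, LongWritable>();
    Mapper<Text, LongWritable, Text, LongWritable>.Context context =
        DummyRecordWriter.build(mapper, conf, writer);

    mapper.map(new Text("a"), new LongWritable(1), context);
    mapper.map(new Text("b"), new LongWritable(2), context);

    // the dummy writer records everything passed to context.write()
    assertEquals(2, writer.getKeys().size());
    assertEquals(1, writer.getValue(new Text("a")).size());
  }
}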
Modified: mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducerTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducerTest.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducerTest.java (original)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducerTest.java Wed Jul 7 18:46:19 2010
@@ -18,74 +18,74 @@
package org.apache.mahout.utils.nlp.collocations.llr;
import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.HEAD;
+import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.NGRAM;
import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.TAIL;
import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.UNIGRAM;
-import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.NGRAM;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.easymock.EasyMock;
-import org.junit.Before;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.DummyRecordWriter;
import org.junit.Test;
/**
* Test the CollocReducer
*/
public class CollocReducerTest {
-
- private OutputCollector<Gram,Gram> output;
- private Reporter reporter;
-
- @Before
- @SuppressWarnings("unchecked")
- public void setUp() {
- output = EasyMock.createMock(OutputCollector.class);
- reporter = EasyMock.createMock(Reporter.class);
- }
-
+
@Test
public void testReduce() throws Exception {
// test input, input[*][0] is the key,
// input[*][1..n] are the values passed in via
// the iterator.
- Gram[][] input = {
- {new Gram("the", UNIGRAM), new Gram("the", UNIGRAM), new Gram("the", UNIGRAM)},
- {new Gram("the", HEAD), new Gram("the best", NGRAM), new Gram("the worst", NGRAM)},
- {new Gram("of", HEAD), new Gram("of times", NGRAM), new Gram("of times", NGRAM)},
- {new Gram("times", TAIL), new Gram("of times", NGRAM), new Gram("of times", NGRAM)}};
-
+ Gram[][] input = { { new Gram("the", UNIGRAM), new Gram("the", UNIGRAM), new Gram("the", UNIGRAM) },
+ { new Gram("the", HEAD), new Gram("the best", NGRAM), new Gram("the worst", NGRAM) },
+ { new Gram("of", HEAD), new Gram("of times", NGRAM), new Gram("of times", NGRAM) },
+ { new Gram("times", TAIL), new Gram("of times", NGRAM), new Gram("of times", NGRAM) } };
+
// expected results.
- Gram[][] values = {{new Gram("the", 2, UNIGRAM), new Gram("the", 2, UNIGRAM)},
- {new Gram("the best", 1, NGRAM), new Gram("the", 2, HEAD)},
- {new Gram("the worst", 1, NGRAM), new Gram("the", 2, HEAD)},
- {new Gram("of times", 2, NGRAM), new Gram("of", 2, HEAD)},
- {new Gram("of times", 2, NGRAM), new Gram("times", 2, TAIL)}};
-
- // set up expectations
- for (Gram[] v : values) {
- output.collect(v[0], v[1]);
+ Gram[][] values = { { new Gram("the", 2, UNIGRAM), new Gram("the", 2, UNIGRAM) },
+ { new Gram("the best", 1, NGRAM), new Gram("the", 2, HEAD) },
+ { new Gram("the worst", 1, NGRAM), new Gram("the", 2, HEAD) },
+ { new Gram("of times", 2, NGRAM), new Gram("of", 2, HEAD) },
+ { new Gram("of times", 2, NGRAM), new Gram("times", 2, TAIL) } };
+
+ Map<Gram, List<Gram>> reference = new HashMap<Gram, List<Gram>>();
+ for (Gram[] grams : values) {
+ List<Gram> list = reference.get(grams[0]);
+ if (list == null) {
+ list = new ArrayList<Gram>();
+ reference.put(grams[0], list);
+ }
+ for (int j = 1; j < grams.length; j++)
+ list.add(grams[j]);
}
- EasyMock.replay(reporter, output);
-
- // play back the input data.
- CollocReducer c = new CollocReducer();
-
+
+ // reduce the input data.
+ Configuration conf = new Configuration();
+ CollocReducer reducer = new CollocReducer();
+ DummyRecordWriter<Gram, Gram> writer = new DummyRecordWriter<Gram, Gram>();
+ Reducer<GramKey, Gram, Gram, Gram>.Context context = DummyRecordWriter.build(reducer, conf, writer, GramKey.class, Gram.class);
+
GramKey key = new GramKey();
byte[] empty = new byte[0];
for (Gram[] ii : input) {
key.set(ii[0], empty);
-
List<Gram> vv = new LinkedList<Gram>();
vv.addAll(Arrays.asList(ii));
- c.reduce(key, vv.iterator(), output, reporter);
+ reducer.reduce(key, vv, context);
}
-
- EasyMock.verify(reporter, output);
+ assertTrue(writer.getKeys().size() == reference.keySet().size());
+ for (Gram gram : reference.keySet())
+ assertEquals("Gram " + gram, reference.get(gram).size(), writer.getValue(gram).size());
}
-
}
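The rewrite above replaces the EasyMock OutputCollector/Reporter pair with Mahout's DummyRecordWriter, which captures reducer output in memory and manufactures a new-API Reducer.Context through DummyRecordWriter.build(). A condensed sketch of the same test pattern, assuming an illustrative SumReducer that is not part of this commit:

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.common.DummyRecordWriter;
import org.junit.Test;

public class SumReducerTest {

  // Illustrative reducer: sums the values seen for each key.
  static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws java.io.IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  @Test
  public void testReduce() throws Exception {
    SumReducer reducer = new SumReducer();
    DummyRecordWriter<Text, IntWritable> writer = new DummyRecordWriter<Text, IntWritable>();
    // build() wires the in-memory writer into a Reducer.Context; the two class
    // arguments are the reducer's input key and value classes, as in CollocReducerTest.
    Reducer<Text, IntWritable, Text, IntWritable>.Context context =
        DummyRecordWriter.build(reducer, new Configuration(), writer, Text.class, IntWritable.class);

    reducer.reduce(new Text("the"), Arrays.asList(new IntWritable(1), new IntWritable(2)), context);

    // Output is collected per key and can be asserted via writer.getKeys() / writer.getValue(key),
    // as the migrated test does above.
  }
}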
Modified: mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducerTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducerTest.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducerTest.java (original)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducerTest.java Wed Jul 7 18:46:19 2010
@@ -18,18 +18,18 @@
package org.apache.mahout.utils.nlp.collocations.llr;
import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.HEAD;
-import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.TAIL;
import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.NGRAM;
+import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.TAIL;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.DummyRecordWriter;
import org.apache.mahout.math.stats.LogLikelihood;
import org.apache.mahout.utils.nlp.collocations.llr.LLRReducer.LLCallback;
import org.easymock.EasyMock;
@@ -41,29 +41,16 @@ import org.slf4j.LoggerFactory;
/** Test the LLRReducer
* FIXME: Add negative test cases.
*/
-@SuppressWarnings("deprecation")
public class LLRReducerTest {
private static final Logger log =
LoggerFactory.getLogger(LLRReducerTest.class);
- private Reporter reporter;
private LLCallback ll;
private LLCallback cl;
- // not verifying the llr algo output here, just the input, but it is handy
- // to see the values emitted.
- private final OutputCollector<Text,DoubleWritable> collector = new OutputCollector<Text,DoubleWritable>() {
- @Override
- public void collect(Text key, DoubleWritable value) {
- log.info("{} {}", key, value);
- }
- };
-
-
@Before
public void setUp() {
- reporter = EasyMock.createMock(Reporter.class);
ll = EasyMock.createMock(LLCallback.class);
cl = new LLCallback() {
@Override
@@ -76,7 +63,6 @@ public class LLRReducerTest {
@Test
public void testReduce() throws Exception {
- LLRReducer reducer = new LLRReducer(ll);
// test input, input[*][0] is the key,
// input[*][1..n] are the values passed in via
@@ -108,14 +94,21 @@ public class LLRReducerTest {
EasyMock.replay(ll);
- JobConf config = new JobConf(CollocDriver.class);
- config.set(LLRReducer.NGRAM_TOTAL, "7");
- reducer.configure(config);
+ Configuration conf = new Configuration();
+ conf.set(LLRReducer.NGRAM_TOTAL, "7");
+ LLRReducer reducer = new LLRReducer(ll);
+ DummyRecordWriter<Text, DoubleWritable> writer = new DummyRecordWriter<Text, DoubleWritable>();
+ Reducer<Gram, Gram, Text, DoubleWritable>.Context context = DummyRecordWriter.build(reducer,
+ conf,
+ writer,
+ Gram.class,
+ Gram.class);
+ reducer.setup(context);
for (Gram[] ii: input) {
List<Gram> vv = new LinkedList<Gram>();
vv.addAll(Arrays.asList(ii).subList(1, ii.length));
- reducer.reduce(ii[0], vv.iterator(), collector, reporter);
+ reducer.reduce(ii[0], vv, context);
}
EasyMock.verify(ll);
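Besides dropping Reporter, the hunk above shows the configure(JobConf) to setup(Context) move: the NGRAM_TOTAL value is placed on a plain Configuration, and the test calls reducer.setup(context) itself before driving reduce(). A minimal sketch of that lifecycle, assuming an illustrative ThresholdReducer and MIN_SCORE key that are not part of this commit:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ThresholdReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

  public static final String MIN_SCORE = "threshold.reducer.min.score";

  private float minScore;

  @Override
  protected void setup(Context context) {
    // The old API read job parameters in configure(JobConf); the new API hands the
    // Configuration to setup() through the Context instead.
    minScore = context.getConfiguration().getFloat(MIN_SCORE, 0.0f);
  }

  @Override
  protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
      throws IOException, InterruptedException {
    for (DoubleWritable value : values) {
      if (value.get() >= minScore) {
        context.write(key, value);
      }
    }
  }
}

// A test mirrors the LLRReducerTest change: call conf.set(MIN_SCORE, "0.5"), build the
// Context (for example with DummyRecordWriter.build(...)), then call
// reducer.setup(context) explicitly before invoking reduce().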
Modified: mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java (original)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java Wed Jul 7 18:46:19 2010
@@ -18,7 +18,6 @@
package org.apache.mahout.utils.vectors.text;
import java.io.IOException;
-import java.net.URISyntaxException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -36,44 +35,40 @@ import org.apache.mahout.utils.vectors.t
* Test the dictionary Vector
*/
public class DictionaryVectorizerTest extends MahoutTestCase {
-
+
public static final int AVG_DOCUMENT_LENGTH = 20;
-
+
public static final int AVG_SENTENCE_LENGTH = 8;
-
+
public static final int AVG_WORD_LENGTH = 6;
-
+
public static final int NUM_DOCS = 100;
-
+
public static final String CHARSET = "abcdef";
-
+
public static final String DELIM = " .,?;:!\t\n\r";
-
- public static final String ERRORSET = "`1234567890"
- + "-=~@#$%^&*()_+[]{}'\"/<>|\\";
-
+
+ public static final String ERRORSET = "`1234567890" + "-=~@#$%^&*()_+[]{}'\"/<>|\\";
+
private static final Random random = RandomUtils.getRandom();
-
+
private FileSystem fs;
-
+
private static char getRandomDelimiter() {
return DELIM.charAt(random.nextInt(DictionaryVectorizerTest.DELIM.length()));
}
-
+
public static String getRandomDocument() {
- int length = (AVG_DOCUMENT_LENGTH >> 1)
- + DictionaryVectorizerTest.random.nextInt(AVG_DOCUMENT_LENGTH);
- StringBuilder sb = new StringBuilder(length * AVG_SENTENCE_LENGTH
- * AVG_WORD_LENGTH);
+ int length = (AVG_DOCUMENT_LENGTH >> 1) + DictionaryVectorizerTest.random.nextInt(AVG_DOCUMENT_LENGTH);
+ StringBuilder sb = new StringBuilder(length * AVG_SENTENCE_LENGTH * AVG_WORD_LENGTH);
for (int i = 0; i < length; i++) {
sb.append(getRandomSentence());
}
return sb.toString();
}
-
+
public static String getRandomSentence() {
- int length = (AVG_SENTENCE_LENGTH >> 1)
- + DictionaryVectorizerTest.random.nextInt(AVG_SENTENCE_LENGTH);
+ int length = (AVG_SENTENCE_LENGTH >> 1) + DictionaryVectorizerTest.random.nextInt(AVG_SENTENCE_LENGTH);
StringBuilder sb = new StringBuilder(length * AVG_WORD_LENGTH);
for (int i = 0; i < length; i++) {
sb.append(getRandomString()).append(' ');
@@ -81,45 +76,56 @@ public class DictionaryVectorizerTest ex
sb.append(getRandomDelimiter());
return sb.toString();
}
-
+
public static String getRandomString() {
int length = (AVG_WORD_LENGTH >> 1) + DictionaryVectorizerTest.random.nextInt(AVG_WORD_LENGTH);
StringBuilder sb = new StringBuilder(length);
for (int i = 0; i < length; i++) {
- sb.append(DictionaryVectorizerTest.CHARSET.charAt(DictionaryVectorizerTest.random.nextInt(DictionaryVectorizerTest.CHARSET.length())));
+ sb.append(DictionaryVectorizerTest.CHARSET.charAt(DictionaryVectorizerTest.random.nextInt(DictionaryVectorizerTest.CHARSET
+ .length())));
}
if (DictionaryVectorizerTest.random.nextInt(10) == 0) {
- sb.append(DictionaryVectorizerTest.ERRORSET.charAt(DictionaryVectorizerTest.random
- .nextInt(DictionaryVectorizerTest.ERRORSET.length())));
+ sb.append(DictionaryVectorizerTest.ERRORSET.charAt(DictionaryVectorizerTest.random.nextInt(DictionaryVectorizerTest.ERRORSET
+ .length())));
}
return sb.toString();
}
-
+
@Override
public void setUp() throws Exception {
super.setUp();
Configuration conf = new Configuration();
fs = FileSystem.get(conf);
}
-
- public void testCreateTermFrequencyVectors() throws IOException {
+
+ public void testCreateTermFrequencyVectors() throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Path path = getTestTempFilePath("documents/docs.file");
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
- Text.class, Text.class);
-
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Text.class);
+
for (int i = 0; i < NUM_DOCS; i++) {
- writer.append(new Text("Document::ID::" + i), new Text(
- getRandomDocument()));
+ writer.append(new Text("Document::ID::" + i), new Text(getRandomDocument()));
}
writer.close();
Class<? extends Analyzer> analyzer = DefaultAnalyzer.class;
- DocumentProcessor.tokenizeDocuments(path, analyzer,
- getTestTempDirPath("output/tokenized-documents"));
+ DocumentProcessor.tokenizeDocuments(path, analyzer, getTestTempDirPath("output/tokenized-documents"));
DictionaryVectorizer.createTermFrequencyVectors(getTestTempDirPath("output/tokenized-documents"),
- getTestTempDirPath("output/wordcount"), conf, 2, 1, 0.0f, 1, 100, false);
- TFIDFConverter.processTfIdf(getTestTempDirPath("output/wordcount/vectors"),
- getTestTempDirPath("output/tfidf"), 100, 1, 99, 1.0f, false, 1);
-
+ getTestTempDirPath("output/wordcount"),
+ conf,
+ 2,
+ 1,
+ 0.0f,
+ 1,
+ 100,
+ false);
+ TFIDFConverter.processTfIdf(getTestTempDirPath("output/wordcount/tf-vectors"),
+ getTestTempDirPath("output/tfidf"),
+ 100,
+ 1,
+ 99,
+ 1.0f,
+ false,
+ 1);
+
}
}
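The widened throws clause on testCreateTermFrequencyVectors matches the one added to createTermFrequencyVectors itself: the new-API Job.waitForCompletion(boolean) declares IOException, InterruptedException and ClassNotFoundException, and those now propagate up to the test. The other functional change in this hunk is the TFIDFConverter input moving to output/wordcount/tf-vectors, presumably the subdirectory the reworked DictionaryVectorizer now writes its term-frequency vectors under. A minimal sketch of the driver shape the commit migrates to, assuming an illustrative runExampleJob helper that is not part of this commit:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public final class NewApiDriverSketch {

  private NewApiDriverSketch() { }

  // Sketch only: shows where the IOException, InterruptedException and
  // ClassNotFoundException in the new signatures come from.
  public static void runExampleJob(Configuration conf, Path input, Path output)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "example job");                 // replaces JobConf
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.addInputPath(job, input);               // mapreduce.lib.input, not mapred
    FileOutputFormat.setOutputPath(job, output);
    // JobClient.runJob(jobConf) becomes waitForCompletion(), which is the call that
    // declares all three checked exceptions.
    job.waitForCompletion(true);
  }
}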