Posted to commits@mahout.apache.org by je...@apache.org on 2010/07/07 20:46:20 UTC

svn commit: r961476 [2/2] - in /mahout/trunk: core/src/test/java/org/apache/mahout/common/ utils/src/main/java/org/apache/mahout/clustering/cdbw/ utils/src/main/java/org/apache/mahout/utils/ utils/src/main/java/org/apache/mahout/utils/clustering/ utils...

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java Wed Jul  7 18:46:19 2010
@@ -22,7 +22,6 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
@@ -34,13 +33,12 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.StringTuple;
 import org.apache.mahout.math.VectorWritable;
@@ -113,6 +111,8 @@ public final class DictionaryVectorizer 
    *          recommend you use a split size of around 400-500MB so that two simultaneous reducers can create
    *          partial vectors without thrashing the system due to increased swapping
    * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
    */
   public static void createTermFrequencyVectors(Path input,
                                                 Path output,
@@ -122,7 +122,7 @@ public final class DictionaryVectorizer 
                                                 float minLLRValue,
                                                 int numReducers,
                                                 int chunkSizeInMegabytes,
-                                                boolean sequentialAccess) throws IOException {
+                                                boolean sequentialAccess) throws IOException, InterruptedException, ClassNotFoundException {
     if (chunkSizeInMegabytes < MIN_CHUNKSIZE) {
       chunkSizeInMegabytes = MIN_CHUNKSIZE;
     } else if (chunkSizeInMegabytes > MAX_CHUNKSIZE) { // 10GB
@@ -249,6 +249,8 @@ public final class DictionaryVectorizer 
    * @param numReducers 
    *          the desired number of reducer tasks
    * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
    */
   private static void makePartialVectors(Path input,
                                          int maxNGramSize,
@@ -256,72 +258,73 @@ public final class DictionaryVectorizer 
                                          Path output,
                                          int dimension,
                                          boolean sequentialAccess, 
-                                         int numReducers) throws IOException {
+                                         int numReducers) throws IOException, InterruptedException, ClassNotFoundException {
     
-    Configurable client = new JobClient();
-    JobConf conf = new JobConf(DictionaryVectorizer.class);
+    Configuration conf = new Configuration();
+    // this conf parameter needs to be set enable serialisation of conf values
     conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
                                   + "org.apache.hadoop.io.serializer.WritableSerialization");
-    // this conf parameter needs to be set enable serialisation of conf values
-    
-    conf.setJobName("DictionaryVectorizer::MakePartialVectors: input-folder: " + input
-                    + ", dictionary-file: " + dictionaryFilePath.toString());
     conf.setInt(PartialVectorMerger.DIMENSION, dimension);
     conf.setBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, sequentialAccess);
-    conf.setInt(MAX_NGRAMS, maxNGramSize);
-    
-    conf.setMapOutputKeyClass(Text.class);
-    conf.setMapOutputValueClass(StringTuple.class);
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(VectorWritable.class);
+    conf.setInt(MAX_NGRAMS, maxNGramSize);   
     DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);
-    FileInputFormat.setInputPaths(conf, input);
     
-    FileOutputFormat.setOutputPath(conf, output);
+    Job job = new Job(conf);
+    job.setJobName("DictionaryVectorizer::MakePartialVectors: input-folder: " + input
+                    + ", dictionary-file: " + dictionaryFilePath.toString());
     
-    conf.setMapperClass(IdentityMapper.class);
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setReducerClass(TFPartialVectorReducer.class);
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
-    conf.setNumReduceTasks(numReducers);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(StringTuple.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(VectorWritable.class);
+    FileInputFormat.setInputPaths(job, input);
+    
+    FileOutputFormat.setOutputPath(job, output);
+    
+    job.setMapperClass(Mapper.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setReducerClass(TFPartialVectorReducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setNumReduceTasks(numReducers);
 
     HadoopUtil.overwriteOutput(output);
     
-    client.setConf(conf);
-    JobClient.runJob(conf);
+    job.waitForCompletion(true);
   }
   
   /**
    * Count the frequencies of words in parallel using Map/Reduce. The input documents have to be in
    * {@link SequenceFile} format
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
    */
-  private static void startWordCounting(Path input, Path output, int minSupport) throws IOException {
+  private static void startWordCounting(Path input, Path output, int minSupport) throws IOException, InterruptedException, ClassNotFoundException {
     
-    Configurable client = new JobClient();
-    JobConf conf = new JobConf(DictionaryVectorizer.class);
+    Configuration conf = new Configuration();
+    // this conf parameter needs to be set enable serialisation of conf values
     conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
                                   + "org.apache.hadoop.io.serializer.WritableSerialization");
-    // this conf parameter needs to be set enable serialisation of conf values
-    
-    conf.setJobName("DictionaryVectorizer::WordCount: input-folder: " + input.toString());
     conf.setInt(MIN_SUPPORT, minSupport);
     
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(LongWritable.class);
+    Job job = new Job(conf);
+    
+    job.setJobName("DictionaryVectorizer::WordCount: input-folder: " + input.toString());
+    
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
     
-    FileInputFormat.setInputPaths(conf, input);
-    FileOutputFormat.setOutputPath(conf, output);
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, output);
     
-    conf.setMapperClass(TermCountMapper.class);
+    job.setMapperClass(TermCountMapper.class);
     
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setCombinerClass(TermCountReducer.class);
-    conf.setReducerClass(TermCountReducer.class);
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setCombinerClass(TermCountReducer.class);
+    job.setReducerClass(TermCountReducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
     
     HadoopUtil.overwriteOutput(output);
     
-    client.setConf(conf);
-    JobClient.runJob(conf);
+    job.waitForCompletion(true);
   }
 }
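
For reference, the driver shape this file migrates to, as a self-contained sketch: org.apache.hadoop.mapreduce.Job replaces the JobConf/JobClient pair, and the base Mapper class provides the pass-through behaviour that org.apache.hadoop.mapred.lib.IdentityMapper used to supply. The class name and the Text/Text identity copy below are illustrative only, not part of this commit.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    /** Identity copy of a SequenceFile<Text,Text> using the new (org.apache.hadoop.mapreduce) API. */
    public final class NewApiDriverSketch {

      private NewApiDriverSketch() { }

      public static void run(Path input, Path output)
          throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // anything the tasks will read from the Configuration has to be set before
        // the Job constructor copies it
        conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
                                      + "org.apache.hadoop.io.serializer.WritableSerialization");

        Job job = new Job(conf, "NewApiDriverSketch: " + input);
        job.setJarByClass(NewApiDriverSketch.class);

        job.setMapperClass(Mapper.class);    // the base Mapper is a pass-through, replacing IdentityMapper
        job.setReducerClass(Reducer.class);  // the base Reducer likewise writes values through unchanged

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        // replaces client.setConf(conf); JobClient.runJob(conf);
        job.waitForCompletion(true);
      }
    }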

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java Wed Jul  7 18:46:19 2010
@@ -20,15 +20,14 @@ package org.apache.mahout.utils.vectors.
 import java.io.IOException;
 import java.nio.charset.Charset;
 
-import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.StringTuple;
@@ -68,31 +67,31 @@ public final class DocumentProcessor {
    * @param analyzerClass
    *          The Lucene {@link Analyzer} for tokenizing the UTF-8 text
    * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
    */
   public static void tokenizeDocuments(Path input, Class<? extends Analyzer> analyzerClass,
-                                       Path output) throws IOException {
-    
-    Configurable client = new JobClient();
-    JobConf conf = new JobConf(DocumentProcessor.class);
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
+                                       Path output) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
     // this conf parameter needs to be set enable serialisation of conf values
-    
+    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+                                  + "org.apache.hadoop.io.serializer.WritableSerialization"); 
     conf.set(ANALYZER_CLASS, analyzerClass.getName());
-    conf.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: " + input);
     
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(StringTuple.class);
-    FileInputFormat.setInputPaths(conf, input);
-    FileOutputFormat.setOutputPath(conf, output);
+    Job job = new Job(conf);
+    job.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: " + input);
+    
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(StringTuple.class);
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, output);
     
-    conf.setMapperClass(SequenceFileTokenizerMapper.class);
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setNumReduceTasks(0);
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setMapperClass(SequenceFileTokenizerMapper.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
     HadoopUtil.overwriteOutput(output);
 
-    client.setConf(conf);
-    JobClient.runJob(conf);
+    job.waitForCompletion(true);
   }
 }
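
The tokenization job above stays map-only (setNumReduceTasks(0)). A minimal sketch of that pattern in the new API, with a made-up mapper standing in for SequenceFileTokenizerMapper: with zero reducers the output key/value classes describe what the mapper writes, and records go straight to the OutputFormat with no shuffle.

    import java.io.IOException;
    import java.util.Locale;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    /** Map-only job: every record is transformed once and written directly, no reduce phase. */
    public final class MapOnlyJobSketch {

      /** Illustrative mapper: upper-cases the document body, keeps the key. */
      public static class UpperCaseMapper extends Mapper<Text, Text, Text, Text> {
        private final Text transformed = new Text();

        @Override
        protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
          transformed.set(value.toString().toUpperCase(Locale.ENGLISH));
          context.write(key, transformed);
        }
      }

      public static void run(Path input, Path output)
          throws IOException, InterruptedException, ClassNotFoundException {
        Job job = new Job(new Configuration(), "MapOnlyJobSketch: " + input);
        job.setJarByClass(MapOnlyJobSketch.class);
        job.setMapperClass(UpperCaseMapper.class);
        job.setNumReduceTasks(0);                  // map-only: no shuffle, no sort
        job.setOutputKeyClass(Text.class);         // with zero reducers these describe the map output
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);
        job.waitForCompletion(true);
      }

      private MapOnlyJobSketch() { }
    }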

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java Wed Jul  7 18:46:19 2010
@@ -21,11 +21,7 @@ import java.io.IOException;
 import java.io.StringReader;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.analysis.tokenattributes.TermAttribute;
@@ -36,13 +32,15 @@ import org.apache.mahout.utils.vectors.t
 /**
  * Tokenizes a text document and outputs tokens in a StringTuple
  */
-public class SequenceFileTokenizerMapper extends MapReduceBase implements Mapper<Text,Text,Text,StringTuple> {
-  
+public class SequenceFileTokenizerMapper extends Mapper<Text, Text, Text, StringTuple> {
+
   private Analyzer analyzer;
-  
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapreduce.Mapper.Context)
+   */
   @Override
-  public void map(Text key, Text value,
-                  OutputCollector<Text,StringTuple> output, Reporter reporter) throws IOException {
+  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
     TokenStream stream = analyzer.tokenStream(key.toString(), new StringReader(value.toString()));
     TermAttribute termAtt = stream.addAttribute(TermAttribute.class);
     StringTuple document = new StringTuple();
@@ -51,16 +49,19 @@ public class SequenceFileTokenizerMapper
         document.add(new String(termAtt.termBuffer(), 0, termAtt.termLength()));
       }
     }
-    output.collect(key, document);
+    context.write(key, document);
   }
-  
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
+   */
   @Override
-  public void configure(JobConf job) {
-    super.configure(job);
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
     try {
       ClassLoader ccl = Thread.currentThread().getContextClassLoader();
       Class<?> cl = ccl
-          .loadClass(job.get(DocumentProcessor.ANALYZER_CLASS, DefaultAnalyzer.class.getName()));
+          .loadClass(context.getConfiguration().get(DocumentProcessor.ANALYZER_CLASS, DefaultAnalyzer.class.getName()));
       analyzer = (Analyzer) cl.newInstance();
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
@@ -70,5 +71,4 @@ public class SequenceFileTokenizerMapper
       throw new IllegalStateException(e);
     }
   }
-  
 }
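
The mapper above now extends org.apache.hadoop.mapreduce.Mapper instead of MapReduceBase: configure(JobConf) becomes setup(Context), and output.collect becomes context.write, which can additionally throw InterruptedException. The same shape with a plain whitespace split in place of the Lucene analyzer (the class name and the sketch.min.token.length key are made up for the example):

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.mahout.common.StringTuple;

    /** Emits one StringTuple of tokens per input document, mirroring SequenceFileTokenizerMapper. */
    public class WhitespaceTokenizerMapper extends Mapper<Text, Text, Text, StringTuple> {

      private int minTokenLength = 1;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        // job parameters are reached through the Context's Configuration,
        // not through a JobConf passed to configure()
        minTokenLength = context.getConfiguration().getInt("sketch.min.token.length", minTokenLength);
      }

      @Override
      protected void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {
        StringTuple document = new StringTuple();
        for (String token : value.toString().split("\\s+")) {
          if (token.length() >= minTokenLength) {
            document.add(token);
          }
        }
        context.write(key, document);   // replaces output.collect(key, document)
      }
    }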

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java Wed Jul  7 18:46:19 2010
@@ -21,17 +21,14 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.lucene.analysis.shingle.ShingleFilter;
 import org.apache.lucene.analysis.tokenattributes.TermAttribute;
 import org.apache.mahout.common.StringTuple;
@@ -48,32 +45,32 @@ import org.apache.mahout.utils.vectors.t
 /**
  * Converts a document in to a sparse vector
  */
-public class TFPartialVectorReducer extends MapReduceBase implements
-    Reducer<Text,StringTuple,Text,VectorWritable> {
+public class TFPartialVectorReducer extends Reducer<Text, StringTuple, Text, VectorWritable> {
 
   private final OpenObjectIntHashMap<String> dictionary = new OpenObjectIntHashMap<String>();
 
   private int dimension;
+
   private boolean sequentialAccess;
-  
+
   private int maxNGramSize = 1;
-  
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void reduce(Text key,
-                     Iterator<StringTuple> values,
-                     OutputCollector<Text,VectorWritable> output,
-                     Reporter reporter) throws IOException {
-    if (values.hasNext() == false) {
+  protected void reduce(Text key, Iterable<StringTuple> values, Context context) throws IOException, InterruptedException {
+    Iterator<StringTuple> it = values.iterator();
+    if (it.hasNext() == false) {
       return;
     }
-    StringTuple value = values.next();
-    
+    StringTuple value = it.next();
+
     Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial size
-    
+
     if (maxNGramSize >= 2) {
-      ShingleFilter sf = new ShingleFilter(new IteratorTokenStream(value.getEntries().iterator()),
-          maxNGramSize);
-      
+      ShingleFilter sf = new ShingleFilter(new IteratorTokenStream(value.getEntries().iterator()), maxNGramSize);
+
       do {
         String term = ((TermAttribute) sf.getAttribute(TermAttribute.class)).term();
         if (term.length() > 0) { // ngram
@@ -83,7 +80,7 @@ public class TFPartialVectorReducer exte
           }
         }
       } while (sf.incrementToken());
-      
+
       sf.end();
       sf.close();
     } else {
@@ -102,29 +99,33 @@ public class TFPartialVectorReducer exte
     // if the vector has no nonZero entries (nothing in the dictionary), let's not waste space sending it to disk.
     if (vector.getNumNondefaultElements() > 0) {
       VectorWritable vectorWritable = new VectorWritable(new NamedVector(vector, key.toString()));
-      output.collect(key, vectorWritable);
+      context.write(key, vectorWritable);
     } else {
-      reporter.incrCounter("TFParticalVectorReducer", "emptyVectorCount", 1);
+      context.getCounter("TFParticalVectorReducer", "emptyVectorCount").increment(1);
     }
   }
-  
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void configure(JobConf job) {
-    super.configure(job);
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
     try {
-      dimension = job.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
-      sequentialAccess = job.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
-      maxNGramSize = job.getInt(DictionaryVectorizer.MAX_NGRAMS, maxNGramSize);
-      URI[] localFiles = DistributedCache.getCacheFiles(job);
+      dimension = conf.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
+      sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
+      maxNGramSize = conf.getInt(DictionaryVectorizer.MAX_NGRAMS, maxNGramSize);
+      URI[] localFiles = DistributedCache.getCacheFiles(conf);
       if (localFiles == null || localFiles.length < 1) {
         throw new IllegalArgumentException("missing paths from the DistributedCache");
       }
       Path dictionaryFile = new Path(localFiles[0].getPath());
-      FileSystem fs = dictionaryFile.getFileSystem(job);
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile, job);
+      FileSystem fs = dictionaryFile.getFileSystem(conf);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile, conf);
       Text key = new Text();
       IntWritable value = new IntWritable();
-      
+
       // key is word value is id
       while (reader.next(key, value)) {
         dictionary.put(key.toString(), value.get());
@@ -133,5 +134,5 @@ public class TFPartialVectorReducer exte
       throw new IllegalStateException(e);
     }
   }
-  
+
 }
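
The setup() above shows the new-API way of pulling a side file out of the DistributedCache: the cache URIs are read from context.getConfiguration() rather than from the JobConf handed to configure(). A condensed, self-contained version of that pattern (the class name is illustrative, and a plain HashMap stands in for Mahout's OpenObjectIntHashMap):

    import java.io.IOException;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /** Loads a word-to-id dictionary shipped through the DistributedCache before reducing. */
    public class DictionaryLoadingReducer extends Reducer<Text, Text, Text, IntWritable> {

      private final Map<String, Integer> dictionary = new HashMap<String, Integer>();

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // the driver registered the file with DistributedCache.setCacheFiles(new URI[] {...}, conf)
        URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
        if (cacheFiles == null || cacheFiles.length < 1) {
          throw new IllegalStateException("missing paths from the DistributedCache");
        }
        Path dictionaryFile = new Path(cacheFiles[0].getPath());
        FileSystem fs = dictionaryFile.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile, conf);
        try {
          Text word = new Text();
          IntWritable id = new IntWritable();
          while (reader.next(word, id)) {       // key is the word, value its integer id
            dictionary.put(word.toString(), id.get());
          }
        } finally {
          reader.close();
        }
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
        Integer id = dictionary.get(key.toString());
        if (id != null) {                       // unknown words are simply dropped
          context.write(key, new IntWritable(id));
        }
      }
    }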

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java Wed Jul  7 18:46:19 2010
@@ -21,10 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.common.StringTuple;
 import org.apache.mahout.math.function.ObjectLongProcedure;
 import org.apache.mahout.math.map.OpenObjectLongHashMap;
@@ -32,12 +29,13 @@ import org.apache.mahout.math.map.OpenOb
 /**
  * TextVectorizer Term Count Mapper. Tokenizes a text document and outputs the count of the words
  */
-public class TermCountMapper extends MapReduceBase implements Mapper<Text,StringTuple,Text,LongWritable> {
+public class TermCountMapper extends Mapper<Text, StringTuple, Text, LongWritable> {
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapreduce.Mapper.Context)
+   */
   @Override
-  public void map(Text key,
-                  StringTuple value,
-                  final OutputCollector<Text,LongWritable> output,
-                  final Reporter reporter) throws IOException {
+  protected void map(Text key, StringTuple value, final Context context) throws IOException, InterruptedException {
     OpenObjectLongHashMap<String> wordCount = new OpenObjectLongHashMap<String>();
     for (String word : value.getEntries()) {
       if (wordCount.containsKey(word)) {
@@ -50,10 +48,12 @@ public class TermCountMapper extends Map
       @Override
       public boolean apply(String first, long second) {
         try {
-          output.collect(new Text(first), new LongWritable(second));
+          context.write(new Text(first), new LongWritable(second));
         } catch (IOException e) {
-          reporter.incrCounter("Exception", "Output IO Exception", 1);
-        }
+          context.getCounter("Exception", "Output IO Exception").increment(1);
+        } catch (InterruptedException e) {
+          context.getCounter("Exception", "Interrupted Exception").increment(1);
+       }
         return true;
       }
     });
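
Reporter.incrCounter(group, name, amount) maps onto context.getCounter(group, name).increment(amount), and Context.write declares InterruptedException in addition to IOException, which is why the forEachPair callback above now has to catch both. A tiny sketch of the counter idiom outside a callback (class and counter names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /** Emits (word, 1) pairs and tracks skipped tokens with a named counter. */
    public class CountingMapperSketch extends Mapper<Object, Text, Text, LongWritable> {

      private static final LongWritable ONE = new LongWritable(1);

      @Override
      protected void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
        for (String word : value.toString().split("\\s+")) {
          if (word.isEmpty()) {
            // replaces reporter.incrCounter("CountingMapperSketch", "emptyToken", 1)
            context.getCounter("CountingMapperSketch", "emptyToken").increment(1);
            continue;
          }
          context.write(new Text(word), ONE);   // may throw InterruptedException
        }
      }
    }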

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java Wed Jul  7 18:46:19 2010
@@ -22,38 +22,38 @@ import java.util.Iterator;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.utils.vectors.text.DictionaryVectorizer;
 
 /**
  * Can also be used as a local Combiner. This accumulates all the words and the weights and sums them up.
  */
-public class TermCountReducer extends MapReduceBase implements Reducer<Text,LongWritable,Text,LongWritable> {
-  
+public class TermCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
+
   private int minSupport;
-  
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void reduce(Text key,
-                     Iterator<LongWritable> values,
-                     OutputCollector<Text,LongWritable> output,
-                     Reporter reporter) throws IOException {
+  protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
     long sum = 0;
-    while (values.hasNext()) {
-      sum += values.next().get();
+    Iterator<LongWritable> it = values.iterator();
+    while (it.hasNext()) {
+      sum += it.next().get();
     }
     if (sum >= minSupport) {
-      output.collect(key, new LongWritable(sum));
+      context.write(key, new LongWritable(sum));
     }
   }
-  
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void configure(JobConf job) {
-    super.configure(job);
-    minSupport = job.getInt(DictionaryVectorizer.MIN_SUPPORT,
-      DictionaryVectorizer.DEFAULT_MIN_SUPPORT);
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    minSupport = context.getConfiguration().getInt(DictionaryVectorizer.MIN_SUPPORT, DictionaryVectorizer.DEFAULT_MIN_SUPPORT);
   }
+
 }
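
On the reduce side the migration has the same shape: reduce() now receives an Iterable instead of an Iterator, so a for-each loop is the natural idiom, and the minimum-support threshold is read in setup(Context) from the job Configuration. A compact equivalent sketch (the class name and the sketch.min.support key are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /** Sums per-key counts and drops keys that fall below a configured support threshold. */
    public class ThresholdSumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

      private int minSupport = 2;   // illustrative default, overridden in setup()

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        minSupport = context.getConfiguration().getInt("sketch.min.support", minSupport);
      }

      @Override
      protected void reduce(Text key, Iterable<LongWritable> values, Context context)
          throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {   // the new API hands reducers an Iterable
          sum += value.get();
        }
        if (sum >= minSupport) {
          context.write(key, new LongWritable(sum));
        }
      }
    }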

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java Wed Jul  7 18:46:19 2010
@@ -23,36 +23,33 @@ import java.util.Iterator;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.Vector.Element;
 import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.Vector.Element;
 
 /**
  * TextVectorizer Document Frequency Count Mapper. Outputs 1 for each feature
  * 
  */
-public class TermDocumentCountMapper extends MapReduceBase implements
-    Mapper<WritableComparable<?>,VectorWritable,IntWritable,LongWritable> {
-  
+public class TermDocumentCountMapper extends Mapper<WritableComparable<?>, VectorWritable, IntWritable, LongWritable> {
+
   private static final LongWritable ONE = new LongWritable(1);
+
   private static final IntWritable TOTAL_COUNT = new IntWritable(-1);
-  
-  @Override
-  public void map(WritableComparable<?> key,
-                  VectorWritable value,
-                  OutputCollector<IntWritable,LongWritable> output,
-                  Reporter reporter) throws IOException {
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapreduce.Mapper.Context)
+   */
+  protected void map(WritableComparable<?> key, VectorWritable value, Context context)
+      throws IOException, InterruptedException {
     Vector vector = value.get();
     Iterator<Element> it = vector.iterateNonZero();
-    
+
     while (it.hasNext()) {
       Element e = it.next();
-      output.collect(new IntWritable(e.index()), ONE);
+      context.write(new IntWritable(e.index()), ONE);
     }
-    output.collect(TOTAL_COUNT, ONE);
+    context.write(TOTAL_COUNT, ONE);
   }
 }
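
The -1 sentinel key emitted here (TOTAL_COUNT) lets a single sum-reduce pass produce both the per-feature document frequencies and the total document count, which createDictionaryChunks later separates by checking whether the key is negative. A sketch of the same trick using only plain Hadoop types (the mapper name and the comma-separated input value are invented; the real mapper walks a Mahout Vector's non-zero elements):

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /**
     * Emits (featureIndex, 1) for every feature present in a document plus a
     * sentinel (-1, 1) per document, so one summing reducer yields both the
     * document frequencies and the corpus document count.
     */
    public class DocumentFrequencySketchMapper extends Mapper<Text, Text, IntWritable, LongWritable> {

      private static final LongWritable ONE = new LongWritable(1);
      private static final IntWritable TOTAL_COUNT = new IntWritable(-1);

      @Override
      protected void map(Text docId, Text featureIndexCsv, Context context)
          throws IOException, InterruptedException {
        for (String index : featureIndexCsv.toString().split(",")) {
          String trimmed = index.trim();
          if (!trimmed.isEmpty()) {
            context.write(new IntWritable(Integer.parseInt(trimmed)), ONE);
          }
        }
        context.write(TOTAL_COUNT, ONE);   // exactly one sentinel record per document
      }
    }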

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java Wed Jul  7 18:46:19 2010
@@ -22,26 +22,24 @@ import java.util.Iterator;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 
 /**
  * Can also be used as a local Combiner. This accumulates all the features and the weights and sums them up.
  */
-public class TermDocumentCountReducer extends MapReduceBase implements
-    Reducer<IntWritable,LongWritable,IntWritable,LongWritable> {
-  
+public class TermDocumentCountReducer extends Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void reduce(IntWritable key,
-                     Iterator<LongWritable> values,
-                     OutputCollector<IntWritable,LongWritable> output,
-                     Reporter reporter) throws IOException {
+  protected void reduce(IntWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
     long sum = 0;
-    while (values.hasNext()) {
-      sum += values.next().get();
+    Iterator<LongWritable> it = values.iterator();
+    while (it.hasNext()) {
+      sum += it.next().get();
     }
-    output.collect(key, new LongWritable(sum));
+    context.write(key, new LongWritable(sum));
   }
+
 }

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java Wed Jul  7 18:46:19 2010
@@ -22,7 +22,6 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
@@ -32,13 +31,12 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.math.VectorWritable;
@@ -55,40 +53,40 @@ import org.apache.mahout.utils.vectors.t
  * 
  */
 public final class TFIDFConverter {
-  
+
   public static final String VECTOR_COUNT = "vector.count";
-  
+
   public static final String FEATURE_COUNT = "feature.count";
-  
+
   public static final String MIN_DF = "min.df";
-  
+
   public static final String MAX_DF_PERCENTAGE = "max.df.percentage";
-  
+
   public static final String TFIDF_OUTPUT_FOLDER = "tfidf";
-  
+
   private static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "tfidf-vectors";
-  
+
   private static final String FREQUENCY_FILE = "frequency.file-";
-  
+
   private static final int MAX_CHUNKSIZE = 10000;
-  
+
   private static final int MIN_CHUNKSIZE = 100;
-  
+
   private static final String OUTPUT_FILES_PATTERN = "part-*";
-  
+
   private static final int SEQUENCEFILE_BYTE_OVERHEAD = 45;
-  
+
   private static final String VECTOR_OUTPUT_FOLDER = "partial-vectors-";
-  
+
   private static final String WORDCOUNT_OUTPUT_FOLDER = "df-count";
-  
+
   /**
    * Cannot be initialized. Use the static functions
    */
   private TFIDFConverter() {
 
   }
-  
+
   /**
    * Create Term Frequency-Inverse Document Frequency (Tf-Idf) Vectors from the input set of vectors in
    * {@link SequenceFile} format. This job uses a fixed limit on the maximum memory used by the feature chunk
@@ -115,6 +113,8 @@ public final class TFIDFConverter {
    *          will typically produce a single output file containing tf-idf vectors for a subset of the
    *          documents in the corpus.
    * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
    */
   public static void processTfIdf(Path input,
                                   Path output,
@@ -122,48 +122,57 @@ public final class TFIDFConverter {
                                   int minDf,
                                   int maxDFPercent,
                                   float normPower,
-                                  boolean sequentialAccessOutput, 
-                                  int numReducers) throws IOException {
+                                  boolean sequentialAccessOutput,
+                                  int numReducers) throws IOException, InterruptedException, ClassNotFoundException {
     if (chunkSizeInMegabytes < MIN_CHUNKSIZE) {
       chunkSizeInMegabytes = MIN_CHUNKSIZE;
     } else if (chunkSizeInMegabytes > MAX_CHUNKSIZE) { // 10GB
       chunkSizeInMegabytes = MAX_CHUNKSIZE;
     }
-    
+
     if (normPower != PartialVectorMerger.NO_NORMALIZING && normPower < 0) {
       throw new IllegalArgumentException("normPower must either be -1 or >= 0");
     }
-    
+
     if (minDf < 1) {
       minDf = 1;
     }
     if (maxDFPercent < 0 || maxDFPercent > 100) {
       maxDFPercent = 99;
     }
-    
+
     Path wordCountPath = new Path(output, WORDCOUNT_OUTPUT_FOLDER);
-    
+
     startDFCounting(input, wordCountPath);
-    Pair<Long[],List<Path>> datasetFeatures = createDictionaryChunks(wordCountPath, output,
-      chunkSizeInMegabytes);
-    
+    Pair<Long[], List<Path>> datasetFeatures = createDictionaryChunks(wordCountPath, output, chunkSizeInMegabytes);
+
     int partialVectorIndex = 0;
     List<Path> partialVectorPaths = new ArrayList<Path>();
     List<Path> dictionaryChunks = datasetFeatures.getSecond();
     for (Path dictionaryChunk : dictionaryChunks) {
       Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);
       partialVectorPaths.add(partialVectorOutputPath);
-      makePartialVectors(input, datasetFeatures.getFirst()[0], datasetFeatures.getFirst()[1],
-        minDf, maxDFPercent, dictionaryChunk, partialVectorOutputPath, sequentialAccessOutput);
+      makePartialVectors(input,
+                         datasetFeatures.getFirst()[0],
+                         datasetFeatures.getFirst()[1],
+                         minDf,
+                         maxDFPercent,
+                         dictionaryChunk,
+                         partialVectorOutputPath,
+                         sequentialAccessOutput);
     }
-    
+
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(partialVectorPaths.get(0).toUri(), conf);
-    
+
     Path outputDir = new Path(output, DOCUMENT_VECTOR_OUTPUT_FOLDER);
     if (dictionaryChunks.size() > 1) {
-      PartialVectorMerger.mergePartialVectors(partialVectorPaths, outputDir, normPower,
-        datasetFeatures.getFirst()[0].intValue(), sequentialAccessOutput, numReducers);
+      PartialVectorMerger.mergePartialVectors(partialVectorPaths,
+                                              outputDir,
+                                              normPower,
+                                              datasetFeatures.getFirst()[0].intValue(),
+                                              sequentialAccessOutput,
+                                              numReducers);
       HadoopUtil.deletePaths(partialVectorPaths, fs);
     } else {
       Path singlePartialVectorOutputPath = partialVectorPaths.get(0);
@@ -171,7 +180,7 @@ public final class TFIDFConverter {
       fs.rename(singlePartialVectorOutputPath, outputDir);
     }
   }
-  
+
   /**
    * Read the document frequency List which is built at the end of the DF Count Job. This will use constant
    * memory and will run at the speed of your disk read
@@ -180,25 +189,24 @@ public final class TFIDFConverter {
    * @param dictionaryPathBase
    * @throws IOException
    */
-  private static Pair<Long[],List<Path>> createDictionaryChunks(Path featureCountPath,
-                                                                Path dictionaryPathBase,
-                                                                int chunkSizeInMegabytes) throws IOException {
+  private static Pair<Long[], List<Path>> createDictionaryChunks(Path featureCountPath,
+                                                                 Path dictionaryPathBase,
+                                                                 int chunkSizeInMegabytes) throws IOException {
     List<Path> chunkPaths = new ArrayList<Path>();
-    
+
     IntWritable key = new IntWritable();
     LongWritable value = new LongWritable();
     Configuration conf = new Configuration();
-    
+
     FileSystem fs = FileSystem.get(featureCountPath.toUri(), conf);
     FileStatus[] outputFiles = fs.globStatus(new Path(featureCountPath, OUTPUT_FILES_PATTERN));
-    
+
     long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
     int chunkIndex = 0;
     Path chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);
     chunkPaths.add(chunkPath);
-    SequenceFile.Writer freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class,
-        LongWritable.class);
-    
+    SequenceFile.Writer freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);
+
     long currentChunkSize = 0;
     long featureCount = 0;
     long vectorCount = Long.MAX_VALUE;
@@ -210,14 +218,14 @@ public final class TFIDFConverter {
         if (currentChunkSize > chunkSizeLimit) {
           freqWriter.close();
           chunkIndex++;
-          
+
           chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);
           chunkPaths.add(chunkPath);
-          
+
           freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);
           currentChunkSize = 0;
         }
-        
+
         int fieldSize = SEQUENCEFILE_BYTE_OVERHEAD + Integer.SIZE / 8 + Long.SIZE / 8;
         currentChunkSize += fieldSize;
         if (key.get() >= 0) {
@@ -226,15 +234,15 @@ public final class TFIDFConverter {
           vectorCount = value.get();
         }
         featureCount = Math.max(key.get(), featureCount);
-        
+
       }
     }
     featureCount++;
     freqWriter.close();
-    Long[] counts = {featureCount, vectorCount};
-    return new Pair<Long[],List<Path>>(counts, chunkPaths);
+    Long[] counts = { featureCount, vectorCount };
+    return new Pair<Long[], List<Path>>(counts, chunkPaths);
   }
-  
+
   /**
    * Create a partial tfidf vector using a chunk of features from the input vectors. The input vectors has to
    * be in the {@link SequenceFile} format
@@ -255,6 +263,8 @@ public final class TFIDFConverter {
    * @param output
    *          output directory were the partial vectors have to be created
    * @throws IOException
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
    */
   private static void makePartialVectors(Path input,
                                          Long featureCount,
@@ -263,68 +273,67 @@ public final class TFIDFConverter {
                                          int maxDFPercent,
                                          Path dictionaryFilePath,
                                          Path output,
-                                         boolean sequentialAccess) throws IOException {
-    
-    Configurable client = new JobClient();
-    JobConf conf = new JobConf(TFIDFConverter.class);
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
+                                         boolean sequentialAccess) throws IOException, InterruptedException, ClassNotFoundException {
+
+    Configuration conf = new Configuration();
     // this conf parameter needs to be set enable serialisation of conf values
-    
-    conf.setJobName(": MakePartialVectors: input-folder: " + input + ", dictionary-file: "
-                    + dictionaryFilePath.toString());
+    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+        + "org.apache.hadoop.io.serializer.WritableSerialization");
     conf.setLong(FEATURE_COUNT, featureCount);
     conf.setLong(VECTOR_COUNT, vectorCount);
     conf.setInt(MIN_DF, minDf);
     conf.setInt(MAX_DF_PERCENTAGE, maxDFPercent);
     conf.setBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, sequentialAccess);
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(VectorWritable.class);
-    DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);
-    FileInputFormat.setInputPaths(conf, input);
-    
-    FileOutputFormat.setOutputPath(conf, output);
-    
-    conf.setMapperClass(IdentityMapper.class);
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setReducerClass(TFIDFPartialVectorReducer.class);
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    DistributedCache.setCacheFiles(new URI[] { dictionaryFilePath.toUri() }, conf);
+
+    Job job = new Job(conf);
+    job.setJobName(": MakePartialVectors: input-folder: " + input + ", dictionary-file: " + dictionaryFilePath.toString());
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(VectorWritable.class);
+    FileInputFormat.setInputPaths(job, input);
+
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.setMapperClass(Mapper.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setReducerClass(TFIDFPartialVectorReducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
 
     HadoopUtil.overwriteOutput(output);
-    
-    client.setConf(conf);
-    JobClient.runJob(conf);
+
+    job.waitForCompletion(true);
   }
-  
+
   /**
    * Count the document frequencies of features in parallel using Map/Reduce. The input documents have to be
    * in {@link SequenceFile} format
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
    */
-  private static void startDFCounting(Path input, Path output) throws IOException {
-    
-    Configurable client = new JobClient();
-    JobConf conf = new JobConf(TFIDFConverter.class);
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
+  private static void startDFCounting(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException {
+
+    Configuration conf = new Configuration();
     // this conf parameter needs to be set enable serialisation of conf values
+    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+        + "org.apache.hadoop.io.serializer.WritableSerialization");
     
-    conf.setJobName("VectorTfIdf Document Frequency Count running over input: " + input.toString());
-    conf.setOutputKeyClass(IntWritable.class);
-    conf.setOutputValueClass(LongWritable.class);
-    
-    FileInputFormat.setInputPaths(conf, input);
-    FileOutputFormat.setOutputPath(conf, output);
-    
-    conf.setMapperClass(TermDocumentCountMapper.class);
-    
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setCombinerClass(TermDocumentCountReducer.class);
-    conf.setReducerClass(TermDocumentCountReducer.class);
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
-    
+    Job job = new Job(conf);
+    job.setJobName("VectorTfIdf Document Frequency Count running over input: " + input.toString());
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(LongWritable.class);
+
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.setMapperClass(TermDocumentCountMapper.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setCombinerClass(TermDocumentCountReducer.class);
+    job.setReducerClass(TermDocumentCountReducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
     HadoopUtil.overwriteOutput(output);
 
-    client.setConf(conf);
-    JobClient.runJob(conf);
+    job.waitForCompletion(true);
   }
 }
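
One behavioural difference worth noting with this driver style: JobClient.runJob(conf) threw an IOException when a job failed, while Job.waitForCompletion(true) reports failure through its boolean return value. In a chained pipeline such as processTfIdf, a caller that wants the old fail-fast behaviour has to check that result itself; a small sketch of such a wrapper (class and method names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.Job;

    /** Submits a job and turns a failed completion back into an exception. */
    public final class JobRunnerSketch {

      private JobRunnerSketch() { }

      public static void runOrFail(Job job)
          throws IOException, InterruptedException, ClassNotFoundException {
        if (!job.waitForCompletion(true)) {
          throw new IllegalStateException("Job failed: " + job.getJobName());
        }
      }
    }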

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java Wed Jul  7 18:46:19 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,11 +29,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.math.NamedVector;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.SequentialAccessSparseVector;
@@ -46,30 +43,38 @@ import org.apache.mahout.utils.vectors.c
 /**
  * Converts a document in to a sparse vector
  */
-public class TFIDFPartialVectorReducer extends MapReduceBase implements
-    Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
-  
+public class TFIDFPartialVectorReducer extends
+    Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
   private final OpenIntLongHashMap dictionary = new OpenIntLongHashMap();
+
   private final TFIDF tfidf = new TFIDF();
+
   private int minDf = 1;
+
   private int maxDfPercent = 99;
+
   private long vectorCount = 1;
+
   private long featureCount;
+
   private boolean sequentialAccess;
-  
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void reduce(WritableComparable<?> key,
-                     Iterator<VectorWritable> values,
-                     OutputCollector<WritableComparable<?>,VectorWritable> output,
-                     Reporter reporter) throws IOException {
-    if (!values.hasNext()) {
+  protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context) throws IOException,
+      InterruptedException {
+    Iterator<VectorWritable> it = values.iterator();
+    if (!it.hasNext()) {
       return;
     }
-    Vector value = values.next().get();
-    Iterator<Element> it = value.iterateNonZero();
+    Vector value = it.next().get();
+    Iterator<Element> it1 = value.iterateNonZero();
     Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());
-    while (it.hasNext()) {
-      Element e = it.next();
+    while (it1.hasNext()) {
+      Element e = it1.next();
       if (!dictionary.containsKey(e.index())) {
         continue;
       }
@@ -80,38 +85,40 @@ public class TFIDFPartialVectorReducer e
       if (df < minDf) {
         df = minDf;
       }
-      vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount,
-        (int) vectorCount));
+      vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));
     }
     if (sequentialAccess) {
       vector = new SequentialAccessSparseVector(vector);
     }
     VectorWritable vectorWritable = new VectorWritable(new NamedVector(vector, key.toString()));
-    output.collect(key, vectorWritable);
+    context.write(key, vectorWritable);
   }
-  
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void configure(JobConf job) {
-    super.configure(job);
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
     try {
-      
-      URI[] localFiles = DistributedCache.getCacheFiles(job);
+      Configuration conf = context.getConfiguration();
+      URI[] localFiles = DistributedCache.getCacheFiles(conf);
       if (localFiles == null || localFiles.length < 1) {
         throw new IllegalArgumentException("missing paths from the DistributedCache");
       }
-      
-      vectorCount = job.getLong(TFIDFConverter.VECTOR_COUNT, 1);
-      featureCount = job.getLong(TFIDFConverter.FEATURE_COUNT, 1);
-      minDf = job.getInt(TFIDFConverter.MIN_DF, 1);
-      maxDfPercent = job.getInt(TFIDFConverter.MAX_DF_PERCENTAGE, 99);
-      sequentialAccess = job.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
-      
+
+      vectorCount = conf.getLong(TFIDFConverter.VECTOR_COUNT, 1);
+      featureCount = conf.getLong(TFIDFConverter.FEATURE_COUNT, 1);
+      minDf = conf.getInt(TFIDFConverter.MIN_DF, 1);
+      maxDfPercent = conf.getInt(TFIDFConverter.MAX_DF_PERCENTAGE, 99);
+      sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
+
       Path dictionaryFile = new Path(localFiles[0].getPath());
-      FileSystem fs = dictionaryFile.getFileSystem(job);
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile, job);
+      FileSystem fs = dictionaryFile.getFileSystem(conf);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, dictionaryFile, conf);
       IntWritable key = new IntWritable();
       LongWritable value = new LongWritable();
-      
+
       // key is feature, value is the document frequency
       while (reader.next(key, value)) {
         dictionary.put(key.get(), value.get());
@@ -120,4 +127,5 @@ public class TFIDFPartialVectorReducer e
       throw new IllegalStateException(e);
     }
   }
+
 }
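
For orientation, the per-element weight computed in the reduce() above is a term-frequency/inverse-document-frequency product. The sketch below assumes a Lucene DefaultSimilarity-style scheme (square-root damped term frequency times a smoothed log inverse document frequency); it illustrates the kind of value involved and is an assumption about the tfidf helper, not a transcription of it.

    /** Back-of-the-envelope TF-IDF weight, assuming a DefaultSimilarity-style scheme. */
    public final class TfIdfSketch {

      private TfIdfSketch() { }

      public static double weight(int termFreq, int docFreq, int numDocs) {
        double tf = Math.sqrt(termFreq);                                 // damped term frequency
        double idf = Math.log((double) numDocs / (docFreq + 1)) + 1.0;   // smoothed inverse document frequency
        return tf * idf;
      }

      public static void main(String[] args) {
        // a term occurring 4 times in a document and present in 10 of 1000 documents
        System.out.println(weight(4, 10, 1000));   // 2 * (ln(1000/11) + 1), roughly 11.0
      }
    }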

Modified: mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapperTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapperTest.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapperTest.java (original)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapperTest.java Wed Jul  7 18:46:19 2010
@@ -17,39 +17,32 @@
 
 package org.apache.mahout.utils.nlp.collocations.llr;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.Reader;
 import java.util.Collections;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.util.Version;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.DummyRecordWriter;
 import org.apache.mahout.common.StringTuple;
 import org.apache.mahout.utils.nlp.collocations.llr.Gram.Type;
-import org.easymock.EasyMock;
-import org.junit.Before;
 import org.junit.Test;
 
 /**
  * Test for CollocMapper 
  */
-@SuppressWarnings("deprecation")
 public class CollocMapperTest {
   
   private OutputCollector<GramKey,Gram> collector;
-  private Reporter reporter;
-
-  @Before
-  @SuppressWarnings("unchecked")
-  public void setUp() {
-    collector = EasyMock.createMock(OutputCollector.class);
-    reporter = EasyMock.createMock(Reporter.class);
-  }
-  
   @Test
   public void testCollectNgrams() throws Exception {
     
@@ -88,22 +81,23 @@ public class CollocMapperTest {
       
       GramKey subgramKey = new GramKey(subgram, new byte[0]);
       GramKey subgramNgramKey = new GramKey(subgram, ngram.getBytes());
+      collector = new DummyOutputCollector<GramKey, Gram>();
       collector.collect(subgramKey, subgram);
       collector.collect(subgramNgramKey, ngram);
     }
     
-    reporter.incrCounter(CollocMapper.Count.NGRAM_TOTAL, 7);
-    EasyMock.replay(reporter, collector);
-    
-    JobConf conf = new JobConf();
+    Configuration conf = new Configuration();
     conf.set(CollocMapper.MAX_SHINGLE_SIZE, "2");
     
-    CollocMapper c = new CollocMapper();
-    c.configure(conf);
+    CollocMapper mapper = new CollocMapper();
+    DummyRecordWriter<GramKey, Gram> writer = new DummyRecordWriter<GramKey, Gram>();
+    Mapper<Text, StringTuple, GramKey, Gram>.Context context =  DummyRecordWriter.build(mapper, conf, writer);
+    mapper.setup(context);
     
-    c.map(key, inputTuple, collector, reporter);
+    mapper.map(key, inputTuple, context);
     
-    EasyMock.verify(reporter, collector);
+      Counter counter = context.getCounter(CollocMapper.Count.NGRAM_TOTAL);
+    assertEquals("counter", 7, counter.getValue());
   }
   
   @Test
@@ -145,7 +139,7 @@ public class CollocMapperTest {
         frequency = 2;
       }
       
-      
+      collector = new DummyOutputCollector<GramKey, Gram>();
      
       if (p == Gram.Type.UNIGRAM) {
         Gram unigram = new Gram(v[1], frequency, Gram.Type.UNIGRAM);
@@ -163,19 +157,19 @@ public class CollocMapperTest {
       }
     }
     
-    reporter.incrCounter(CollocMapper.Count.NGRAM_TOTAL, 7);
-    EasyMock.replay(reporter, collector);
-    
-    JobConf conf = new JobConf();
+    Configuration conf = new Configuration();
     conf.set(CollocMapper.MAX_SHINGLE_SIZE, "2");
     conf.setBoolean(CollocDriver.EMIT_UNIGRAMS, true);
     
-    CollocMapper c = new CollocMapper();
-    c.configure(conf);
+    CollocMapper mapper = new CollocMapper();
+    DummyRecordWriter<GramKey, Gram> writer = new DummyRecordWriter<GramKey, Gram>();
+    Mapper<Text, StringTuple, GramKey, Gram>.Context context = DummyRecordWriter.build(mapper, conf, writer);
+    mapper.setup(context);
     
-    c.map(key, inputTuple, collector, reporter);
+    mapper.map(key, inputTuple, context);
     
-    EasyMock.verify(reporter, collector);
+    Counter counter = context.getCounter(CollocMapper.Count.NGRAM_TOTAL);
+    assertEquals("counter", 7, counter.getValue());
   }
   
   /** A lucene 2.9 standard analyzer with no stopwords. */
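
With EasyMock gone, the test now drives CollocMapper through a real Mapper.Context built by
Mahout's DummyRecordWriter helper, so emitted pairs and counters can be asserted directly.
Condensed from the test above (key and inputTuple are the inputs the test prepares earlier):

    Configuration conf = new Configuration();
    conf.set(CollocMapper.MAX_SHINGLE_SIZE, "2");

    CollocMapper mapper = new CollocMapper();
    DummyRecordWriter<GramKey, Gram> writer = new DummyRecordWriter<GramKey, Gram>();
    Mapper<Text, StringTuple, GramKey, Gram>.Context context =
        DummyRecordWriter.build(mapper, conf, writer);

    mapper.setup(context);
    mapper.map(key, inputTuple, context);

    // Output pairs are captured by the writer; counters are read back from the context.
    assertEquals("counter", 7, context.getCounter(CollocMapper.Count.NGRAM_TOTAL).getValue());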

Modified: mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducerTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducerTest.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducerTest.java (original)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducerTest.java Wed Jul  7 18:46:19 2010
@@ -18,74 +18,74 @@
 package org.apache.mahout.utils.nlp.collocations.llr;
 
 import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.HEAD;
+import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.NGRAM;
 import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.TAIL;
 import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.UNIGRAM;
-import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.NGRAM;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.easymock.EasyMock;
-import org.junit.Before;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.DummyRecordWriter;
 import org.junit.Test;
 
 /**
  * Test the CollocReducer
  */
 public class CollocReducerTest {
-  
-  private OutputCollector<Gram,Gram> output;
-  private Reporter reporter;
-  
-  @Before
-  @SuppressWarnings("unchecked")
-  public void setUp() {
-    output = EasyMock.createMock(OutputCollector.class);
-    reporter = EasyMock.createMock(Reporter.class);
-  }
-  
+
   @Test
   public void testReduce() throws Exception {
     // test input, input[*][0] is the key,
     // input[*][1..n] are the values passed in via
     // the iterator.
-    Gram[][] input = {
-        {new Gram("the", UNIGRAM), new Gram("the", UNIGRAM), new Gram("the", UNIGRAM)},
-        {new Gram("the", HEAD), new Gram("the best", NGRAM), new Gram("the worst", NGRAM)},
-        {new Gram("of", HEAD), new Gram("of times", NGRAM), new Gram("of times", NGRAM)},
-        {new Gram("times", TAIL), new Gram("of times", NGRAM), new Gram("of times", NGRAM)}};
-    
+    Gram[][] input = { { new Gram("the", UNIGRAM), new Gram("the", UNIGRAM), new Gram("the", UNIGRAM) },
+        { new Gram("the", HEAD), new Gram("the best", NGRAM), new Gram("the worst", NGRAM) },
+        { new Gram("of", HEAD), new Gram("of times", NGRAM), new Gram("of times", NGRAM) },
+        { new Gram("times", TAIL), new Gram("of times", NGRAM), new Gram("of times", NGRAM) } };
+
     // expected results.
-    Gram[][] values = {{new Gram("the", 2, UNIGRAM), new Gram("the", 2, UNIGRAM)},
-                                    {new Gram("the best", 1, NGRAM), new Gram("the", 2, HEAD)},
-                                    {new Gram("the worst", 1, NGRAM), new Gram("the", 2, HEAD)},
-                                    {new Gram("of times", 2, NGRAM), new Gram("of", 2, HEAD)},
-                                    {new Gram("of times", 2, NGRAM), new Gram("times", 2, TAIL)}};
-
-    // set up expectations
-    for (Gram[] v : values) {
-      output.collect(v[0], v[1]);
+    Gram[][] values = { { new Gram("the", 2, UNIGRAM), new Gram("the", 2, UNIGRAM) },
+        { new Gram("the best", 1, NGRAM), new Gram("the", 2, HEAD) },
+        { new Gram("the worst", 1, NGRAM), new Gram("the", 2, HEAD) }, 
+        { new Gram("of times", 2, NGRAM), new Gram("of", 2, HEAD) },
+        { new Gram("of times", 2, NGRAM), new Gram("times", 2, TAIL) } };
+
+    Map<Gram, List<Gram>> reference = new HashMap<Gram, List<Gram>>();
+    for (Gram[] grams : values) {
+      List<Gram> list = reference.get(grams[0]);
+      if (list == null) {
+        list = new ArrayList<Gram>();
+        reference.put(grams[0], list);
+      }
+      for (int j = 1; j < grams.length; j++)
+        list.add(grams[j]);
     }
-    EasyMock.replay(reporter, output);
-    
-    // play back the input data.
-    CollocReducer c = new CollocReducer();
-    
+
+    // reduce the input data.
+    Configuration conf = new Configuration();
+    CollocReducer reducer = new CollocReducer();
+    DummyRecordWriter<Gram, Gram> writer = new DummyRecordWriter<Gram, Gram>();
+    Reducer<GramKey, Gram, Gram, Gram>.Context context = DummyRecordWriter.build(reducer, conf, writer, GramKey.class, Gram.class);
+
     GramKey key = new GramKey();
 
     byte[] empty = new byte[0];
     for (Gram[] ii : input) {
       key.set(ii[0], empty);
-
       List<Gram> vv = new LinkedList<Gram>();
       vv.addAll(Arrays.asList(ii));
-      c.reduce(key, vv.iterator(), output, reporter);
+      reducer.reduce(key, vv, context);
     }
-    
-    EasyMock.verify(reporter, output);
+    assertTrue(writer.getKeys().size() == reference.keySet().size());
+    for (Gram gram : reference.keySet())
+      assertEquals("Gram " + gram, reference.get(gram).size(), writer.getValue(gram).size());
   }
-  
 }
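
The reducer test follows the same pattern; note that the new-API reduce() takes an Iterable of
values instead of an Iterator plus OutputCollector/Reporter, and that DummyRecordWriter.build
needs the reducer's input key and value classes to construct the context. Roughly, with key,
values, gram and expectedSize standing in for the test data above:

    Configuration conf = new Configuration();
    CollocReducer reducer = new CollocReducer();
    DummyRecordWriter<Gram, Gram> writer = new DummyRecordWriter<Gram, Gram>();
    Reducer<GramKey, Gram, Gram, Gram>.Context context =
        DummyRecordWriter.build(reducer, conf, writer, GramKey.class, Gram.class);

    // reduce() consumes an Iterable<Gram> directly.
    reducer.reduce(key, values, context);

    // Everything the reducer emitted can be asserted through the writer afterwards.
    assertEquals(expectedSize, writer.getValue(gram).size());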

Modified: mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducerTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducerTest.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducerTest.java (original)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducerTest.java Wed Jul  7 18:46:19 2010
@@ -18,18 +18,18 @@
 package org.apache.mahout.utils.nlp.collocations.llr;
 
 import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.HEAD;
-import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.TAIL;
 import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.NGRAM;
+import static org.apache.mahout.utils.nlp.collocations.llr.Gram.Type.TAIL;
 
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.DummyRecordWriter;
 import org.apache.mahout.math.stats.LogLikelihood;
 import org.apache.mahout.utils.nlp.collocations.llr.LLRReducer.LLCallback;
 import org.easymock.EasyMock;
@@ -41,29 +41,16 @@ import org.slf4j.LoggerFactory;
 /** Test the LLRReducer
  *  FIXME: Add negative test cases.
  */
-@SuppressWarnings("deprecation")
 public class LLRReducerTest {
   
   private static final Logger log =
     LoggerFactory.getLogger(LLRReducerTest.class);
   
-  private Reporter reporter;
   private LLCallback ll;
   private LLCallback cl;
 
-  // not verifying the llr algo output here, just the input, but it is handy
-  // to see the values emitted.
-  private final OutputCollector<Text,DoubleWritable> collector = new OutputCollector<Text,DoubleWritable>() {
-    @Override
-    public void collect(Text key, DoubleWritable value) {
-      log.info("{} {}", key, value);
-    }
-  };
-  
-  
   @Before
   public void setUp() {
-    reporter  = EasyMock.createMock(Reporter.class);
     ll        = EasyMock.createMock(LLCallback.class);
     cl        = new LLCallback() {
       @Override
@@ -76,7 +63,6 @@ public class LLRReducerTest {
   
   @Test
   public void testReduce() throws Exception {
-    LLRReducer reducer = new LLRReducer(ll);
     
     // test input, input[*][0] is the key,
     // input[*][1..n] are the values passed in via
@@ -108,14 +94,21 @@ public class LLRReducerTest {
     
     EasyMock.replay(ll);
     
-    JobConf config = new JobConf(CollocDriver.class);
-    config.set(LLRReducer.NGRAM_TOTAL, "7");
-    reducer.configure(config);
+    Configuration conf = new Configuration();
+    conf.set(LLRReducer.NGRAM_TOTAL, "7");
+    LLRReducer reducer = new LLRReducer(ll);
+    DummyRecordWriter<Text, DoubleWritable> writer = new DummyRecordWriter<Text, DoubleWritable>();
+    Reducer<Gram, Gram, Text, DoubleWritable>.Context context = DummyRecordWriter.build(reducer,
+                                                                                          conf,
+                                                                                          writer,
+                                                                                          Gram.class,
+                                                                                          Gram.class);
+    reducer.setup(context);
     
     for (Gram[] ii: input) {
       List<Gram> vv = new LinkedList<Gram>();
       vv.addAll(Arrays.asList(ii).subList(1, ii.length));
-      reducer.reduce(ii[0], vv.iterator(), collector, reporter);
+      reducer.reduce(ii[0], vv, context);
     }
     
     EasyMock.verify(ll);
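
Here EasyMock is kept only for the LLCallback collaborator; job parameters such as NGRAM_TOTAL
travel through the Configuration and are picked up in setup(), so the order is: set the conf,
build the context, call setup(), then feed the groups. Condensed from the test above (ll and
input are the mock and test data it defines):

    Configuration conf = new Configuration();
    conf.set(LLRReducer.NGRAM_TOTAL, "7");

    LLRReducer reducer = new LLRReducer(ll);
    DummyRecordWriter<Text, DoubleWritable> writer = new DummyRecordWriter<Text, DoubleWritable>();
    Reducer<Gram, Gram, Text, DoubleWritable>.Context context =
        DummyRecordWriter.build(reducer, conf, writer, Gram.class, Gram.class);
    reducer.setup(context);  // reads NGRAM_TOTAL from the Configuration

    for (Gram[] ii : input) {
      reducer.reduce(ii[0], Arrays.asList(ii).subList(1, ii.length), context);
    }
    EasyMock.verify(ll);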

Modified: mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java (original)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java Wed Jul  7 18:46:19 2010
@@ -18,7 +18,6 @@
 package org.apache.mahout.utils.vectors.text;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -36,44 +35,40 @@ import org.apache.mahout.utils.vectors.t
  * Test the dictionary Vector
  */
 public class DictionaryVectorizerTest extends MahoutTestCase {
-  
+
   public static final int AVG_DOCUMENT_LENGTH = 20;
-  
+
   public static final int AVG_SENTENCE_LENGTH = 8;
-  
+
   public static final int AVG_WORD_LENGTH = 6;
-  
+
   public static final int NUM_DOCS = 100;
-  
+
   public static final String CHARSET = "abcdef";
-  
+
   public static final String DELIM = " .,?;:!\t\n\r";
-  
-  public static final String ERRORSET = "`1234567890"
-    + "-=~@#$%^&*()_+[]{}'\"/<>|\\";
-  
+
+  public static final String ERRORSET = "`1234567890" + "-=~@#$%^&*()_+[]{}'\"/<>|\\";
+
   private static final Random random = RandomUtils.getRandom();
-  
+
   private FileSystem fs;
-  
+
   private static char getRandomDelimiter() {
     return DELIM.charAt(random.nextInt(DictionaryVectorizerTest.DELIM.length()));
   }
-  
+
   public static String getRandomDocument() {
-    int length = (AVG_DOCUMENT_LENGTH >> 1)
-    + DictionaryVectorizerTest.random.nextInt(AVG_DOCUMENT_LENGTH);
-    StringBuilder sb = new StringBuilder(length * AVG_SENTENCE_LENGTH
-      * AVG_WORD_LENGTH);
+    int length = (AVG_DOCUMENT_LENGTH >> 1) + DictionaryVectorizerTest.random.nextInt(AVG_DOCUMENT_LENGTH);
+    StringBuilder sb = new StringBuilder(length * AVG_SENTENCE_LENGTH * AVG_WORD_LENGTH);
     for (int i = 0; i < length; i++) {
       sb.append(getRandomSentence());
     }
     return sb.toString();
   }
-  
+
   public static String getRandomSentence() {
-    int length = (AVG_SENTENCE_LENGTH >> 1)
-    + DictionaryVectorizerTest.random.nextInt(AVG_SENTENCE_LENGTH);
+    int length = (AVG_SENTENCE_LENGTH >> 1) + DictionaryVectorizerTest.random.nextInt(AVG_SENTENCE_LENGTH);
     StringBuilder sb = new StringBuilder(length * AVG_WORD_LENGTH);
     for (int i = 0; i < length; i++) {
       sb.append(getRandomString()).append(' ');
@@ -81,45 +76,56 @@ public class DictionaryVectorizerTest ex
     sb.append(getRandomDelimiter());
     return sb.toString();
   }
-  
+
   public static String getRandomString() {
     int length = (AVG_WORD_LENGTH >> 1) + DictionaryVectorizerTest.random.nextInt(AVG_WORD_LENGTH);
     StringBuilder sb = new StringBuilder(length);
     for (int i = 0; i < length; i++) {
-      sb.append(DictionaryVectorizerTest.CHARSET.charAt(DictionaryVectorizerTest.random.nextInt(DictionaryVectorizerTest.CHARSET.length())));
+      sb.append(DictionaryVectorizerTest.CHARSET.charAt(DictionaryVectorizerTest.random.nextInt(DictionaryVectorizerTest.CHARSET
+          .length())));
     }
     if (DictionaryVectorizerTest.random.nextInt(10) == 0) {
-      sb.append(DictionaryVectorizerTest.ERRORSET.charAt(DictionaryVectorizerTest.random
-        .nextInt(DictionaryVectorizerTest.ERRORSET.length())));
+      sb.append(DictionaryVectorizerTest.ERRORSET.charAt(DictionaryVectorizerTest.random.nextInt(DictionaryVectorizerTest.ERRORSET
+          .length())));
     }
     return sb.toString();
   }
-  
+
   @Override
   public void setUp() throws Exception {
     super.setUp();
     Configuration conf = new Configuration();
     fs = FileSystem.get(conf);
   }
-  
-  public void testCreateTermFrequencyVectors() throws IOException {
+
+  public void testCreateTermFrequencyVectors() throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = new Configuration();
     Path path = getTestTempFilePath("documents/docs.file");
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
-      Text.class, Text.class);
-    
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Text.class);
+
     for (int i = 0; i < NUM_DOCS; i++) {
-      writer.append(new Text("Document::ID::" + i), new Text(
-        getRandomDocument()));
+      writer.append(new Text("Document::ID::" + i), new Text(getRandomDocument()));
     }
     writer.close();
     Class<? extends Analyzer> analyzer = DefaultAnalyzer.class;
-    DocumentProcessor.tokenizeDocuments(path, analyzer,
-    getTestTempDirPath("output/tokenized-documents"));
+    DocumentProcessor.tokenizeDocuments(path, analyzer, getTestTempDirPath("output/tokenized-documents"));
     DictionaryVectorizer.createTermFrequencyVectors(getTestTempDirPath("output/tokenized-documents"),
-      getTestTempDirPath("output/wordcount"), conf, 2, 1, 0.0f, 1, 100, false);
-    TFIDFConverter.processTfIdf(getTestTempDirPath("output/wordcount/vectors"),
-                                getTestTempDirPath("output/tfidf"), 100, 1, 99, 1.0f, false, 1);
-    
+                                                    getTestTempDirPath("output/wordcount"),
+                                                    conf,
+                                                    2,
+                                                    1,
+                                                    0.0f,
+                                                    1,
+                                                    100,
+                                                    false);
+    TFIDFConverter.processTfIdf(getTestTempDirPath("output/wordcount/tf-vectors"),
+                                getTestTempDirPath("output/tfidf"),
+                                100,
+                                1,
+                                99,
+                                1.0f,
+                                false,
+                                1);
+
   }
 }
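
createTermFrequencyVectors and processTfIdf now submit new-API Jobs, which is why
InterruptedException and ClassNotFoundException propagate up to the test signature; note also
that the TF vectors are now read from the tf-vectors subdirectory. Below is a sketch of the kind
of job submission that surfaces those exceptions; the class, job name, paths and the reliance on
the default identity map/reduce are placeholders, not Mahout code:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    // Hypothetical driver: Job.waitForCompletion() is what introduces the
    // InterruptedException and ClassNotFoundException seen in the new throws clauses.
    public final class ExampleVectorizerStep {

      private ExampleVectorizerStep() { }

      public static void runJob(Path input, Path output)
          throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "example vectorizer step");
        job.setJarByClass(ExampleVectorizerStep.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);
        if (!job.waitForCompletion(true)) {
          throw new IllegalStateException("Job failed: " + job.getJobName());
        }
      }
    }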