You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by je...@apache.org on 2010/07/07 20:46:20 UTC

svn commit: r961476 [1/2] - in /mahout/trunk: core/src/test/java/org/apache/mahout/common/ utils/src/main/java/org/apache/mahout/clustering/cdbw/ utils/src/main/java/org/apache/mahout/utils/ utils/src/main/java/org/apache/mahout/utils/clustering/ utils...

Author: jeastman
Date: Wed Jul  7 18:46:19 2010
New Revision: 961476

URL: http://svn.apache.org/viewvc?rev=961476&view=rev
Log:
MAHOUT-167: ported utils/ to Hadoop 0.20.2. All tests pass but are not exhaustive

Added:
    mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyCounter.java
Modified:
    mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyStatusReporter.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocCombiner.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocDriver.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapper.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducer.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKeyPartitioner.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducer.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java
    mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapperTest.java
    mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducerTest.java
    mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducerTest.java
    mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java

Added: mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyCounter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyCounter.java?rev=961476&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyCounter.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyCounter.java Wed Jul  7 18:46:19 2010
@@ -0,0 +1,11 @@
+package org.apache.mahout.common;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+class DummyCounter extends Counter {
+
+  public DummyCounter() {
+    super();
+  }
+
+}

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyStatusReporter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyStatusReporter.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyStatusReporter.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyStatusReporter.java Wed Jul  7 18:46:19 2010
@@ -1,19 +1,29 @@
 package org.apache.mahout.common;
 
-import org.apache.commons.lang.NotImplementedException;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.StatusReporter;
 
 public class DummyStatusReporter extends StatusReporter {
 
+  Map<Enum<?>, Counter> counters = new HashMap<Enum<?>, Counter>();
+
   @Override
   public Counter getCounter(Enum<?> name) {
-    throw new NotImplementedException();
+    if (!counters.containsKey(name))
+      counters.put(name, new DummyCounter());
+    return counters.get(name);
   }
 
+  Map<String, Counter> counterGroups = new HashMap<String, Counter>();
+
   @Override
   public Counter getCounter(String group, String name) {
-    throw new NotImplementedException();
+    if (!counterGroups.containsKey(group + name))
+      counterGroups.put(group + name, new DummyCounter());
+    return counterGroups.get(group + name); // look up with the same composite key used for the put above
   }
 
   @Override

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java Wed Jul  7 18:46:19 2010
@@ -18,7 +18,6 @@
 package org.apache.mahout.clustering.cdbw;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 
 import org.apache.commons.cli2.CommandLine;
 import org.apache.commons.cli2.Group;
@@ -28,23 +27,21 @@ import org.apache.commons.cli2.builder.A
 import org.apache.commons.cli2.builder.DefaultOptionBuilder;
 import org.apache.commons.cli2.builder.GroupBuilder;
 import org.apache.commons.cli2.commandline.Parser;
-import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.WeightedVectorWritable;
 import org.apache.mahout.clustering.dirichlet.DirichletCluster;
-import org.apache.mahout.clustering.kmeans.KMeansDriver;
 import org.apache.mahout.common.CommandLineUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.apache.mahout.math.VectorWritable;
@@ -74,17 +71,15 @@ public final class CDbwDriver {
     Option maxIterOpt = DefaultOptionCreator.maxIterationsOption().create();
     Option helpOpt = DefaultOptionCreator.helpOption();
 
-    Option modelOpt = obuilder.withLongName("modelClass").withRequired(true).withShortName("d").withArgument(
-        abuilder.withName("modelClass").withMinimum(1).withMaximum(1).create()).withDescription(
-        "The ModelDistribution class name. "
+    Option modelOpt = obuilder.withLongName("modelClass").withRequired(true).withShortName("d").withArgument(abuilder
+        .withName("modelClass").withMinimum(1).withMaximum(1).create()).withDescription("The ModelDistribution class name. "
         + "Defaults to org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution").create();
 
-    Option numRedOpt = obuilder.withLongName("maxRed").withRequired(true).withShortName("r").withArgument(
-        abuilder.withName("maxRed").withMinimum(1).withMaximum(1).create())
-        .withDescription("The number of reduce tasks.").create();
+    Option numRedOpt = obuilder.withLongName("maxRed").withRequired(true).withShortName("r").withArgument(abuilder
+        .withName("maxRed").withMinimum(1).withMaximum(1).create()).withDescription("The number of reduce tasks.").create();
 
-    Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(outputOpt)
-        .withOption(modelOpt).withOption(maxIterOpt).withOption(helpOpt).withOption(numRedOpt).create();
+    Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(outputOpt).withOption(modelOpt)
+        .withOption(maxIterOpt).withOption(helpOpt).withOption(numRedOpt).create();
 
     try {
       Parser parser = new Parser();
@@ -125,16 +120,15 @@ public final class CDbwDriver {
    *          the number of iterations
    * @param numReducers
    *          the number of Reducers desired
+   * @throws InterruptedException if the calling thread is interrupted while a job is running
    */
   public static void runJob(Path clustersIn,
                             Path clusteredPointsIn,
                             Path output,
                             String distanceMeasureClass,
                             int numIterations,
-                            int numReducers) throws ClassNotFoundException,
-                                                    InstantiationException,
-                                                    IllegalAccessException,
-                                                    IOException {
+                            int numReducers) throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+      IOException, InterruptedException {
 
     Path stateIn = new Path(output, "representativePoints-0");
     writeInitialState(stateIn, clustersIn);
@@ -148,30 +142,29 @@ public final class CDbwDriver {
       stateIn = stateOut;
     }
 
-    Configurable client = new JobClient();
-    JobConf conf = new JobConf(CDbwDriver.class);
+    Configuration conf = new Configuration();
     conf.set(STATE_IN_KEY, stateIn.toString());
     conf.set(DISTANCE_MEASURE_KEY, distanceMeasureClass);
     CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn);
-    //System.out.println("CDbw = " + evaluator.CDbw());
+    // now print out the CDbw
+    System.out.println("CDbw = " + evaluator.CDbw());
   }
 
-  private static void writeInitialState(Path output, Path clustersIn)
-      throws InstantiationException, IllegalAccessException, IOException, SecurityException {
-
-    JobConf job = new JobConf(KMeansDriver.class);
-    FileSystem fs = FileSystem.get(output.toUri(), job);
+  private static void writeInitialState(Path output, Path clustersIn) throws InstantiationException, IllegalAccessException,
+      IOException, SecurityException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(output.toUri(), conf);
     for (FileStatus part : fs.listStatus(clustersIn)) {
       if (!part.getPath().getName().startsWith(".")) {
         Path inPart = part.getPath();
-        SequenceFile.Reader reader = new SequenceFile.Reader(fs, inPart, job);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, inPart, conf);
         Writable key = (Writable) reader.getKeyClass().newInstance();
         Writable value = (Writable) reader.getValueClass().newInstance();
         Path path = new Path(output, inPart.getName());
-        SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, IntWritable.class, VectorWritable.class);
+        SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
         while (reader.next(key, value)) {
           Cluster cluster = (Cluster) value;
-          if (!(cluster instanceof DirichletCluster) || ((DirichletCluster) cluster).getTotalCount() > 0) {
+          if (!(cluster instanceof DirichletCluster<?>) || ((DirichletCluster<?>) cluster).getTotalCount() > 0) {
             //System.out.println("C-" + cluster.getId() + ": " + ClusterBase.formatVector(cluster.getCenter(), null));
             writer.append(new IntWritable(cluster.getId()), new VectorWritable(cluster.getCenter()));
           }
@@ -194,33 +187,31 @@ public final class CDbwDriver {
    *          the class name of the DistanceMeasure class
    * @param numReducers
    *          the number of Reducers desired
+   * @throws IOException if the job cannot be created, submitted or monitored
+   * @throws ClassNotFoundException if a class required by the job cannot be found
+   * @throws InterruptedException if the calling thread is interrupted while waiting for the job
    */
-  public static void runIteration(Path input, Path stateIn, Path stateOut,
-                                  String distanceMeasureClass, int numReducers) {
-    Configurable client = new JobClient();
-    JobConf conf = new JobConf(CDbwDriver.class);
-
-    conf.setOutputKeyClass(IntWritable.class);
-    conf.setOutputValueClass(VectorWritable.class);
-    conf.setMapOutputKeyClass(IntWritable.class);
-    conf.setMapOutputValueClass(WeightedVectorWritable.class);
-
-    FileInputFormat.setInputPaths(conf, input);
-    FileOutputFormat.setOutputPath(conf, stateOut);
-
-    conf.setMapperClass(CDbwMapper.class);
-    conf.setReducerClass(CDbwReducer.class);
-    conf.setNumReduceTasks(numReducers);
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
+  public static void runIteration(Path input, Path stateIn, Path stateOut, String distanceMeasureClass, int numReducers) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
     conf.set(STATE_IN_KEY, stateIn.toString());
     conf.set(DISTANCE_MEASURE_KEY, distanceMeasureClass);
+    Job job = new Job(conf);
+    // new Job(conf) does not record a job jar the way the old JobConf(CDbwDriver.class) did
+    job.setJarByClass(CDbwDriver.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(WeightedVectorWritable.class);
+
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, stateOut);
+
+    job.setMapperClass(CDbwMapper.class);
+    job.setReducerClass(CDbwReducer.class);
+    job.setNumReduceTasks(numReducers);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
 
-    client.setConf(conf);
-    try {
-      JobClient.runJob(conf);
-    } catch (IOException e) {
-      log.warn(e.toString(), e);
-    }
+    job.waitForCompletion(true);
   }
 }

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java Wed Jul  7 18:46:19 2010
@@ -22,12 +22,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.common.distance.DistanceMeasure;
 import org.apache.mahout.math.Vector;
@@ -68,18 +68,18 @@ public class CDbwEvaluator {
   /**
    * Initialize a new instance from job information
    * 
-   * @param job
+   * @param conf
    *            a JobConf with appropriate parameters
    * @param clustersIn
    *            a String path to the input clusters directory
    */
-  public CDbwEvaluator(JobConf job, Path clustersIn)
+  public CDbwEvaluator(Configuration conf, Path clustersIn)
       throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
     ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-    Class<?> cl = ccl.loadClass(job.get(CDbwDriver.DISTANCE_MEASURE_KEY));
+    Class<?> cl = ccl.loadClass(conf.get(CDbwDriver.DISTANCE_MEASURE_KEY));
     measure = (DistanceMeasure) cl.newInstance();
-    representativePoints = CDbwMapper.getRepresentativePoints(job);
-    clusters = loadClusters(job, clustersIn);
+    representativePoints = CDbwMapper.getRepresentativePoints(conf);
+    clusters = loadClusters(conf, clustersIn);
     for (Integer cId : representativePoints.keySet()) {
       setStDev(cId);
     }
@@ -96,14 +96,14 @@ public class CDbwEvaluator {
    *            a String pathname to the directory containing input cluster files
    * @return a List<Cluster> of the clusters
    */
-  private static Map<Integer, Cluster> loadClusters(JobConf job, Path clustersIn)
+  private static Map<Integer, Cluster> loadClusters(Configuration conf, Path clustersIn)
       throws InstantiationException, IllegalAccessException, IOException {
     Map<Integer, Cluster> clusters = new HashMap<Integer, Cluster>();
-    FileSystem fs = clustersIn.getFileSystem(job);
+    FileSystem fs = clustersIn.getFileSystem(conf);
     for (FileStatus part : fs.listStatus(clustersIn)) {
       if (!part.getPath().getName().startsWith(".")) {
         Path inPart = part.getPath();
-        SequenceFile.Reader reader = new SequenceFile.Reader(fs, inPart, job);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, inPart, conf);
         Writable key = (Writable) reader.getKeyClass().newInstance();
         Writable value = (Writable) reader.getValueClass().newInstance();
         while (reader.next(key, value)) {

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java Wed Jul  7 18:46:19 2010
@@ -23,24 +23,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.OutputLogFilter;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.clustering.WeightedVectorWritable;
 import org.apache.mahout.common.distance.DistanceMeasure;
 import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
 import org.apache.mahout.math.VectorWritable;
 
-public class CDbwMapper extends MapReduceBase implements
-    Mapper<IntWritable, WeightedVectorWritable, IntWritable, WeightedVectorWritable> {
+public class CDbwMapper extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, WeightedVectorWritable> {
 
   private Map<Integer, List<VectorWritable>> representativePoints;
 
@@ -48,16 +44,22 @@ public class CDbwMapper extends MapReduc
 
   private DistanceMeasure measure = new EuclideanDistanceMeasure();
 
-  private OutputCollector<IntWritable, WeightedVectorWritable> output;
-
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Mapper#cleanup(org.apache.hadoop.mapreduce.Mapper.Context)
+   */
   @Override
-  public void map(IntWritable clusterId,
-                  WeightedVectorWritable point,
-                  OutputCollector<IntWritable, WeightedVectorWritable> output,
-                  Reporter reporter) throws IOException {
-
-    this.output = output;
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    for (Integer clusterId : mostDistantPoints.keySet()) {
+      context.write(new IntWritable(clusterId), mostDistantPoints.get(clusterId));
+    }
+    super.cleanup(context);
+  }
 
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapreduce.Mapper.Context)
+   */
+  @Override
+  protected void map(IntWritable clusterId, WeightedVectorWritable point, Context context) throws IOException, InterruptedException {
     int key = clusterId.get();
     WeightedVectorWritable currentMDP = mostDistantPoints.get(key);
 
@@ -67,8 +69,34 @@ public class CDbwMapper extends MapReduc
       totalDistance += measure.distance(refPoint.get(), point.getVector().get());
     }
     if (currentMDP == null || currentMDP.getWeight() < totalDistance) {
-      mostDistantPoints.put(key, new WeightedVectorWritable(totalDistance,
-                                                            new VectorWritable(point.getVector().get().clone())));
+      mostDistantPoints.put(key, new WeightedVectorWritable(totalDistance, new VectorWritable(point.getVector().get().clone())));
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    try {
+      ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+      Class<?> cl = ccl.loadClass(conf.get(CDbwDriver.DISTANCE_MEASURE_KEY));
+      measure = (DistanceMeasure) cl.newInstance();
+      representativePoints = getRepresentativePoints(conf);
+    } catch (NumberFormatException e) {
+      throw new IllegalStateException(e);
+    } catch (SecurityException e) {
+      throw new IllegalStateException(e);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalStateException(e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException(e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException(e);
     }
   }
 
@@ -77,15 +105,15 @@ public class CDbwMapper extends MapReduc
     this.measure = measure;
   }
 
-  public static Map<Integer, List<VectorWritable>> getRepresentativePoints(JobConf job) {
-    String statePath = job.get(CDbwDriver.STATE_IN_KEY);
+  public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf) {
+    String statePath = conf.get(CDbwDriver.STATE_IN_KEY);
     Map<Integer, List<VectorWritable>> representativePoints = new HashMap<Integer, List<VectorWritable>>();
     try {
       Path path = new Path(statePath);
-      FileSystem fs = FileSystem.get(path.toUri(), job);
+      FileSystem fs = FileSystem.get(path.toUri(), conf);
       FileStatus[] status = fs.listStatus(path, new OutputLogFilter());
       for (FileStatus s : status) {
-        SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), job);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
         try {
           IntWritable key = new IntWritable(0);
           VectorWritable point = new VectorWritable();
@@ -107,35 +135,4 @@ public class CDbwMapper extends MapReduc
       throw new IllegalStateException(e);
     }
   }
-
-  @Override
-  public void configure(JobConf job) {
-    super.configure(job);
-    try {
-      ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-      Class<?> cl = ccl.loadClass(job.get(CDbwDriver.DISTANCE_MEASURE_KEY));
-      measure = (DistanceMeasure) cl.newInstance();
-      representativePoints = getRepresentativePoints(job);
-    } catch (NumberFormatException e) {
-      throw new IllegalStateException(e);
-    } catch (SecurityException e) {
-      throw new IllegalStateException(e);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalStateException(e);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException(e);
-    } catch (InstantiationException e) {
-      throw new IllegalStateException(e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException(e);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    for (Integer clusterId : mostDistantPoints.keySet()) {
-      output.collect(new IntWritable(clusterId), mostDistantPoints.get(clusterId));
-    }
-    super.close();
-  }
 }

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java Wed Jul  7 18:46:19 2010
@@ -22,61 +22,56 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.clustering.WeightedVectorWritable;
 import org.apache.mahout.math.VectorWritable;
 
-public class CDbwReducer extends MapReduceBase
-    implements Reducer<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
+public class CDbwReducer extends Reducer<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
 
   private Map<Integer, List<VectorWritable>> referencePoints;
 
-  private OutputCollector<IntWritable, VectorWritable> output;
-
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void reduce(IntWritable key,
-                     Iterator<WeightedVectorWritable> values,
-                     OutputCollector<IntWritable, VectorWritable> output,
-                     Reporter reporter) throws IOException {
-    this.output = output;
-    // find the most distant point
-    WeightedVectorWritable mdp = null;
-    while (values.hasNext()) {
-      WeightedVectorWritable dpw = values.next();
-      if (mdp == null || mdp.getWeight() < dpw.getWeight()) {
-        mdp = new WeightedVectorWritable(dpw.getWeight(), dpw.getVector());
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    for (Integer clusterId : referencePoints.keySet()) {
+      for (VectorWritable vw : referencePoints.get(clusterId)) {
+        context.write(new IntWritable(clusterId), vw);
       }
     }
-    output.collect(new IntWritable(key.get()), mdp.getVector());
-  }
-
-  public void configure(Map<Integer, List<VectorWritable>> referencePoints) {
-    this.referencePoints = referencePoints;
+    super.cleanup(context);
   }
 
   /* (non-Javadoc)
-   * @see org.apache.hadoop.mapred.MapReduceBase#close()
+   * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
    */
   @Override
-  public void close() throws IOException {
-    for (Integer clusterId : referencePoints.keySet()) {
-      for (VectorWritable vw : referencePoints.get(clusterId)) {
-        output.collect(new IntWritable(clusterId), vw);
+  protected void reduce(IntWritable key, Iterable<WeightedVectorWritable> values, Context context) throws IOException,
+      InterruptedException {
+    // find the most distant point
+    WeightedVectorWritable mdp = null;
+    Iterator<WeightedVectorWritable> it = values.iterator();
+    while (it.hasNext()) {
+      WeightedVectorWritable dpw = it.next();
+      if (mdp == null || mdp.getWeight() < dpw.getWeight()) {
+        mdp = new WeightedVectorWritable(dpw.getWeight(), dpw.getVector());
       }
     }
-    super.close();
+    context.write(new IntWritable(key.get()), mdp.getVector());
   }
 
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void configure(JobConf job) {
-    super.configure(job);
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
     try {
-      referencePoints = CDbwMapper.getRepresentativePoints(job);
+      referencePoints = CDbwMapper.getRepresentativePoints(conf);
     } catch (NumberFormatException e) {
       throw new IllegalStateException(e);
     } catch (SecurityException e) {
@@ -86,4 +81,8 @@ public class CDbwReducer extends MapRedu
     }
   }
 
+  public void configure(Map<Integer, List<VectorWritable>> referencePoints) {
+    this.referencePoints = referencePoints;
+  }
+
 }

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java Wed Jul  7 18:46:19 2010
@@ -31,13 +31,11 @@ import org.apache.commons.cli2.builder.D
 import org.apache.commons.cli2.builder.GroupBuilder;
 import org.apache.commons.cli2.commandline.Parser;
 import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,9 +81,7 @@ public final class SequenceFileDumper {
       
       if (cmdLine.hasOption(seqOpt)) {
         Path path = new Path(cmdLine.getValue(seqOpt).toString());
-        JobClient client = new JobClient();
-        JobConf conf = new JobConf(Job.class);
-        client.setConf(conf);
+        Configuration conf = new Configuration();
         FileSystem fs = FileSystem.get(path.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
         

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java Wed Jul  7 18:46:19 2010
@@ -40,6 +40,7 @@ import org.apache.commons.cli2.builder.D
 import org.apache.commons.cli2.builder.GroupBuilder;
 import org.apache.commons.cli2.commandline.Parser;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -47,9 +48,6 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.ClusterBase;
 import org.apache.mahout.clustering.WeightedVectorWritable;
@@ -90,7 +88,7 @@ public final class ClusterDumper {
 
   private void init() throws IOException {
     if (this.pointsDir != null) {
-      JobConf conf = new JobConf(Job.class);
+      Configuration conf = new Configuration();
       // read in the points
       clusterIdToPoints = readPoints(this.pointsDir, conf);
     } else {
@@ -99,9 +97,7 @@ public final class ClusterDumper {
   }
 
   public void printClusters(String[] dictionary) throws IOException, InstantiationException, IllegalAccessException {
-    JobClient client = new JobClient();
-    JobConf conf = new JobConf(Job.class);
-    client.setConf(conf);
+    Configuration conf = new Configuration();
 
     if (this.termDictionary != null) {
       if (dictionaryFormat.equals("text")) {
@@ -310,7 +306,7 @@ public final class ClusterDumper {
     this.useJSON = json;
   }
 
-  private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, JobConf conf)
+  private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, Configuration conf)
       throws IOException {
     Map<Integer, List<WeightedVectorWritable>> result = new TreeMap<Integer, List<WeightedVectorWritable>>();
 

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocCombiner.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocCombiner.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocCombiner.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocCombiner.java Wed Jul  7 18:46:19 2010
@@ -20,32 +20,30 @@ package org.apache.mahout.utils.nlp.coll
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 
 /** Combiner for pass1 of the CollocationDriver. Combines frequencies for values for the same key */
-public class CollocCombiner extends MapReduceBase implements Reducer<GramKey,Gram,GramKey,Gram> {
-  
+public class CollocCombiner extends Reducer<GramKey, Gram, GramKey, Gram> {
+
+  /**
+   * Sums the frequencies of all grams sharing {@code key} and writes the last gram seen, carrying that total.
+   */
   @Override
-  public void reduce(GramKey key,
-                     Iterator<Gram> values,
-                     OutputCollector<GramKey,Gram> output,
-                     Reporter reporter) throws IOException {
+  protected void reduce(GramKey key, Iterable<Gram> values, Context context) throws IOException, InterruptedException {
 
     int freq = 0;
     Gram value = null;
-    
+
     // accumulate frequencies from values.
-    while (values.hasNext()) {
-      value = values.next();
+    Iterator<Gram> it = values.iterator();
+    while (it.hasNext()) {
+      value = it.next();
       freq += value.getFrequency();
     }
 
     value.setFrequency(freq);
-    
-    output.collect(key, value);
+
+    context.write(key, value);
   }
-  
+
 }

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocDriver.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocDriver.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocDriver.java Wed Jul  7 18:46:19 2010
@@ -24,14 +24,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.mahout.common.AbstractJob;
@@ -45,15 +43,19 @@ import org.slf4j.LoggerFactory;
 /** Driver for LLR Collocation discovery mapreduce job */
 public final class CollocDriver extends AbstractJob {
   public static final String DEFAULT_OUTPUT_DIRECTORY = "output";
+
   public static final String SUBGRAM_OUTPUT_DIRECTORY = "subgrams";
+
   public static final String NGRAM_OUTPUT_DIRECTORY = "ngrams";
-  
+
   public static final String EMIT_UNIGRAMS = "emit-unigrams";
+
   public static final boolean DEFAULT_EMIT_UNIGRAMS = false;
-  
+
   public static final int DEFAULT_MAX_NGRAM_SIZE = 2;
+
   public static final int DEFAULT_PASS1_NUM_REDUCE_TASKS = 1;
-  
+
   private static final Logger log = LoggerFactory.getLogger(CollocDriver.class);
 
   private CollocDriver() {
@@ -68,36 +70,31 @@ public final class CollocDriver extends 
     addInputOption();
     addOutputOption();
     addOption(DefaultOptionCreator.numReducersOption().create());
-    
-    addOption("maxNGramSize", "ng", 
-        "(Optional) The max size of ngrams to create (2 = bigrams, 3 = trigrams, etc) default: 2",
-        String.valueOf(DEFAULT_MAX_NGRAM_SIZE));
-    addOption("minSupport", "s", 
-        "(Optional) Minimum Support. Default Value: " + CollocReducer.DEFAULT_MIN_SUPPORT, 
-        String.valueOf(CollocReducer.DEFAULT_MIN_SUPPORT));
-    addOption("minLLR", "ml",
-        "(Optional)The minimum Log Likelihood Ratio(Float)  Default is " + LLRReducer.DEFAULT_MIN_LLR,
-        String.valueOf(LLRReducer.DEFAULT_MIN_LLR));
+
+    addOption("maxNGramSize",
+              "ng",
+              "(Optional) The max size of ngrams to create (2 = bigrams, 3 = trigrams, etc) default: 2",
+              String.valueOf(DEFAULT_MAX_NGRAM_SIZE));
+    addOption("minSupport", "s", "(Optional) Minimum Support. Default Value: " + CollocReducer.DEFAULT_MIN_SUPPORT, String
+        .valueOf(CollocReducer.DEFAULT_MIN_SUPPORT));
+    addOption("minLLR", "ml", "(Optional)The minimum Log Likelihood Ratio(Float)  Default is " + LLRReducer.DEFAULT_MIN_LLR, String
+        .valueOf(LLRReducer.DEFAULT_MIN_LLR));
     addOption(DefaultOptionCreator.overwriteOption().create());
-    addOption("analyzerName", "a",
-        "The class name of the analyzer to use for preprocessing", null);
-    
-    addFlag("preprocess", "p",
-        "If set, input is SequenceFile<Text,Text> where the value is the document, "
+    addOption("analyzerName", "a", "The class name of the analyzer to use for preprocessing", null);
+
+    addFlag("preprocess", "p", "If set, input is SequenceFile<Text,Text> where the value is the document, "
         + " which will be tokenized using the specified analyzer.");
-    addFlag("unigram", "u", 
-        "If set, unigrams will be emitted in the final output alongside collocations");
-    
+    addFlag("unigram", "u", "If set, unigrams will be emitted in the final output alongside collocations");
+
     Map<String, String> argMap = parseArguments(args);
-    
+
     if (argMap == null) {
       return -1;
     }
-    
+
     Path input = getInputPath();
     Path output = getOutputPath();
-    
-    
+
     int maxNGramSize = DEFAULT_MAX_NGRAM_SIZE;
     if (argMap.get("--maxNGramSize") != null) {
       try {
@@ -107,39 +104,34 @@ public final class CollocDriver extends 
       }
     }
     log.info("Maximum n-gram size is: {}", maxNGramSize);
-    
-    
+
     if (argMap.containsKey("--overwrite")) {
       HadoopUtil.overwriteOutput(output);
     }
-    
-    
+
     int minSupport = CollocReducer.DEFAULT_MIN_SUPPORT;
     if (argMap.get("--minsupport") != null) {
       minSupport = Integer.parseInt(argMap.get("--minsupport"));
     }
     log.info("Minimum Support value: {}", minSupport);
-    
-    
+
     float minLLRValue = LLRReducer.DEFAULT_MIN_LLR;
     if (argMap.get("--minLLR") != null) {
       minLLRValue = Float.parseFloat(argMap.get("--minLLR"));
     }
     log.info("Minimum LLR value: {}", minLLRValue);
-    
-    
+
     int reduceTasks = DEFAULT_PASS1_NUM_REDUCE_TASKS;
     if (argMap.get("--maxRed") != null) {
       reduceTasks = Integer.parseInt(argMap.get("--maxRed"));
     }
     log.info("Number of pass1 reduce tasks: {}", reduceTasks);
-    
-    
+
     boolean emitUnigrams = argMap.containsKey("--emitUnigrams");
 
     if (argMap.containsKey("--preprocess")) {
       log.info("Input will be preprocessed");
-      
+
       Class<? extends Analyzer> analyzerClass = DefaultAnalyzer.class;
       if (argMap.get("--analyzerName") != null) {
         String className = argMap.get("--analyzerName");
@@ -148,25 +140,24 @@ public final class CollocDriver extends 
         // you can't instantiate it
         analyzerClass.newInstance();
       }
-      
+
       Path tokenizedPath = new Path(output, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
-      
+
       DocumentProcessor.tokenizeDocuments(input, analyzerClass, tokenizedPath);
       input = tokenizedPath;
     } else {
       log.info("Input will NOT be preprocessed");
     }
-    
+
     // parse input and extract collocations
-    long ngramCount = generateCollocations(input, output, getConf(), emitUnigrams, maxNGramSize,
-      reduceTasks, minSupport);
-    
+    long ngramCount = generateCollocations(input, output, getConf(), emitUnigrams, maxNGramSize, reduceTasks, minSupport);
+
     // tally collocations and perform LLR calculation
     computeNGramsPruneByLLR(output, getConf(), ngramCount, emitUnigrams, minLLRValue, reduceTasks);
 
     return 0;
   }
-  
+
   /**
    * Generate all ngrams for the {@link org.apache.mahout.utils.vectors.text.DictionaryVectorizer} job
    * 
@@ -183,6 +174,8 @@ public final class CollocDriver extends 
    * @param reduceTasks
    *          number of reducers used
    * @throws IOException
+   * @throws ClassNotFoundException if a class needed to run the jobs cannot be found
+   * @throws InterruptedException if waiting for a job to finish is interrupted
    */
   public static void generateAllGrams(Path input,
                                       Path output,
@@ -190,15 +183,14 @@ public final class CollocDriver extends 
                                       int maxNGramSize,
                                       int minSupport,
                                       float minLLRValue,
-                                      int reduceTasks) throws IOException {
+                                      int reduceTasks) throws IOException, InterruptedException, ClassNotFoundException {
     // parse input and extract collocations
-    long ngramCount = generateCollocations(input, output, baseConf, true, maxNGramSize, reduceTasks,
-      minSupport);
-    
+    long ngramCount = generateCollocations(input, output, baseConf, true, maxNGramSize, reduceTasks, minSupport);
+
     // tally collocations and perform LLR calculation
     computeNGramsPruneByLLR(output, baseConf, ngramCount, true, minLLRValue, reduceTasks);
   }
-  
+
   /**
    * pass1: generate collocations, ngrams
    */
@@ -209,73 +201,87 @@ public final class CollocDriver extends 
                                           int maxNGramSize,
                                           int reduceTasks,
-                                          int minSupport) throws IOException {
-    
-    JobConf conf = new JobConf(baseConf, CollocDriver.class);
-    conf.setJobName(CollocDriver.class.getSimpleName() + ".generateCollocations:" + input);
-    
-    conf.setMapOutputKeyClass(GramKey.class);
-    conf.setMapOutputValueClass(Gram.class);
-    conf.setPartitionerClass(GramKeyPartitioner.class);
-    conf.setOutputValueGroupingComparator(GramKeyGroupComparator.class);
-    
-    conf.setOutputKeyClass(Gram.class);
-    conf.setOutputValueClass(Gram.class);
-    
-    conf.setCombinerClass(CollocCombiner.class);
-    
-    conf.setBoolean(EMIT_UNIGRAMS, emitUnigrams);
-    
-    FileInputFormat.setInputPaths(conf, input);
-    
+                                          int minSupport) throws IOException, InterruptedException, ClassNotFoundException {
+
+    // Start from the caller's configuration so that its settings are not discarded.
+    Configuration con = new Configuration(baseConf);
+    con.setBoolean(EMIT_UNIGRAMS, emitUnigrams);
+    con.setInt(CollocMapper.MAX_SHINGLE_SIZE, maxNGramSize);
+    con.setInt(CollocReducer.MIN_SUPPORT, minSupport);
+    Job job = new Job(con);
+    // new JobConf(baseConf, CollocDriver.class) used to register the job jar; do it explicitly now.
+    job.setJarByClass(CollocDriver.class);
+    job.setJobName(CollocDriver.class.getSimpleName() + ".generateCollocations:" + input);
+
+    job.setMapOutputKeyClass(GramKey.class);
+    job.setMapOutputValueClass(Gram.class);
+    job.setPartitionerClass(GramKeyPartitioner.class);
+    job.setGroupingComparatorClass(GramKeyGroupComparator.class);
+
+    job.setOutputKeyClass(Gram.class);
+    job.setOutputValueClass(Gram.class);
+
+    job.setCombinerClass(CollocCombiner.class);
+
+    FileInputFormat.setInputPaths(job, input);
+
     Path outputPath = new Path(output, SUBGRAM_OUTPUT_DIRECTORY);
-    FileOutputFormat.setOutputPath(conf, outputPath);
-    
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setMapperClass(CollocMapper.class);
-    
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
-    conf.setReducerClass(CollocReducer.class);
-    conf.setInt(CollocMapper.MAX_SHINGLE_SIZE, maxNGramSize);
-    conf.setInt(CollocReducer.MIN_SUPPORT, minSupport);
-    conf.setNumReduceTasks(reduceTasks);
-    
-    RunningJob job = JobClient.runJob(conf);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapperClass(CollocMapper.class);
+
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setReducerClass(CollocReducer.class);
+    job.setNumReduceTasks(reduceTasks);
+
+    // The counters are only populated by running the job, so block until it completes.
+    if (!job.waitForCompletion(true)) {
+      throw new IOException("Job failed!");
+    }
+
     return job.getCounters().findCounter(CollocMapper.Count.NGRAM_TOTAL).getValue();
   }
-  
+
   /**
    * pass2: perform the LLR calculation
+   * @throws ClassNotFoundException if a class needed to run the job cannot be found
+   * @throws InterruptedException if waiting for the job to finish is interrupted
    */
   public static void computeNGramsPruneByLLR(Path output,
-                                                Configuration baseConf,
-                                                long nGramTotal,
-                                                boolean emitUnigrams,
-                                                float minLLRValue,
-                                                int reduceTasks) throws IOException {
-    JobConf conf = new JobConf(baseConf, CollocDriver.class);
-    conf.setJobName(CollocDriver.class.getSimpleName() + ".computeNGrams: " + output);
-    
-    
+                                             Configuration baseConf,
+                                             long nGramTotal,
+                                             boolean emitUnigrams,
+                                             float minLLRValue,
+                                             int reduceTasks) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration(baseConf);
     conf.setLong(LLRReducer.NGRAM_TOTAL, nGramTotal);
     conf.setBoolean(EMIT_UNIGRAMS, emitUnigrams);
-    
-    conf.setMapOutputKeyClass(Gram.class);
-    conf.setMapOutputValueClass(Gram.class);
-    
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(DoubleWritable.class);
-    
-    FileInputFormat.setInputPaths(conf, new Path(output, SUBGRAM_OUTPUT_DIRECTORY));
+    conf.setFloat(LLRReducer.MIN_LLR, minLLRValue);
+
+    // Job copies the Configuration when it is constructed, so every setting must be applied before this line.
+    Job job = new Job(conf);
+    job.setJarByClass(CollocDriver.class);
+    job.setJobName(CollocDriver.class.getSimpleName() + ".computeNGrams: " + output);
+
+    job.setMapOutputKeyClass(Gram.class);
+    job.setMapOutputValueClass(Gram.class);
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(DoubleWritable.class);
+
+    FileInputFormat.setInputPaths(job, new Path(output, SUBGRAM_OUTPUT_DIRECTORY));
     Path outPath = new Path(output, NGRAM_OUTPUT_DIRECTORY);
-    FileOutputFormat.setOutputPath(conf, outPath);
-    
-    conf.setMapperClass(IdentityMapper.class);
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
-    conf.setReducerClass(LLRReducer.class);
-    conf.setNumReduceTasks(reduceTasks);
-    
-    conf.setFloat(LLRReducer.MIN_LLR, minLLRValue);
-    JobClient.runJob(conf);
+    FileOutputFormat.setOutputPath(job, outPath);
+
+    job.setMapperClass(Mapper.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setReducerClass(LLRReducer.class);
+    job.setNumReduceTasks(reduceTasks);
+
+    // JobClient.runJob threw on failure; waitForCompletion only reports it, so check the result.
+    if (!job.waitForCompletion(true)) {
+      throw new IOException("Job failed!");
+    }
   }
 }

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapper.java Wed Jul  7 18:46:19 2010
@@ -20,12 +20,9 @@ package org.apache.mahout.utils.nlp.coll
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.analysis.shingle.ShingleFilter;
 import org.apache.lucene.analysis.tokenattributes.TermAttribute;
@@ -41,36 +38,24 @@ import org.slf4j.LoggerFactory;
  * Input is a SequeceFile<Text,StringTuple>, where the key is a document id and the value is the tokenized documents.
  * <p/>
  */
-public class CollocMapper extends MapReduceBase implements Mapper<Text,StringTuple,GramKey,Gram> {
+public class CollocMapper extends Mapper<Text, StringTuple, GramKey, Gram> {
 
   private static final byte[] EMPTY = new byte[0];
-  
+
   public static final String MAX_SHINGLE_SIZE = "maxShingleSize";
+
   public static final int DEFAULT_MAX_SHINGLE_SIZE = 2;
-  
+
   public enum Count {
     NGRAM_TOTAL
   }
-  
+
   private static final Logger log = LoggerFactory.getLogger(CollocMapper.class);
-  
+
   private int maxShingleSize;
+
   private boolean emitUnigrams;
-  
-  @Override
-  public void configure(JobConf job) {
-    super.configure(job);
-    
-    this.maxShingleSize = job.getInt(MAX_SHINGLE_SIZE, DEFAULT_MAX_SHINGLE_SIZE);
-    
-    this.emitUnigrams = job.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
-    
-    if (log.isInfoEnabled()) {
-      log.info("Max Ngram size is {}", this.maxShingleSize);
-      log.info("Emit Unitgrams is {}", emitUnigrams);
-    }
-  }
-  
+
   /**
    * Collocation finder: pass 1 map phase.
    * <p/>
@@ -110,16 +95,14 @@ public class CollocMapper extends MapRed
    *           if there's a problem with the ShingleFilter reading data or the collector collecting output.
    */
   @Override
-  public void map(Text key, StringTuple value,
-                  final OutputCollector<GramKey,Gram> collector, Reporter reporter) throws IOException {
-    
+  protected void map(Text key, StringTuple value, final Context context) throws IOException, InterruptedException {
+
     ShingleFilter sf = new ShingleFilter(new IteratorTokenStream(value.getEntries().iterator()), maxShingleSize);
     int count = 0; // ngram count
-    
-    OpenObjectIntHashMap<String> ngrams = new OpenObjectIntHashMap<String>(value.getEntries().size()
-                                                                           * (maxShingleSize - 1));
+
+    OpenObjectIntHashMap<String> ngrams = new OpenObjectIntHashMap<String>(value.getEntries().size() * (maxShingleSize - 1));
     OpenObjectIntHashMap<String> unigrams = new OpenObjectIntHashMap<String>(value.getEntries().size());
-    
+
     do {
       String term = ((TermAttribute) sf.getAttribute(TermAttribute.class)).term();
       String type = ((TypeAttribute) sf.getAttribute(TypeAttribute.class)).type();
@@ -130,51 +113,55 @@ public class CollocMapper extends MapRed
         unigrams.adjustOrPutValue(term, 1, 1);
       }
     } while (sf.incrementToken());
-    
+
     try {
       final GramKey gramKey = new GramKey();
-      
+
       ngrams.forEachPair(new ObjectIntProcedure<String>() {
         @Override
         public boolean apply(String term, int frequency) {
           // obtain components, the leading (n-1)gram and the trailing unigram.
           int i = term.lastIndexOf(' '); // TODO: fix for non-whitespace delimited languages.
           if (i != -1) { // bigram, trigram etc
-            
+
             try {
               Gram ngram = new Gram(term, frequency, Gram.Type.NGRAM);
-              Gram head  = new Gram(term.substring(0, i), frequency, Gram.Type.HEAD);
-              Gram tail  = new Gram(term.substring(i + 1), frequency, Gram.Type.TAIL);
-              
+              Gram head = new Gram(term.substring(0, i), frequency, Gram.Type.HEAD);
+              Gram tail = new Gram(term.substring(i + 1), frequency, Gram.Type.TAIL);
+
               gramKey.set(head, EMPTY);
-              collector.collect(gramKey, head);
-              
+              context.write(gramKey, head);
+
               gramKey.set(head, ngram.getBytes());
-              collector.collect(gramKey, ngram);
-              
+              context.write(gramKey, ngram);
+
               gramKey.set(tail, EMPTY);
-              collector.collect(gramKey, tail);
-              
+              context.write(gramKey, tail);
+
               gramKey.set(tail, ngram.getBytes());
-              collector.collect(gramKey, ngram);
-              
+              context.write(gramKey, ngram);
+
             } catch (IOException e) {
               throw new IllegalStateException(e);
+            } catch (InterruptedException e) {
+              throw new IllegalStateException(e);
             }
           }
           return true;
         }
       });
-  
+
       unigrams.forEachPair(new ObjectIntProcedure<String>() {
         @Override
         public boolean apply(String term, int frequency) {
           try {
             Gram unigram = new Gram(term, frequency, Gram.Type.UNIGRAM);
             gramKey.set(unigram, EMPTY);
-            collector.collect(gramKey, unigram);
+            context.write(gramKey, unigram);
           } catch (IOException e) {
             throw new IllegalStateException(e);
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
           }
           return true;
         }
@@ -188,23 +175,41 @@ public class CollocMapper extends MapRed
         throw ise;
       }
     }
-    
-    reporter.incrCounter(Count.NGRAM_TOTAL, count);
-    
+
+    context.getCounter(Count.NGRAM_TOTAL).increment(count);
+
     sf.end();
     sf.close();
   }
-  
+
+  /**
+   * Loads the maximum shingle size and the emit-unigrams flag from the job configuration, falling back to the defaults.
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    this.maxShingleSize = conf.getInt(MAX_SHINGLE_SIZE, DEFAULT_MAX_SHINGLE_SIZE);
+
+    this.emitUnigrams = conf.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
+
+    if (log.isInfoEnabled()) {
+      log.info("Max Ngram size is {}", this.maxShingleSize);
+      log.info("Emit Unitgrams is {}", emitUnigrams);
+    }
+  }
+
   /** Used to emit tokens from an input string array in the style of TokenStream */
   public static class IteratorTokenStream extends TokenStream {
     private final TermAttribute termAtt;
+
     private final Iterator<String> iterator;
-    
+
     public IteratorTokenStream(Iterator<String> iterator) {
       this.iterator = iterator;
       this.termAtt = (TermAttribute) addAttribute(TermAttribute.class);
     }
-    
+
     @Override
     public boolean incrementToken() throws IOException {
       if (iterator.hasNext()) {

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducer.java Wed Jul  7 18:46:19 2010
@@ -20,47 +20,28 @@ package org.apache.mahout.utils.nlp.coll
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Reducer for Pass 1 of the collocation identification job. Generates counts for ngrams and subgrams.
  */
-public class CollocReducer extends MapReduceBase implements Reducer<GramKey,Gram,Gram,Gram> {
+public class CollocReducer extends Reducer<GramKey, Gram, Gram, Gram> {
 
   private static final Logger log = LoggerFactory.getLogger(CollocReducer.class);
 
   public static final String MIN_SUPPORT = "minSupport";
+
   public static final int DEFAULT_MIN_SUPPORT = 2;
-  
+
   public enum Skipped {
-    LESS_THAN_MIN_SUPPORT,
-    MALFORMED_KEY_TUPLE,
-    MALFORMED_TUPLE,
-    MALFORMED_TYPES,
-    MALFORMED_UNIGRAM
+    LESS_THAN_MIN_SUPPORT, MALFORMED_KEY_TUPLE, MALFORMED_TUPLE, MALFORMED_TYPES, MALFORMED_UNIGRAM
   }
 
   private int minSupport;
 
-  @Override
-  public void configure(JobConf job) {
-    super.configure(job);
-    
-    this.minSupport = job.getInt(MIN_SUPPORT, DEFAULT_MIN_SUPPORT);
-
-    boolean emitUnigrams = job.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
-    
-    log.info("Min support is {}", minSupport);
-    log.info("Emit Unitgrams is {}", emitUnigrams);
-
-  }
-  
   /**
    * collocation finder: pass 1 reduce phase:
    * <p/>
@@ -86,33 +67,45 @@ public class CollocReducer extends MapRe
    * head and move the count into the value?
    */
   @Override
-  public void reduce(GramKey key,
-                     Iterator<Gram> values,
-                     OutputCollector<Gram,Gram> output,
-                     Reporter reporter) throws IOException {
-    
+  protected void reduce(GramKey key, Iterable<Gram> values, Context context) throws IOException, InterruptedException {
+
     Gram.Type keyType = key.getType();
 
     if (keyType == Gram.Type.UNIGRAM) {
       // sum frequencies for unigrams.
-      processUnigram(key, values, output, reporter);
+      processUnigram(key, values.iterator(), context);
     } else if (keyType == Gram.Type.HEAD || keyType == Gram.Type.TAIL) {
       // sum frequencies for subgrams, ngram and collect for each ngram.
-      processSubgram(key, values, output, reporter);
+      processSubgram(key, values.iterator(), context);
     } else {
-      reporter.incrCounter(Skipped.MALFORMED_TYPES, 1);
+      context.getCounter(Skipped.MALFORMED_TYPES).increment(1);
     }
   }
 
+  /**
+   * Loads the minimum support from the job configuration (default {@link #DEFAULT_MIN_SUPPORT}); the emit-unigrams flag is read only for logging.
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    this.minSupport = conf.getInt(MIN_SUPPORT, DEFAULT_MIN_SUPPORT);
+
+    boolean emitUnigrams = conf.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
+
+    log.info("Min support is {}", minSupport);
+    log.info("Emit Unitgrams is {}", emitUnigrams);
+  }
+
   /**
    * Sum frequencies for unigrams and deliver to the collector
+   * @throws InterruptedException if the task is interrupted while writing to the context
    */
-  protected void processUnigram(GramKey key, Iterator<Gram> values,
-      OutputCollector<Gram, Gram> output, Reporter reporter) throws IOException {
+  protected void processUnigram(GramKey key, Iterator<Gram> values, Context context) throws IOException, InterruptedException {
 
     int freq = 0;
     Gram value = null;
-    
+
     // accumulate frequencies from values.
     while (values.hasNext()) {
       value = values.next();
@@ -120,15 +113,15 @@ public class CollocReducer extends MapRe
     }
 
     if (freq < minSupport) {
-      reporter.incrCounter(Skipped.LESS_THAN_MIN_SUPPORT, 1);
+      context.getCounter(Skipped.LESS_THAN_MIN_SUPPORT).increment(1);
       return;
     }
 
     value.setFrequency(freq);
-    output.collect(value, value);
+    context.write(value, value);
 
   }
-      
+
   /** Sum frequencies for subgram, ngrams and deliver ngram, subgram pairs to the collector.
    *  <p/>
    *  Sort order guarantees that the subgram/subgram pairs will be seen first and then
@@ -137,17 +130,17 @@ public class CollocReducer extends MapRe
    *  <p/>
    *  We end up calculating frequencies for ngrams for each sugram (head, tail) here, which is
    *  some extra work.
+   * @throws InterruptedException if interrupted while writing an ngram/subgram pair to the context
    */
-  protected void processSubgram(GramKey key, Iterator<Gram> values, 
-      OutputCollector<Gram,Gram> output, Reporter reporter) throws IOException {
+  protected void processSubgram(GramKey key, Iterator<Gram> values, Context context) throws IOException, InterruptedException {
 
-    Gram subgram      = null;
+    Gram subgram = null;
     Gram currentNgram = null;
-        
+
     while (values.hasNext()) {
       Gram value = values.next();
 
-      if (value.getType() == Gram.Type.HEAD || value.getType() == Gram.Type.TAIL) { 
+      if (value.getType() == Gram.Type.HEAD || value.getType() == Gram.Type.TAIL) {
         // collect frequency for subgrams.
         if (subgram == null) {
           subgram = new Gram(value);
@@ -160,9 +153,9 @@ public class CollocReducer extends MapRe
         // create the new ngram.
         if (currentNgram != null) {
           if (currentNgram.getFrequency() < minSupport) {
-            reporter.incrCounter(Skipped.LESS_THAN_MIN_SUPPORT, 1);
+            context.getCounter(Skipped.LESS_THAN_MIN_SUPPORT).increment(1);
           } else {
-            output.collect(currentNgram, subgram);
+            context.write(currentNgram, subgram);
           }
         }
 
@@ -171,15 +164,15 @@ public class CollocReducer extends MapRe
         currentNgram.incrementFrequency(value.getFrequency());
       }
     }
-    
+
     // collect last ngram.
     if (currentNgram != null) {
       if (currentNgram.getFrequency() < minSupport) {
-        reporter.incrCounter(Skipped.LESS_THAN_MIN_SUPPORT, 1);
+        context.getCounter(Skipped.LESS_THAN_MIN_SUPPORT).increment(1);
         return;
       }
-      
-      output.collect(currentNgram, subgram);
+
+      context.write(currentNgram, subgram);
     }
   }
 }

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKeyPartitioner.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKeyPartitioner.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKeyPartitioner.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKeyPartitioner.java Wed Jul  7 18:46:19 2010
@@ -19,14 +19,13 @@ package org.apache.mahout.utils.nlp.coll
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Partitioner;
 
 /**
  * Partition GramKeys based on their Gram, ignoring the secondary sort key so that all GramKeys with the same
  * gram are sent to the same partition.
  */
-public class GramKeyPartitioner implements Partitioner<GramKey, Gram> {
+public class GramKeyPartitioner extends Partitioner<GramKey, Gram> {
 
   private static final String HASH_OFFSET_PROPERTY_NAME = "grampartitioner.hash.offset";
     
@@ -45,8 +44,7 @@ public class GramKeyPartitioner implemen
     return (hash & Integer.MAX_VALUE) % numPartitions;
   }
 
-  @Override
-  public void configure(JobConf conf) {
+  public void configure(Configuration conf) {
     offset = conf.getInt(HASH_OFFSET_PROPERTY_NAME, -1);
   }
 }

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducer.java Wed Jul  7 18:46:19 2010
@@ -20,13 +20,10 @@ package org.apache.mahout.utils.nlp.coll
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.math.stats.LogLikelihood;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,64 +32,29 @@ import org.slf4j.LoggerFactory;
  * Reducer for pass 2 of the collocation discovery job. Collects ngram and sub-ngram frequencies and performs
  * the Log-likelihood ratio calculation.
  */
-public class LLRReducer extends MapReduceBase implements Reducer<Gram,Gram,Text,DoubleWritable> {
-  
+public class LLRReducer extends Reducer<Gram, Gram, Text, DoubleWritable> {
+
   /** Counter to track why a particlar entry was skipped */
   public enum Skipped {
-    EXTRA_HEAD,
-    EXTRA_TAIL,
-    MISSING_HEAD,
-    MISSING_TAIL,
-    LESS_THAN_MIN_LLR,
-    LLR_CALCULATION_ERROR,
+    EXTRA_HEAD, EXTRA_TAIL, MISSING_HEAD, MISSING_TAIL, LESS_THAN_MIN_LLR, LLR_CALCULATION_ERROR,
   }
 
   private static final Logger log = LoggerFactory.getLogger(LLRReducer.class);
-  
+
   public static final String NGRAM_TOTAL = "ngramTotal";
+
   public static final String MIN_LLR = "minLLR";
+
   public static final float DEFAULT_MIN_LLR = 1.0f;
-  
+
   private long ngramTotal;
+
   private float minLLRValue;
+
   private boolean emitUnigrams;
-  
+
   private final LLCallback ll;
-  
-  public LLRReducer() {
-    this.ll = new ConcreteLLCallback();
-  }
-  
-  /**
-   * plug in an alternate LL implementation, used for testing
-   * 
-   * @param ll
-   *          the LL to use.
-   */
-  LLRReducer(LLCallback ll) {
-    this.ll = ll;
-  }
-  
-  @Override
-  public void configure(JobConf job) {
-    super.configure(job);
-    
-    this.ngramTotal = job.getLong(NGRAM_TOTAL, -1);
-    this.minLLRValue = job.getFloat(MIN_LLR, DEFAULT_MIN_LLR);
-    
-    this.emitUnigrams = job.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
-    
-    if (log.isInfoEnabled()) {
-      log.info("NGram Total is {}", ngramTotal);
-      log.info("Min LLR value is {}", minLLRValue);
-      log.info("Emit Unitgrams is {}", emitUnigrams);
-    }
-    
-    if (ngramTotal == -1) {
-      throw new IllegalStateException("No NGRAM_TOTAL available in job config");
-    }
-  }
-  
+
   /**
    * Perform LLR calculation, input is: k:ngram:ngramFreq v:(h_|t_)subgram:subgramfreq N = ngram total
    * 
@@ -103,67 +65,65 @@ public class LLRReducer extends MapReduc
    * number of times neither A or B appears (in that order): N - (subgramFreqA + subgramFreqB - ngramFreq)
    */
   @Override
-  public void reduce(Gram ngram,
-                     Iterator<Gram> values,
-                     OutputCollector<Text,DoubleWritable> output,
-                     Reporter reporter) throws IOException {
-    
-    int[] gramFreq = {-1, -1};
-    
+  protected void reduce(Gram ngram, Iterable<Gram> values, Context context) throws IOException, InterruptedException {
+
+    int[] gramFreq = { -1, -1 };
+
     if (ngram.getType() == Gram.Type.UNIGRAM && emitUnigrams) {
       DoubleWritable dd = new DoubleWritable(ngram.getFrequency());
       Text t = new Text(ngram.getString());
-      output.collect(t, dd);
+      context.write(t, dd);
       return;
     }
     // FIXME: better way to handle errors? Wouldn't an exception thrown here
     // cause hadoop to re-try the job?
     String[] gram = new String[2];
-    while (values.hasNext()) {
-      Gram value = values.next();
-      
+    Iterator<Gram> it = values.iterator();
+    while (it.hasNext()) {
+      Gram value = it.next();
+
       int pos = value.getType() == Gram.Type.HEAD ? 0 : 1;
-      
+
       if (gramFreq[pos] != -1) {
         log.warn("Extra {} for {}, skipping", value.getType(), ngram);
         if (value.getType() == Gram.Type.HEAD) {
-          reporter.incrCounter(Skipped.EXTRA_HEAD, 1);
+          context.getCounter(Skipped.EXTRA_HEAD).increment(1);
         } else {
-          reporter.incrCounter(Skipped.EXTRA_TAIL, 1);
+          context.getCounter(Skipped.EXTRA_TAIL).increment(1);
         }
         return;
       }
-      
+
       gram[pos] = value.getString();
       gramFreq[pos] = value.getFrequency();
     }
-    
+
     if (gramFreq[0] == -1) {
       log.warn("Missing head for {}, skipping.", ngram);
-      reporter.incrCounter(Skipped.MISSING_HEAD, 1);
+      context.getCounter(Skipped.MISSING_HEAD).increment(1);
       return;
     } else if (gramFreq[1] == -1) {
       log.warn("Missing tail for {}, skipping", ngram);
-      reporter.incrCounter(Skipped.MISSING_TAIL, 1);
+      context.getCounter(Skipped.MISSING_TAIL).increment(1);
       return;
     }
-    
+
     int k11 = ngram.getFrequency(); /* a&b */
     int k12 = gramFreq[0] - ngram.getFrequency(); /* a&!b */
     int k21 = gramFreq[1] - ngram.getFrequency(); /* !b&a */
     int k22 = (int) (ngramTotal - (gramFreq[0] + gramFreq[1] - ngram.getFrequency())); /* !a&!b */
-    
+
     try {
       double llr = ll.logLikelihoodRatio(k11, k12, k21, k22);
       if (llr < minLLRValue) {
-        reporter.incrCounter(Skipped.LESS_THAN_MIN_LLR, 1);
+        context.getCounter(Skipped.LESS_THAN_MIN_LLR).increment(1);
         return;
       }
       DoubleWritable dd = new DoubleWritable(llr);
       Text t = new Text(ngram.getString());
-      output.collect(t, dd);
+      context.write(t, dd);
     } catch (IllegalArgumentException ex) {
-      reporter.incrCounter(Skipped.LLR_CALCULATION_ERROR, 1);
+      context.getCounter(Skipped.LLR_CALCULATION_ERROR).increment(1);
       log.error("Problem calculating LLR ratio: " + ex.getMessage());
       log.error("NGram: " + ngram);
       log.error("HEAD: " + gram[0] + ':' + gramFreq[0]);
@@ -171,14 +131,51 @@ public class LLRReducer extends MapReduc
       log.error("k11: " + k11 + " k12: " + k12 + " k21: " + k21 + " k22: " + k22);
     }
   }
-  
+
+  /* Reads ngramTotal, minLLRValue and emitUnigrams from the job configuration; throws if NGRAM_TOTAL is missing.
+   * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    this.ngramTotal = conf.getLong(NGRAM_TOTAL, -1);
+    this.minLLRValue = conf.getFloat(MIN_LLR, DEFAULT_MIN_LLR);
+
+    this.emitUnigrams = conf.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
+
+    if (log.isInfoEnabled()) {
+      log.info("NGram Total is {}", ngramTotal);
+      log.info("Min LLR value is {}", minLLRValue);
+      log.info("Emit Unitgrams is {}", emitUnigrams);
+    }
+
+    if (ngramTotal == -1) {
+      throw new IllegalStateException("No NGRAM_TOTAL available in job config");
+    }
+  }
+
+  public LLRReducer() {
+    this.ll = new ConcreteLLCallback();
+  }
+
+  /**
+   * plug in an alternate LL implementation, used for testing
+   * 
+   * @param ll
+   *          the LL to use.
+   */
+  LLRReducer(LLCallback ll) {
+    this.ll = ll;
+  }
+
   /**
    * provide interface so the input to the llr calculation can be captured for validation in unit testing
    */
   public interface LLCallback {
     double logLikelihoodRatio(int k11, int k12, int k21, int k22);
   }
-  
+
   /** concrete implementation delegates to LogLikelihood class */
   public static final class ConcreteLLCallback implements LLCallback {
     @Override

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java Wed Jul  7 18:46:19 2010
@@ -26,12 +26,10 @@ import org.apache.commons.cli2.builder.D
 import org.apache.commons.cli2.builder.GroupBuilder;
 import org.apache.commons.cli2.commandline.Parser;
 import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.mahout.math.NamedVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.utils.vectors.SequenceFileVectorIterable.SeqFileIterator;
@@ -104,9 +102,7 @@ public final class VectorDumper {
       if (cmdLine.hasOption(seqOpt)) {
         Path path = new Path(cmdLine.getValue(seqOpt).toString());
         //System.out.println("Input Path: " + path); interferes with output?
-        JobClient client = new JobClient();
-        JobConf conf = new JobConf(Job.class);
-        client.setConf(conf);
+        Configuration conf = new Configuration();
 
         FileSystem fs = FileSystem.get(path.toUri(), conf);
 

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java Wed Jul  7 18:46:19 2010
@@ -18,7 +18,6 @@
 package org.apache.mahout.utils.vectors.arff;
 
 import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.Map;
 
 

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java Wed Jul  7 18:46:19 2010
@@ -20,12 +20,9 @@ package org.apache.mahout.utils.vectors.
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.math.NamedVector;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.SequentialAccessSparseVector;
@@ -35,22 +32,26 @@ import org.apache.mahout.math.VectorWrit
 /**
  * Merges partial vectors in to a full sparse vector
  */
-public class PartialVectorMergeReducer extends MapReduceBase implements
-    Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+public class PartialVectorMergeReducer extends
+    Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
 
   private double normPower;
+
   private int dimension;
+
   private boolean sequentialAccess;
-  
+
+  /* Adds up the partial vectors received for a key and writes the merged vector, named after the key.
+   * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void reduce(WritableComparable<?> key,
-                     Iterator<VectorWritable> values,
-                     OutputCollector<WritableComparable<?>,VectorWritable> output,
-                     Reporter reporter) throws IOException {
-    
+  protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context) throws IOException,
+      InterruptedException {
+
     Vector vector = new RandomAccessSparseVector(dimension, 10);
-    while (values.hasNext()) {
-      VectorWritable value = values.next();
+    Iterator<VectorWritable> it = values.iterator();
+    while (it.hasNext()) {
+      VectorWritable value = it.next();
       value.get().addTo(vector);
     }
     if (normPower != PartialVectorMerger.NO_NORMALIZING) {
@@ -60,14 +61,19 @@ public class PartialVectorMergeReducer e
       vector = new SequentialAccessSparseVector(vector);
     }
     VectorWritable vectorWritable = new VectorWritable(new NamedVector(vector, key.toString()));
-    output.collect(key, vectorWritable);
+    context.write(key, vectorWritable);
   }
-  
+
+  /* Reads the normalization power, vector dimension and sequential-access flag from the job configuration.
+   * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+   */
   @Override
-  public void configure(JobConf job) {
-    super.configure(job);
-    normPower = job.getFloat(PartialVectorMerger.NORMALIZATION_POWER, PartialVectorMerger.NO_NORMALIZING);
-    dimension = job.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
-    sequentialAccess = job.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    normPower = conf.getFloat(PartialVectorMerger.NORMALIZATION_POWER, PartialVectorMerger.NO_NORMALIZING);
+    dimension = conf.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
+    sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
   }
+
 }

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java Wed Jul  7 18:46:19 2010
@@ -20,16 +20,15 @@ package org.apache.mahout.utils.vectors.
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.math.VectorWritable;
 
@@ -41,22 +40,22 @@ import org.apache.mahout.math.VectorWrit
  * 
  */
 public final class PartialVectorMerger {
-  
+
   public static final float NO_NORMALIZING = -1.0f;
-  
+
   public static final String NORMALIZATION_POWER = "normalization.power";
-  
+
   public static final String DIMENSION = "vector.dimension";
-  
+
   public static final String SEQUENTIAL_ACCESS = "vector.sequentialAccess";
-  
+
   /**
    * Cannot be initialized. Use the static functions
    */
   private PartialVectorMerger() {
 
   }
-  
+
   /**
    * Merge all the partial {@link org.apache.mahout.math.RandomAccessSparseVector}s into the complete Document
    * {@link org.apache.mahout.math.RandomAccessSparseVector}
@@ -70,46 +69,48 @@ public final class PartialVectorMerger {
    * @param numReducers 
    *          The number of reducers to spawn
    * @throws IOException
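+   * @throws ClassNotFoundException if a class required by the job cannot be loaded
+   * @throws InterruptedException if interrupted while waiting for the job to complete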
    */
   public static void mergePartialVectors(List<Path> partialVectorPaths,
                                          Path output,
                                          float normPower,
                                          int dimension,
-                                         boolean sequentialAccess, 
-                                         int numReducers) throws IOException {
+                                         boolean sequentialAccess,
+                                         int numReducers) throws IOException, InterruptedException, ClassNotFoundException {
     if (normPower != NO_NORMALIZING && normPower < 0) {
       throw new IllegalArgumentException("normPower must either be -1 or >= 0");
     }
-    
-    Configurable client = new JobClient();
-    JobConf conf = new JobConf(PartialVectorMerger.class);
-    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
-                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+    Configuration conf = new Configuration();
     // this conf parameter needs to be set enable serialisation of conf values
-    conf.setJobName("PartialVectorMerger::MergePartialVectors");
+    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+        + "org.apache.hadoop.io.serializer.WritableSerialization");
     conf.setBoolean(SEQUENTIAL_ACCESS, sequentialAccess);
     conf.setInt(DIMENSION, dimension);
     conf.setFloat(NORMALIZATION_POWER, normPower);
-    
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(VectorWritable.class);
-    
-    FileInputFormat.setInputPaths(conf, getCommaSeparatedPaths(partialVectorPaths));
-    
-    FileOutputFormat.setOutputPath(conf, output);
-    
-    conf.setMapperClass(IdentityMapper.class);
-    conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setReducerClass(PartialVectorMergeReducer.class);
-    conf.setOutputFormat(SequenceFileOutputFormat.class);
-    conf.setNumReduceTasks(numReducers);
-    
+
+    Job job = new Job(conf);
+    job.setJobName("PartialVectorMerger::MergePartialVectors");
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    FileInputFormat.setInputPaths(job, getCommaSeparatedPaths(partialVectorPaths));
+
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.setMapperClass(Mapper.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setReducerClass(PartialVectorMergeReducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setNumReduceTasks(numReducers);
+
     HadoopUtil.overwriteOutput(output);
 
-    client.setConf(conf);
-    JobClient.runJob(conf);
+    job.waitForCompletion(true);
   }
-  
+
   private static String getCommaSeparatedPaths(List<Path> paths) {
     StringBuilder commaSeparatedPaths = new StringBuilder();
     String sep = "";