Posted to commits@mahout.apache.org by je...@apache.org on 2010/07/07 20:46:20 UTC
svn commit: r961476 [1/2] - in /mahout/trunk:
core/src/test/java/org/apache/mahout/common/
utils/src/main/java/org/apache/mahout/clustering/cdbw/
utils/src/main/java/org/apache/mahout/utils/
utils/src/main/java/org/apache/mahout/utils/clustering/ utils...
Author: jeastman
Date: Wed Jul 7 18:46:19 2010
New Revision: 961476
URL: http://svn.apache.org/viewvc?rev=961476&view=rev
Log:
MAHOUT-167: ported utils/ to Hadoop 0.20.2. All tests pass but are not exhaustive
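The port applies one recurring pattern: the deprecated org.apache.hadoop.mapred driver idiom (JobConf plus JobClient.runJob) is replaced by the org.apache.hadoop.mapreduce Job API. A minimal sketch of the new-style driver shape, for orientation only; the class name, configuration key, and paths below are illustrative, not from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public final class NewApiDriverSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("example.key", "value");     // set parameters BEFORE creating the Job
        Job job = new Job(conf);              // Job copies conf; later edits to conf are not seen
        job.setJarByClass(NewApiDriverSketch.class);
        job.setMapperClass(Mapper.class);     // the identity mapper in the new API
        // assumes the input SequenceFile holds IntWritable/IntWritable pairs
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);          // replaces JobClient.runJob(conf)
      }
    }

Because Job copies the Configuration it is given, parameters must be set on conf before the Job is created, or afterwards via job.getConfiguration().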
Added:
mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyCounter.java
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyStatusReporter.java
mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java
mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java
mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocCombiner.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocDriver.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducer.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKeyPartitioner.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducer.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DocumentProcessor.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/document/SequenceFileTokenizerMapper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountMapper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermCountReducer.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountMapper.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TermDocumentCountReducer.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFConverter.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/tfidf/TFIDFPartialVectorReducer.java
mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapperTest.java
mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducerTest.java
mahout/trunk/utils/src/test/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducerTest.java
mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java
Added: mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyCounter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyCounter.java?rev=961476&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyCounter.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyCounter.java Wed Jul 7 18:46:19 2010
@@ -0,0 +1,11 @@
+package org.apache.mahout.common;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+class DummyCounter extends Counter {
+
+ public DummyCounter() {
+ super();
+ }
+
+}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyStatusReporter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyStatusReporter.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyStatusReporter.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyStatusReporter.java Wed Jul 7 18:46:19 2010
@@ -1,19 +1,29 @@
package org.apache.mahout.common;
-import org.apache.commons.lang.NotImplementedException;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.StatusReporter;
public class DummyStatusReporter extends StatusReporter {
+ Map<Enum<?>, Counter> counters = new HashMap<Enum<?>, Counter>();
+
@Override
public Counter getCounter(Enum<?> name) {
- throw new NotImplementedException();
+ if (!counters.containsKey(name))
+ counters.put(name, new DummyCounter());
+ return counters.get(name);
}
+ Map<String, Counter> counterGroups = new HashMap<String, Counter>();
+
@Override
public Counter getCounter(String group, String name) {
- throw new NotImplementedException();
+ if (!counterGroups.containsKey(group + name))
+ counterGroups.put(group + name, new DummyCounter());
+ return counterGroups.get(group + name);
}
@Override
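A sketch of how a test might exercise the reporter; this usage is illustrative and not part of the commit:

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.mahout.common.DummyStatusReporter;

    public final class ReporterUsageSketch {
      public static void main(String[] args) {
        DummyStatusReporter reporter = new DummyStatusReporter();
        Counter counter = reporter.getCounter("colloc", "ngrams");
        counter.increment(1);
        // the same (group, name) pair should hand back the same Counter instance
        assert counter == reporter.getCounter("colloc", "ngrams");
      }
    }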
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java Wed Jul 7 18:46:19 2010
@@ -18,7 +18,6 @@
package org.apache.mahout.clustering.cdbw;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
@@ -28,23 +27,21 @@ import org.apache.commons.cli2.builder.A
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
-import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.clustering.dirichlet.DirichletCluster;
-import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.math.VectorWritable;
@@ -74,17 +71,15 @@ public final class CDbwDriver {
Option maxIterOpt = DefaultOptionCreator.maxIterationsOption().create();
Option helpOpt = DefaultOptionCreator.helpOption();
- Option modelOpt = obuilder.withLongName("modelClass").withRequired(true).withShortName("d").withArgument(
- abuilder.withName("modelClass").withMinimum(1).withMaximum(1).create()).withDescription(
- "The ModelDistribution class name. "
+ Option modelOpt = obuilder.withLongName("modelClass").withRequired(true).withShortName("d").withArgument(abuilder
+ .withName("modelClass").withMinimum(1).withMaximum(1).create()).withDescription("The ModelDistribution class name. "
+ "Defaults to org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution").create();
- Option numRedOpt = obuilder.withLongName("maxRed").withRequired(true).withShortName("r").withArgument(
- abuilder.withName("maxRed").withMinimum(1).withMaximum(1).create())
- .withDescription("The number of reduce tasks.").create();
+ Option numRedOpt = obuilder.withLongName("maxRed").withRequired(true).withShortName("r").withArgument(abuilder
+ .withName("maxRed").withMinimum(1).withMaximum(1).create()).withDescription("The number of reduce tasks.").create();
- Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(outputOpt)
- .withOption(modelOpt).withOption(maxIterOpt).withOption(helpOpt).withOption(numRedOpt).create();
+ Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(outputOpt).withOption(modelOpt)
+ .withOption(maxIterOpt).withOption(helpOpt).withOption(numRedOpt).create();
try {
Parser parser = new Parser();
@@ -125,16 +120,15 @@ public final class CDbwDriver {
* the number of iterations
* @param numReducers
* the number of Reducers desired
+ * @throws InterruptedException
*/
public static void runJob(Path clustersIn,
Path clusteredPointsIn,
Path output,
String distanceMeasureClass,
int numIterations,
- int numReducers) throws ClassNotFoundException,
- InstantiationException,
- IllegalAccessException,
- IOException {
+ int numReducers) throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+ IOException, InterruptedException {
Path stateIn = new Path(output, "representativePoints-0");
writeInitialState(stateIn, clustersIn);
@@ -148,30 +142,29 @@ public final class CDbwDriver {
stateIn = stateOut;
}
- Configurable client = new JobClient();
- JobConf conf = new JobConf(CDbwDriver.class);
+ Configuration conf = new Configuration();
conf.set(STATE_IN_KEY, stateIn.toString());
conf.set(DISTANCE_MEASURE_KEY, distanceMeasureClass);
CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn);
- //System.out.println("CDbw = " + evaluator.CDbw());
+ // now print out the CDbw
+ System.out.println("CDbw = " + evaluator.CDbw());
}
- private static void writeInitialState(Path output, Path clustersIn)
- throws InstantiationException, IllegalAccessException, IOException, SecurityException {
-
- JobConf job = new JobConf(KMeansDriver.class);
- FileSystem fs = FileSystem.get(output.toUri(), job);
+ private static void writeInitialState(Path output, Path clustersIn) throws InstantiationException, IllegalAccessException,
+ IOException, SecurityException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(output.toUri(), conf);
for (FileStatus part : fs.listStatus(clustersIn)) {
if (!part.getPath().getName().startsWith(".")) {
Path inPart = part.getPath();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, inPart, job);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inPart, conf);
Writable key = (Writable) reader.getKeyClass().newInstance();
Writable value = (Writable) reader.getValueClass().newInstance();
Path path = new Path(output, inPart.getName());
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, IntWritable.class, VectorWritable.class);
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
while (reader.next(key, value)) {
Cluster cluster = (Cluster) value;
- if (!(cluster instanceof DirichletCluster) || ((DirichletCluster) cluster).getTotalCount() > 0) {
+ if (!(cluster instanceof DirichletCluster<?>) || ((DirichletCluster<?>) cluster).getTotalCount() > 0) {
//System.out.println("C-" + cluster.getId() + ": " + ClusterBase.formatVector(cluster.getCenter(), null));
writer.append(new IntWritable(cluster.getId()), new VectorWritable(cluster.getCenter()));
}
@@ -194,33 +187,29 @@ public final class CDbwDriver {
* the class name of the DistanceMeasure class
* @param numReducers
* the number of Reducers desired
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
- public static void runIteration(Path input, Path stateIn, Path stateOut,
- String distanceMeasureClass, int numReducers) {
- Configurable client = new JobClient();
- JobConf conf = new JobConf(CDbwDriver.class);
-
- conf.setOutputKeyClass(IntWritable.class);
- conf.setOutputValueClass(VectorWritable.class);
- conf.setMapOutputKeyClass(IntWritable.class);
- conf.setMapOutputValueClass(WeightedVectorWritable.class);
-
- FileInputFormat.setInputPaths(conf, input);
- FileOutputFormat.setOutputPath(conf, stateOut);
-
- conf.setMapperClass(CDbwMapper.class);
- conf.setReducerClass(CDbwReducer.class);
- conf.setNumReduceTasks(numReducers);
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
+ public static void runIteration(Path input, Path stateIn, Path stateOut, String distanceMeasureClass, int numReducers) throws IOException, InterruptedException, ClassNotFoundException {
+ Configuration conf = new Configuration();
conf.set(STATE_IN_KEY, stateIn.toString());
conf.set(DISTANCE_MEASURE_KEY, distanceMeasureClass);
+ Job job = new Job(conf);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(VectorWritable.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(WeightedVectorWritable.class);
+
+ FileInputFormat.setInputPaths(job, input);
+ FileOutputFormat.setOutputPath(job, stateOut);
+
+ job.setMapperClass(CDbwMapper.class);
+ job.setReducerClass(CDbwReducer.class);
+ job.setNumReduceTasks(numReducers);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
- client.setConf(conf);
- try {
- JobClient.runJob(conf);
- } catch (IOException e) {
- log.warn(e.toString(), e);
- }
+ job.waitForCompletion(true);
}
}
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwEvaluator.java Wed Jul 7 18:46:19 2010
@@ -22,12 +22,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.math.Vector;
@@ -68,18 +68,18 @@ public class CDbwEvaluator {
/**
* Initialize a new instance from job information
*
- * @param job
+ * @param conf
* a Configuration with appropriate parameters
* @param clustersIn
* a String path to the input clusters directory
*/
- public CDbwEvaluator(JobConf job, Path clustersIn)
+ public CDbwEvaluator(Configuration conf, Path clustersIn)
throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
ClassLoader ccl = Thread.currentThread().getContextClassLoader();
- Class<?> cl = ccl.loadClass(job.get(CDbwDriver.DISTANCE_MEASURE_KEY));
+ Class<?> cl = ccl.loadClass(conf.get(CDbwDriver.DISTANCE_MEASURE_KEY));
measure = (DistanceMeasure) cl.newInstance();
- representativePoints = CDbwMapper.getRepresentativePoints(job);
- clusters = loadClusters(job, clustersIn);
+ representativePoints = CDbwMapper.getRepresentativePoints(conf);
+ clusters = loadClusters(conf, clustersIn);
for (Integer cId : representativePoints.keySet()) {
setStDev(cId);
}
@@ -96,14 +96,14 @@ public class CDbwEvaluator {
* a String pathname to the directory containing input cluster files
* @return a List<Cluster> of the clusters
*/
- private static Map<Integer, Cluster> loadClusters(JobConf job, Path clustersIn)
+ private static Map<Integer, Cluster> loadClusters(Configuration conf, Path clustersIn)
throws InstantiationException, IllegalAccessException, IOException {
Map<Integer, Cluster> clusters = new HashMap<Integer, Cluster>();
- FileSystem fs = clustersIn.getFileSystem(job);
+ FileSystem fs = clustersIn.getFileSystem(conf);
for (FileStatus part : fs.listStatus(clustersIn)) {
if (!part.getPath().getName().startsWith(".")) {
Path inPart = part.getPath();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, inPart, job);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inPart, conf);
Writable key = (Writable) reader.getKeyClass().newInstance();
Writable value = (Writable) reader.getValueClass().newInstance();
while (reader.next(key, value)) {
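The evaluator keeps the reflection-based configuration idiom, now driven by a plain Configuration rather than a JobConf. The same idiom as a standalone sketch; the helper class and key parameter are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.mahout.common.distance.DistanceMeasure;

    final class MeasureLoaderSketch {
      static DistanceMeasure load(Configuration conf, String key) throws Exception {
        // the DistanceMeasure implementation class name is stored under 'key' in the job configuration
        ClassLoader ccl = Thread.currentThread().getContextClassLoader();
        Class<?> cl = ccl.loadClass(conf.get(key));
        return (DistanceMeasure) cl.newInstance();  // requires a public no-arg constructor
      }
    }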
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java Wed Jul 7 18:46:19 2010
@@ -23,24 +23,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputLogFilter;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.VectorWritable;
-public class CDbwMapper extends MapReduceBase implements
- Mapper<IntWritable, WeightedVectorWritable, IntWritable, WeightedVectorWritable> {
+public class CDbwMapper extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, WeightedVectorWritable> {
private Map<Integer, List<VectorWritable>> representativePoints;
@@ -48,16 +44,22 @@ public class CDbwMapper extends MapReduc
private DistanceMeasure measure = new EuclideanDistanceMeasure();
- private OutputCollector<IntWritable, WeightedVectorWritable> output;
-
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Mapper#cleanup(org.apache.hadoop.mapreduce.Mapper.Context)
+ */
@Override
- public void map(IntWritable clusterId,
- WeightedVectorWritable point,
- OutputCollector<IntWritable, WeightedVectorWritable> output,
- Reporter reporter) throws IOException {
-
- this.output = output;
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ for (Integer clusterId : mostDistantPoints.keySet()) {
+ context.write(new IntWritable(clusterId), mostDistantPoints.get(clusterId));
+ }
+ super.cleanup(context);
+ }
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapreduce.Mapper.Context)
+ */
+ @Override
+ protected void map(IntWritable clusterId, WeightedVectorWritable point, Context context) throws IOException, InterruptedException {
int key = clusterId.get();
WeightedVectorWritable currentMDP = mostDistantPoints.get(key);
@@ -67,8 +69,34 @@ public class CDbwMapper extends MapReduc
totalDistance += measure.distance(refPoint.get(), point.getVector().get());
}
if (currentMDP == null || currentMDP.getWeight() < totalDistance) {
- mostDistantPoints.put(key, new WeightedVectorWritable(totalDistance,
- new VectorWritable(point.getVector().get().clone())));
+ mostDistantPoints.put(key, new WeightedVectorWritable(totalDistance, new VectorWritable(point.getVector().get().clone())));
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
+ */
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ try {
+ ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+ Class<?> cl = ccl.loadClass(conf.get(CDbwDriver.DISTANCE_MEASURE_KEY));
+ measure = (DistanceMeasure) cl.newInstance();
+ representativePoints = getRepresentativePoints(conf);
+ } catch (NumberFormatException e) {
+ throw new IllegalStateException(e);
+ } catch (SecurityException e) {
+ throw new IllegalStateException(e);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalStateException(e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ } catch (InstantiationException e) {
+ throw new IllegalStateException(e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException(e);
}
}
@@ -77,15 +105,15 @@ public class CDbwMapper extends MapReduc
this.measure = measure;
}
- public static Map<Integer, List<VectorWritable>> getRepresentativePoints(JobConf job) {
- String statePath = job.get(CDbwDriver.STATE_IN_KEY);
+ public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf) {
+ String statePath = conf.get(CDbwDriver.STATE_IN_KEY);
Map<Integer, List<VectorWritable>> representativePoints = new HashMap<Integer, List<VectorWritable>>();
try {
Path path = new Path(statePath);
- FileSystem fs = FileSystem.get(path.toUri(), job);
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
FileStatus[] status = fs.listStatus(path, new OutputLogFilter());
for (FileStatus s : status) {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), job);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
try {
IntWritable key = new IntWritable(0);
VectorWritable point = new VectorWritable();
@@ -107,35 +135,4 @@ public class CDbwMapper extends MapReduc
throw new IllegalStateException(e);
}
}
-
- @Override
- public void configure(JobConf job) {
- super.configure(job);
- try {
- ClassLoader ccl = Thread.currentThread().getContextClassLoader();
- Class<?> cl = ccl.loadClass(job.get(CDbwDriver.DISTANCE_MEASURE_KEY));
- measure = (DistanceMeasure) cl.newInstance();
- representativePoints = getRepresentativePoints(job);
- } catch (NumberFormatException e) {
- throw new IllegalStateException(e);
- } catch (SecurityException e) {
- throw new IllegalStateException(e);
- } catch (IllegalArgumentException e) {
- throw new IllegalStateException(e);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException(e);
- } catch (InstantiationException e) {
- throw new IllegalStateException(e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public void close() throws IOException {
- for (Integer clusterId : mostDistantPoints.keySet()) {
- output.collect(new IntWritable(clusterId), mostDistantPoints.get(clusterId));
- }
- super.close();
- }
}
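The mapper above shows the lifecycle mapping this port applies throughout: configure(JobConf) becomes setup(Context), the OutputCollector/Reporter pair collapses into the single Context, and close() becomes cleanup(Context). A minimal skeleton of that shape, not taken from this commit:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class LifecycleSketchMapper extends Mapper<Text, Text, Text, IntWritable> {
      private int param;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        // replaces configure(JobConf): read parameters from context.getConfiguration()
        param = context.getConfiguration().getInt("sketch.param", 1);
      }

      @Override
      protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // replaces map(k, v, OutputCollector, Reporter): emit through the context
        context.write(key, new IntWritable(param));
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        // replaces close(): flush any state accumulated across map() calls
      }
    }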
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java Wed Jul 7 18:46:19 2010
@@ -22,61 +22,56 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.math.VectorWritable;
-public class CDbwReducer extends MapReduceBase
- implements Reducer<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
+public class CDbwReducer extends Reducer<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
private Map<Integer, List<VectorWritable>> referencePoints;
- private OutputCollector<IntWritable, VectorWritable> output;
-
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)
+ */
@Override
- public void reduce(IntWritable key,
- Iterator<WeightedVectorWritable> values,
- OutputCollector<IntWritable, VectorWritable> output,
- Reporter reporter) throws IOException {
- this.output = output;
- // find the most distant point
- WeightedVectorWritable mdp = null;
- while (values.hasNext()) {
- WeightedVectorWritable dpw = values.next();
- if (mdp == null || mdp.getWeight() < dpw.getWeight()) {
- mdp = new WeightedVectorWritable(dpw.getWeight(), dpw.getVector());
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ for (Integer clusterId : referencePoints.keySet()) {
+ for (VectorWritable vw : referencePoints.get(clusterId)) {
+ context.write(new IntWritable(clusterId), vw);
}
}
- output.collect(new IntWritable(key.get()), mdp.getVector());
- }
-
- public void configure(Map<Integer, List<VectorWritable>> referencePoints) {
- this.referencePoints = referencePoints;
+ super.cleanup(context);
}
/* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#close()
+ * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
*/
@Override
- public void close() throws IOException {
- for (Integer clusterId : referencePoints.keySet()) {
- for (VectorWritable vw : referencePoints.get(clusterId)) {
- output.collect(new IntWritable(clusterId), vw);
+ protected void reduce(IntWritable key, Iterable<WeightedVectorWritable> values, Context context) throws IOException,
+ InterruptedException {
+ // find the most distant point
+ WeightedVectorWritable mdp = null;
+ Iterator<WeightedVectorWritable> it = values.iterator();
+ while (it.hasNext()) {
+ WeightedVectorWritable dpw = it.next();
+ if (mdp == null || mdp.getWeight() < dpw.getWeight()) {
+ mdp = new WeightedVectorWritable(dpw.getWeight(), dpw.getVector());
}
}
- super.close();
+ context.write(new IntWritable(key.get()), mdp.getVector());
}
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+ */
@Override
- public void configure(JobConf job) {
- super.configure(job);
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
try {
- referencePoints = CDbwMapper.getRepresentativePoints(job);
+ referencePoints = CDbwMapper.getRepresentativePoints(conf);
} catch (NumberFormatException e) {
throw new IllegalStateException(e);
} catch (SecurityException e) {
@@ -86,4 +81,8 @@ public class CDbwReducer extends MapRedu
}
}
+ public void configure(Map<Integer, List<VectorWritable>> referencePoints) {
+ this.referencePoints = referencePoints;
+ }
+
}
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java Wed Jul 7 18:46:19 2010
@@ -31,13 +31,11 @@ import org.apache.commons.cli2.builder.D
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.jobcontrol.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,9 +81,7 @@ public final class SequenceFileDumper {
if (cmdLine.hasOption(seqOpt)) {
Path path = new Path(cmdLine.getValue(seqOpt).toString());
- JobClient client = new JobClient();
- JobConf conf = new JobConf(Job.class);
- client.setConf(conf);
+ Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(path.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
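Opening a SequenceFile for reading now needs only a Configuration; no JobClient is involved. A self-contained sketch of the dumper's read loop, with the input path supplied on the command line:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;

    public final class SeqFileDumpSketch {
      public static void main(String[] args) throws Exception {
        Path path = new Path(args[0]);
        Configuration conf = new Configuration();   // no JobClient/JobConf needed just to read
        FileSystem fs = FileSystem.get(path.toUri(), conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
          Writable key = (Writable) reader.getKeyClass().newInstance();
          Writable value = (Writable) reader.getValueClass().newInstance();
          while (reader.next(key, value)) {
            System.out.println(key + "\t" + value); // one record per line
          }
        } finally {
          reader.close();
        }
      }
    }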
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java Wed Jul 7 18:46:19 2010
@@ -40,6 +40,7 @@ import org.apache.commons.cli2.builder.D
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -47,9 +48,6 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.ClusterBase;
import org.apache.mahout.clustering.WeightedVectorWritable;
@@ -90,7 +88,7 @@ public final class ClusterDumper {
private void init() throws IOException {
if (this.pointsDir != null) {
- JobConf conf = new JobConf(Job.class);
+ Configuration conf = new Configuration();
// read in the points
clusterIdToPoints = readPoints(this.pointsDir, conf);
} else {
@@ -99,9 +97,7 @@ public final class ClusterDumper {
}
public void printClusters(String[] dictionary) throws IOException, InstantiationException, IllegalAccessException {
- JobClient client = new JobClient();
- JobConf conf = new JobConf(Job.class);
- client.setConf(conf);
+ Configuration conf = new Configuration();
if (this.termDictionary != null) {
if (dictionaryFormat.equals("text")) {
@@ -310,7 +306,7 @@ public final class ClusterDumper {
this.useJSON = json;
}
- private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, JobConf conf)
+ private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, Configuration conf)
throws IOException {
Map<Integer, List<WeightedVectorWritable>> result = new TreeMap<Integer, List<WeightedVectorWritable>>();
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocCombiner.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocCombiner.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocCombiner.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocCombiner.java Wed Jul 7 18:46:19 2010
@@ -20,32 +20,30 @@ package org.apache.mahout.utils.nlp.coll
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
/** Combiner for pass1 of the CollocationDriver. Combines frequencies for values for the same key */
-public class CollocCombiner extends MapReduceBase implements Reducer<GramKey,Gram,GramKey,Gram> {
-
+public class CollocCombiner extends Reducer<GramKey, Gram, GramKey, Gram> {
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#reduce(java.lang.Object, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
+ */
@Override
- public void reduce(GramKey key,
- Iterator<Gram> values,
- OutputCollector<GramKey,Gram> output,
- Reporter reporter) throws IOException {
+ protected void reduce(GramKey key, Iterable<Gram> values, Context context) throws IOException, InterruptedException {
int freq = 0;
Gram value = null;
-
+
// accumulate frequencies from values.
- while (values.hasNext()) {
- value = values.next();
+ Iterator<Gram> it = values.iterator();
+ while (it.hasNext()) {
+ value = it.next();
freq += value.getFrequency();
}
value.setFrequency(freq);
-
- output.collect(key, value);
+
+ context.write(key, value);
}
-
+
}
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocDriver.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocDriver.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocDriver.java Wed Jul 7 18:46:19 2010
@@ -24,14 +24,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.analysis.Analyzer;
import org.apache.mahout.common.AbstractJob;
@@ -45,15 +43,19 @@ import org.slf4j.LoggerFactory;
/** Driver for LLR Collocation discovery mapreduce job */
public final class CollocDriver extends AbstractJob {
public static final String DEFAULT_OUTPUT_DIRECTORY = "output";
+
public static final String SUBGRAM_OUTPUT_DIRECTORY = "subgrams";
+
public static final String NGRAM_OUTPUT_DIRECTORY = "ngrams";
-
+
public static final String EMIT_UNIGRAMS = "emit-unigrams";
+
public static final boolean DEFAULT_EMIT_UNIGRAMS = false;
-
+
public static final int DEFAULT_MAX_NGRAM_SIZE = 2;
+
public static final int DEFAULT_PASS1_NUM_REDUCE_TASKS = 1;
-
+
private static final Logger log = LoggerFactory.getLogger(CollocDriver.class);
private CollocDriver() {
@@ -68,36 +70,31 @@ public final class CollocDriver extends
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.numReducersOption().create());
-
- addOption("maxNGramSize", "ng",
- "(Optional) The max size of ngrams to create (2 = bigrams, 3 = trigrams, etc) default: 2",
- String.valueOf(DEFAULT_MAX_NGRAM_SIZE));
- addOption("minSupport", "s",
- "(Optional) Minimum Support. Default Value: " + CollocReducer.DEFAULT_MIN_SUPPORT,
- String.valueOf(CollocReducer.DEFAULT_MIN_SUPPORT));
- addOption("minLLR", "ml",
- "(Optional)The minimum Log Likelihood Ratio(Float) Default is " + LLRReducer.DEFAULT_MIN_LLR,
- String.valueOf(LLRReducer.DEFAULT_MIN_LLR));
+
+ addOption("maxNGramSize",
+ "ng",
+ "(Optional) The max size of ngrams to create (2 = bigrams, 3 = trigrams, etc) default: 2",
+ String.valueOf(DEFAULT_MAX_NGRAM_SIZE));
+ addOption("minSupport", "s", "(Optional) Minimum Support. Default Value: " + CollocReducer.DEFAULT_MIN_SUPPORT, String
+ .valueOf(CollocReducer.DEFAULT_MIN_SUPPORT));
+ addOption("minLLR", "ml", "(Optional)The minimum Log Likelihood Ratio(Float) Default is " + LLRReducer.DEFAULT_MIN_LLR, String
+ .valueOf(LLRReducer.DEFAULT_MIN_LLR));
addOption(DefaultOptionCreator.overwriteOption().create());
- addOption("analyzerName", "a",
- "The class name of the analyzer to use for preprocessing", null);
-
- addFlag("preprocess", "p",
- "If set, input is SequenceFile<Text,Text> where the value is the document, "
+ addOption("analyzerName", "a", "The class name of the analyzer to use for preprocessing", null);
+
+ addFlag("preprocess", "p", "If set, input is SequenceFile<Text,Text> where the value is the document, "
+ " which will be tokenized using the specified analyzer.");
- addFlag("unigram", "u",
- "If set, unigrams will be emitted in the final output alongside collocations");
-
+ addFlag("unigram", "u", "If set, unigrams will be emitted in the final output alongside collocations");
+
Map<String, String> argMap = parseArguments(args);
-
+
if (argMap == null) {
return -1;
}
-
+
Path input = getInputPath();
Path output = getOutputPath();
-
-
+
int maxNGramSize = DEFAULT_MAX_NGRAM_SIZE;
if (argMap.get("--maxNGramSize") != null) {
try {
@@ -107,39 +104,34 @@ public final class CollocDriver extends
}
}
log.info("Maximum n-gram size is: {}", maxNGramSize);
-
-
+
if (argMap.containsKey("--overwrite")) {
HadoopUtil.overwriteOutput(output);
}
-
-
+
int minSupport = CollocReducer.DEFAULT_MIN_SUPPORT;
if (argMap.get("--minsupport") != null) {
minSupport = Integer.parseInt(argMap.get("--minsupport"));
}
log.info("Minimum Support value: {}", minSupport);
-
-
+
float minLLRValue = LLRReducer.DEFAULT_MIN_LLR;
if (argMap.get("--minLLR") != null) {
minLLRValue = Float.parseFloat(argMap.get("--minLLR"));
}
log.info("Minimum LLR value: {}", minLLRValue);
-
-
+
int reduceTasks = DEFAULT_PASS1_NUM_REDUCE_TASKS;
if (argMap.get("--maxRed") != null) {
reduceTasks = Integer.parseInt(argMap.get("--maxRed"));
}
log.info("Number of pass1 reduce tasks: {}", reduceTasks);
-
-
+
boolean emitUnigrams = argMap.containsKey("--emitUnigrams");
if (argMap.containsKey("--preprocess")) {
log.info("Input will be preprocessed");
-
+
Class<? extends Analyzer> analyzerClass = DefaultAnalyzer.class;
if (argMap.get("--analyzerName") != null) {
String className = argMap.get("--analyzerName");
@@ -148,25 +140,24 @@ public final class CollocDriver extends
// you can't instantiate it
analyzerClass.newInstance();
}
-
+
Path tokenizedPath = new Path(output, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
-
+
DocumentProcessor.tokenizeDocuments(input, analyzerClass, tokenizedPath);
input = tokenizedPath;
} else {
log.info("Input will NOT be preprocessed");
}
-
+
// parse input and extract collocations
- long ngramCount = generateCollocations(input, output, getConf(), emitUnigrams, maxNGramSize,
- reduceTasks, minSupport);
-
+ long ngramCount = generateCollocations(input, output, getConf(), emitUnigrams, maxNGramSize, reduceTasks, minSupport);
+
// tally collocations and perform LLR calculation
computeNGramsPruneByLLR(output, getConf(), ngramCount, emitUnigrams, minLLRValue, reduceTasks);
return 0;
}
-
+
/**
* Generate all ngrams for the {@link org.apache.mahout.utils.vectors.text.DictionaryVectorizer} job
*
@@ -183,6 +174,8 @@ public final class CollocDriver extends
* @param reduceTasks
* number of reducers used
* @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
public static void generateAllGrams(Path input,
Path output,
@@ -190,15 +183,14 @@ public final class CollocDriver extends
int maxNGramSize,
int minSupport,
float minLLRValue,
- int reduceTasks) throws IOException {
+ int reduceTasks) throws IOException, InterruptedException, ClassNotFoundException {
// parse input and extract collocations
- long ngramCount = generateCollocations(input, output, baseConf, true, maxNGramSize, reduceTasks,
- minSupport);
-
+ long ngramCount = generateCollocations(input, output, baseConf, true, maxNGramSize, reduceTasks, minSupport);
+
// tally collocations and perform LLR calculation
computeNGramsPruneByLLR(output, baseConf, ngramCount, true, minLLRValue, reduceTasks);
}
-
+
/**
* pass1: generate collocations, ngrams
*/
@@ -209,73 +201,74 @@ public final class CollocDriver extends
int maxNGramSize,
int reduceTasks,
- int minSupport) throws IOException {
+ int minSupport) throws IOException, InterruptedException, ClassNotFoundException {
-
- JobConf conf = new JobConf(baseConf, CollocDriver.class);
- conf.setJobName(CollocDriver.class.getSimpleName() + ".generateCollocations:" + input);
-
- conf.setMapOutputKeyClass(GramKey.class);
- conf.setMapOutputValueClass(Gram.class);
- conf.setPartitionerClass(GramKeyPartitioner.class);
- conf.setOutputValueGroupingComparator(GramKeyGroupComparator.class);
-
- conf.setOutputKeyClass(Gram.class);
- conf.setOutputValueClass(Gram.class);
-
- conf.setCombinerClass(CollocCombiner.class);
-
- conf.setBoolean(EMIT_UNIGRAMS, emitUnigrams);
-
- FileInputFormat.setInputPaths(conf, input);
-
+
+ Configuration con = new Configuration();
+ con.setBoolean(EMIT_UNIGRAMS, emitUnigrams);
+ con.setInt(CollocMapper.MAX_SHINGLE_SIZE, maxNGramSize);
+ con.setInt(CollocReducer.MIN_SUPPORT, minSupport);
+ Job job = new Job(con);
+ job.setJobName(CollocDriver.class.getSimpleName() + ".generateCollocations:" + input);
+
+ job.setMapOutputKeyClass(GramKey.class);
+ job.setMapOutputValueClass(Gram.class);
+ job.setPartitionerClass(GramKeyPartitioner.class);
+ job.setGroupingComparatorClass(GramKeyGroupComparator.class);
+
+ job.setOutputKeyClass(Gram.class);
+ job.setOutputValueClass(Gram.class);
+
+ job.setCombinerClass(CollocCombiner.class);
+
+ FileInputFormat.setInputPaths(job, input);
+
Path outputPath = new Path(output, SUBGRAM_OUTPUT_DIRECTORY);
- FileOutputFormat.setOutputPath(conf, outputPath);
-
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setMapperClass(CollocMapper.class);
-
- conf.setOutputFormat(SequenceFileOutputFormat.class);
- conf.setReducerClass(CollocReducer.class);
- conf.setInt(CollocMapper.MAX_SHINGLE_SIZE, maxNGramSize);
- conf.setInt(CollocReducer.MIN_SUPPORT, minSupport);
- conf.setNumReduceTasks(reduceTasks);
-
- RunningJob job = JobClient.runJob(conf);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setMapperClass(CollocMapper.class);
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setReducerClass(CollocReducer.class);
+ job.setNumReduceTasks(reduceTasks);
+
+ job.waitForCompletion(true);
return job.getCounters().findCounter(CollocMapper.Count.NGRAM_TOTAL).getValue();
}
-
+
/**
* pass2: perform the LLR calculation
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
public static void computeNGramsPruneByLLR(Path output,
- Configuration baseConf,
- long nGramTotal,
- boolean emitUnigrams,
- float minLLRValue,
- int reduceTasks) throws IOException {
- JobConf conf = new JobConf(baseConf, CollocDriver.class);
- conf.setJobName(CollocDriver.class.getSimpleName() + ".computeNGrams: " + output);
-
-
+ Configuration baseConf,
+ long nGramTotal,
+ boolean emitUnigrams,
+ float minLLRValue,
+ int reduceTasks) throws IOException, InterruptedException, ClassNotFoundException {
+ Configuration conf = new Configuration();
conf.setLong(LLRReducer.NGRAM_TOTAL, nGramTotal);
conf.setBoolean(EMIT_UNIGRAMS, emitUnigrams);
-
- conf.setMapOutputKeyClass(Gram.class);
- conf.setMapOutputValueClass(Gram.class);
-
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(DoubleWritable.class);
-
- FileInputFormat.setInputPaths(conf, new Path(output, SUBGRAM_OUTPUT_DIRECTORY));
+
+ Job job = new Job(conf);
+ job.setJobName(CollocDriver.class.getSimpleName() + ".computeNGrams: " + output);
+
+ job.setMapOutputKeyClass(Gram.class);
+ job.setMapOutputValueClass(Gram.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(DoubleWritable.class);
+
+ FileInputFormat.setInputPaths(job, new Path(output, SUBGRAM_OUTPUT_DIRECTORY));
Path outPath = new Path(output, NGRAM_OUTPUT_DIRECTORY);
- FileOutputFormat.setOutputPath(conf, outPath);
-
- conf.setMapperClass(IdentityMapper.class);
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
- conf.setReducerClass(LLRReducer.class);
- conf.setNumReduceTasks(reduceTasks);
-
+ FileOutputFormat.setOutputPath(job, outPath);
+
+ job.setMapperClass(Mapper.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setReducerClass(LLRReducer.class);
+ job.setNumReduceTasks(reduceTasks);
+
- conf.setFloat(LLRReducer.MIN_LLR, minLLRValue);
+ job.getConfiguration().setFloat(LLRReducer.MIN_LLR, minLLRValue);
- JobClient.runJob(conf);
+ job.waitForCompletion(true);
}
}
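In the new API, counters live on the Job and are only meaningful once the job has run, hence the waitForCompletion call before reading NGRAM_TOTAL in generateCollocations above. The pattern in isolation; the helper class is hypothetical:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.mahout.utils.nlp.collocations.llr.CollocMapper;

    final class CounterReadSketch {
      // runs an already-configured job, then reads its ngram counter
      static long runAndCount(Job job) throws Exception {
        if (!job.waitForCompletion(true)) {
          throw new IllegalStateException("collocation job failed");
        }
        return job.getCounters().findCounter(CollocMapper.Count.NGRAM_TOTAL).getValue();
      }
    }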
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocMapper.java Wed Jul 7 18:46:19 2010
@@ -20,12 +20,9 @@ package org.apache.mahout.utils.nlp.coll
import java.io.IOException;
import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.shingle.ShingleFilter;
import org.apache.lucene.analysis.tokenattributes.TermAttribute;
@@ -41,36 +38,24 @@ import org.slf4j.LoggerFactory;
* Input is a SequenceFile<Text,StringTuple>, where the key is a document id and the value is the tokenized document.
* <p/>
*/
-public class CollocMapper extends MapReduceBase implements Mapper<Text,StringTuple,GramKey,Gram> {
+public class CollocMapper extends Mapper<Text, StringTuple, GramKey, Gram> {
private static final byte[] EMPTY = new byte[0];
-
+
public static final String MAX_SHINGLE_SIZE = "maxShingleSize";
+
public static final int DEFAULT_MAX_SHINGLE_SIZE = 2;
-
+
public enum Count {
NGRAM_TOTAL
}
-
+
private static final Logger log = LoggerFactory.getLogger(CollocMapper.class);
-
+
private int maxShingleSize;
+
private boolean emitUnigrams;
-
- @Override
- public void configure(JobConf job) {
- super.configure(job);
-
- this.maxShingleSize = job.getInt(MAX_SHINGLE_SIZE, DEFAULT_MAX_SHINGLE_SIZE);
-
- this.emitUnigrams = job.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
-
- if (log.isInfoEnabled()) {
- log.info("Max Ngram size is {}", this.maxShingleSize);
- log.info("Emit Unitgrams is {}", emitUnigrams);
- }
- }
-
+
/**
* Collocation finder: pass 1 map phase.
* <p/>
@@ -110,16 +95,14 @@ public class CollocMapper extends MapRed
* if there's a problem with the ShingleFilter reading data or the collector collecting output.
*/
@Override
- public void map(Text key, StringTuple value,
- final OutputCollector<GramKey,Gram> collector, Reporter reporter) throws IOException {
-
+ protected void map(Text key, StringTuple value, final Context context) throws IOException, InterruptedException {
+
ShingleFilter sf = new ShingleFilter(new IteratorTokenStream(value.getEntries().iterator()), maxShingleSize);
int count = 0; // ngram count
-
- OpenObjectIntHashMap<String> ngrams = new OpenObjectIntHashMap<String>(value.getEntries().size()
- * (maxShingleSize - 1));
+
+ OpenObjectIntHashMap<String> ngrams = new OpenObjectIntHashMap<String>(value.getEntries().size() * (maxShingleSize - 1));
OpenObjectIntHashMap<String> unigrams = new OpenObjectIntHashMap<String>(value.getEntries().size());
-
+
do {
String term = ((TermAttribute) sf.getAttribute(TermAttribute.class)).term();
String type = ((TypeAttribute) sf.getAttribute(TypeAttribute.class)).type();
@@ -130,51 +113,55 @@ public class CollocMapper extends MapRed
unigrams.adjustOrPutValue(term, 1, 1);
}
} while (sf.incrementToken());
-
+
try {
final GramKey gramKey = new GramKey();
-
+
ngrams.forEachPair(new ObjectIntProcedure<String>() {
@Override
public boolean apply(String term, int frequency) {
// obtain components, the leading (n-1)gram and the trailing unigram.
int i = term.lastIndexOf(' '); // TODO: fix for non-whitespace delimited languages.
if (i != -1) { // bigram, trigram etc
-
+
try {
Gram ngram = new Gram(term, frequency, Gram.Type.NGRAM);
- Gram head = new Gram(term.substring(0, i), frequency, Gram.Type.HEAD);
- Gram tail = new Gram(term.substring(i + 1), frequency, Gram.Type.TAIL);
-
+ Gram head = new Gram(term.substring(0, i), frequency, Gram.Type.HEAD);
+ Gram tail = new Gram(term.substring(i + 1), frequency, Gram.Type.TAIL);
+
gramKey.set(head, EMPTY);
- collector.collect(gramKey, head);
-
+ context.write(gramKey, head);
+
gramKey.set(head, ngram.getBytes());
- collector.collect(gramKey, ngram);
-
+ context.write(gramKey, ngram);
+
gramKey.set(tail, EMPTY);
- collector.collect(gramKey, tail);
-
+ context.write(gramKey, tail);
+
gramKey.set(tail, ngram.getBytes());
- collector.collect(gramKey, ngram);
-
+ context.write(gramKey, ngram);
+
} catch (IOException e) {
throw new IllegalStateException(e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
}
}
return true;
}
});
-
+
unigrams.forEachPair(new ObjectIntProcedure<String>() {
@Override
public boolean apply(String term, int frequency) {
try {
Gram unigram = new Gram(term, frequency, Gram.Type.UNIGRAM);
gramKey.set(unigram, EMPTY);
- collector.collect(gramKey, unigram);
+ context.write(gramKey, unigram);
} catch (IOException e) {
throw new IllegalStateException(e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
}
return true;
}
@@ -188,23 +175,41 @@ public class CollocMapper extends MapRed
throw ise;
}
}
-
- reporter.incrCounter(Count.NGRAM_TOTAL, count);
-
+
+ context.getCounter(Count.NGRAM_TOTAL).increment(count);
+
sf.end();
sf.close();
}
-
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
+ */
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ this.maxShingleSize = conf.getInt(MAX_SHINGLE_SIZE, DEFAULT_MAX_SHINGLE_SIZE);
+
+ this.emitUnigrams = conf.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
+
+ if (log.isInfoEnabled()) {
+ log.info("Max Ngram size is {}", this.maxShingleSize);
+ log.info("Emit Unigrams is {}", emitUnigrams);
+ }
+ }
+
/** Used to emit tokens from an input string array in the style of TokenStream */
public static class IteratorTokenStream extends TokenStream {
private final TermAttribute termAtt;
+
private final Iterator<String> iterator;
-
+
public IteratorTokenStream(Iterator<String> iterator) {
this.iterator = iterator;
this.termAtt = (TermAttribute) addAttribute(TermAttribute.class);
}
-
+
@Override
public boolean incrementToken() throws IOException {
if (iterator.hasNext()) {
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/CollocReducer.java Wed Jul 7 18:46:19 2010
@@ -20,47 +20,28 @@ package org.apache.mahout.utils.nlp.coll
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Reducer for Pass 1 of the collocation identification job. Generates counts for ngrams and subgrams.
*/
-public class CollocReducer extends MapReduceBase implements Reducer<GramKey,Gram,Gram,Gram> {
+public class CollocReducer extends Reducer<GramKey, Gram, Gram, Gram> {
private static final Logger log = LoggerFactory.getLogger(CollocReducer.class);
public static final String MIN_SUPPORT = "minSupport";
+
public static final int DEFAULT_MIN_SUPPORT = 2;
-
+
public enum Skipped {
- LESS_THAN_MIN_SUPPORT,
- MALFORMED_KEY_TUPLE,
- MALFORMED_TUPLE,
- MALFORMED_TYPES,
- MALFORMED_UNIGRAM
+ LESS_THAN_MIN_SUPPORT, MALFORMED_KEY_TUPLE, MALFORMED_TUPLE, MALFORMED_TYPES, MALFORMED_UNIGRAM
}
private int minSupport;
- @Override
- public void configure(JobConf job) {
- super.configure(job);
-
- this.minSupport = job.getInt(MIN_SUPPORT, DEFAULT_MIN_SUPPORT);
-
- boolean emitUnigrams = job.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
-
- log.info("Min support is {}", minSupport);
- log.info("Emit Unitgrams is {}", emitUnigrams);
-
- }
-
/**
* collocation finder: pass 1 reduce phase:
* <p/>
@@ -86,33 +67,45 @@ public class CollocReducer extends MapRe
* head and move the count into the value?
*/
@Override
- public void reduce(GramKey key,
- Iterator<Gram> values,
- OutputCollector<Gram,Gram> output,
- Reporter reporter) throws IOException {
-
+ protected void reduce(GramKey key, Iterable<Gram> values, Context context) throws IOException, InterruptedException {
+
Gram.Type keyType = key.getType();
if (keyType == Gram.Type.UNIGRAM) {
// sum frequencies for unigrams.
- processUnigram(key, values, output, reporter);
+ processUnigram(key, values.iterator(), context);
} else if (keyType == Gram.Type.HEAD || keyType == Gram.Type.TAIL) {
// sum frequencies for subgrams, ngram and collect for each ngram.
- processSubgram(key, values, output, reporter);
+ processSubgram(key, values.iterator(), context);
} else {
- reporter.incrCounter(Skipped.MALFORMED_TYPES, 1);
+ context.getCounter(Skipped.MALFORMED_TYPES).increment(1);
}
}
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
+ */
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ this.minSupport = conf.getInt(MIN_SUPPORT, DEFAULT_MIN_SUPPORT);
+
+ boolean emitUnigrams = conf.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
+
+ log.info("Min support is {}", minSupport);
+ log.info("Emit Unitgrams is {}", emitUnigrams);
+ }
+
/**
* Sum frequencies for unigrams and deliver to the collector
+ * @throws InterruptedException
*/
- protected void processUnigram(GramKey key, Iterator<Gram> values,
- OutputCollector<Gram, Gram> output, Reporter reporter) throws IOException {
+ protected void processUnigram(GramKey key, Iterator<Gram> values, Context context) throws IOException, InterruptedException {
int freq = 0;
Gram value = null;
-
+
// accumulate frequencies from values.
while (values.hasNext()) {
value = values.next();
@@ -120,15 +113,15 @@ public class CollocReducer extends MapRe
}
if (freq < minSupport) {
- reporter.incrCounter(Skipped.LESS_THAN_MIN_SUPPORT, 1);
+ context.getCounter(Skipped.LESS_THAN_MIN_SUPPORT).increment(1);
return;
}
value.setFrequency(freq);
- output.collect(value, value);
+ context.write(value, value);
}
-
+
/** Sum frequencies for subgrams and ngrams, and deliver ngram/subgram pairs to the collector.
* <p/>
* Sort order guarantees that the subgram/subgram pairs will be seen first and then
@@ -137,17 +130,17 @@ public class CollocReducer extends MapRe
* <p/>
* We end up calculating frequencies for ngrams for each subgram (head, tail) here, which is
* some extra work.
+ * @throws InterruptedException
*/
- protected void processSubgram(GramKey key, Iterator<Gram> values,
- OutputCollector<Gram,Gram> output, Reporter reporter) throws IOException {
+ protected void processSubgram(GramKey key, Iterator<Gram> values, Context context) throws IOException, InterruptedException {
- Gram subgram = null;
+ Gram subgram = null;
Gram currentNgram = null;
-
+
while (values.hasNext()) {
Gram value = values.next();
- if (value.getType() == Gram.Type.HEAD || value.getType() == Gram.Type.TAIL) {
+ if (value.getType() == Gram.Type.HEAD || value.getType() == Gram.Type.TAIL) {
// collect frequency for subgrams.
if (subgram == null) {
subgram = new Gram(value);
@@ -160,9 +153,9 @@ public class CollocReducer extends MapRe
// create the new ngram.
if (currentNgram != null) {
if (currentNgram.getFrequency() < minSupport) {
- reporter.incrCounter(Skipped.LESS_THAN_MIN_SUPPORT, 1);
+ context.getCounter(Skipped.LESS_THAN_MIN_SUPPORT).increment(1);
} else {
- output.collect(currentNgram, subgram);
+ context.write(currentNgram, subgram);
}
}
@@ -171,15 +164,15 @@ public class CollocReducer extends MapRe
currentNgram.incrementFrequency(value.getFrequency());
}
}
-
+
// collect last ngram.
if (currentNgram != null) {
if (currentNgram.getFrequency() < minSupport) {
- reporter.incrCounter(Skipped.LESS_THAN_MIN_SUPPORT, 1);
+ context.getCounter(Skipped.LESS_THAN_MIN_SUPPORT).increment(1);
return;
}
-
- output.collect(currentNgram, subgram);
+
+ context.write(currentNgram, subgram);
}
}
}
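
The rewrite above is the template applied to every reducer in this commit: MapReduceBase and the old Reducer interface give way to a subclass of org.apache.hadoop.mapreduce.Reducer, configure(JobConf) becomes setup(Context), reduce() takes an Iterable and a Context in place of an Iterator, OutputCollector and Reporter, and counters move from reporter.incrCounter(...) to context.getCounter(...).increment(...). A minimal sketch of that shape follows; the class name, property name, and counter enum are illustrative, not part of this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative skeleton of the 0.20.2 reducer shape used throughout this commit.
public class ExampleSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  public enum Skipped { LESS_THAN_MIN_SUPPORT }

  private int minSupport;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    // configure(JobConf) from the old API becomes setup(Context).
    Configuration conf = context.getConfiguration();
    minSupport = conf.getInt("example.minSupport", 2); // hypothetical property name
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) { // Iterator<V> becomes Iterable<V>
      sum += value.get();
    }
    if (sum < minSupport) {
      // reporter.incrCounter(enum, 1) becomes context.getCounter(enum).increment(1).
      context.getCounter(Skipped.LESS_THAN_MIN_SUPPORT).increment(1);
      return;
    }
    context.write(key, new IntWritable(sum)); // output.collect(k, v) becomes context.write(k, v)
  }
}
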
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKeyPartitioner.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKeyPartitioner.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKeyPartitioner.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKeyPartitioner.java Wed Jul 7 18:46:19 2010
@@ -19,14 +19,13 @@ package org.apache.mahout.utils.nlp.coll
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Partitioner;
/**
* Partition GramKeys based on their Gram, ignoring the secondary sort key so that all GramKeys with the same
* gram are sent to the same partition.
*/
-public class GramKeyPartitioner implements Partitioner<GramKey, Gram> {
+public class GramKeyPartitioner extends Partitioner<GramKey, Gram> {
private static final String HASH_OFFSET_PROPERTY_NAME = "grampartitioner.hash.offset";
@@ -45,8 +44,7 @@ public class GramKeyPartitioner implemen
return (hash & Integer.MAX_VALUE) % numPartitions;
}
- @Override
- public void configure(JobConf conf) {
+ public void configure(Configuration conf) {
offset = conf.getInt(HASH_OFFSET_PROPERTY_NAME, -1);
}
}
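
Note that the mapreduce Partitioner is an abstract class and the framework no longer invokes a configure(JobConf) hook, which is why configure(Configuration) above lost its @Override. One common pattern for a partitioner that needs job parameters is to implement Configurable, so that Hadoop injects the Configuration when it instantiates the class via ReflectionUtils. A sketch under that pattern, with invented class and property names:

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative: a mapreduce-API partitioner that picks up job parameters
// through the Configurable contract instead of the old configure(JobConf).
public class ExamplePartitioner extends Partitioner<Text, IntWritable> implements Configurable {

  private Configuration conf;
  private int offset;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    offset = conf.getInt("example.hash.offset", 0); // hypothetical property name
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    int hash = key.hashCode() + offset;
    return (hash & Integer.MAX_VALUE) % numPartitions; // clear the sign bit before the modulo
  }
}
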
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/LLRReducer.java Wed Jul 7 18:46:19 2010
@@ -20,13 +20,10 @@ package org.apache.mahout.utils.nlp.coll
import java.io.IOException;
import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.stats.LogLikelihood;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,64 +32,29 @@ import org.slf4j.LoggerFactory;
* Reducer for pass 2 of the collocation discovery job. Collects ngram and sub-ngram frequencies and performs
* the Log-likelihood ratio calculation.
*/
-public class LLRReducer extends MapReduceBase implements Reducer<Gram,Gram,Text,DoubleWritable> {
-
+public class LLRReducer extends Reducer<Gram, Gram, Text, DoubleWritable> {
+
/** Counter to track why a particular entry was skipped */
public enum Skipped {
- EXTRA_HEAD,
- EXTRA_TAIL,
- MISSING_HEAD,
- MISSING_TAIL,
- LESS_THAN_MIN_LLR,
- LLR_CALCULATION_ERROR,
+ EXTRA_HEAD, EXTRA_TAIL, MISSING_HEAD, MISSING_TAIL, LESS_THAN_MIN_LLR, LLR_CALCULATION_ERROR,
}
private static final Logger log = LoggerFactory.getLogger(LLRReducer.class);
-
+
public static final String NGRAM_TOTAL = "ngramTotal";
+
public static final String MIN_LLR = "minLLR";
+
public static final float DEFAULT_MIN_LLR = 1.0f;
-
+
private long ngramTotal;
+
private float minLLRValue;
+
private boolean emitUnigrams;
-
+
private final LLCallback ll;
-
- public LLRReducer() {
- this.ll = new ConcreteLLCallback();
- }
-
- /**
- * plug in an alternate LL implementation, used for testing
- *
- * @param ll
- * the LL to use.
- */
- LLRReducer(LLCallback ll) {
- this.ll = ll;
- }
-
- @Override
- public void configure(JobConf job) {
- super.configure(job);
-
- this.ngramTotal = job.getLong(NGRAM_TOTAL, -1);
- this.minLLRValue = job.getFloat(MIN_LLR, DEFAULT_MIN_LLR);
-
- this.emitUnigrams = job.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
-
- if (log.isInfoEnabled()) {
- log.info("NGram Total is {}", ngramTotal);
- log.info("Min LLR value is {}", minLLRValue);
- log.info("Emit Unitgrams is {}", emitUnigrams);
- }
-
- if (ngramTotal == -1) {
- throw new IllegalStateException("No NGRAM_TOTAL available in job config");
- }
- }
-
+
/**
* Perform LLR calculation, input is: k:ngram:ngramFreq v:(h_|t_)subgram:subgramfreq N = ngram total
*
@@ -103,67 +65,65 @@ public class LLRReducer extends MapReduc
* number of times neither A nor B appears (in that order): N - (subgramFreqA + subgramFreqB - ngramFreq)
*/
@Override
- public void reduce(Gram ngram,
- Iterator<Gram> values,
- OutputCollector<Text,DoubleWritable> output,
- Reporter reporter) throws IOException {
-
- int[] gramFreq = {-1, -1};
-
+ protected void reduce(Gram ngram, Iterable<Gram> values, Context context) throws IOException, InterruptedException {
+
+ int[] gramFreq = { -1, -1 };
+
if (ngram.getType() == Gram.Type.UNIGRAM && emitUnigrams) {
DoubleWritable dd = new DoubleWritable(ngram.getFrequency());
Text t = new Text(ngram.getString());
- output.collect(t, dd);
+ context.write(t, dd);
return;
}
// FIXME: better way to handle errors? Wouldn't an exception thrown here
// cause hadoop to re-try the job?
String[] gram = new String[2];
- while (values.hasNext()) {
- Gram value = values.next();
-
+ Iterator<Gram> it = values.iterator();
+ while (it.hasNext()) {
+ Gram value = it.next();
+
int pos = value.getType() == Gram.Type.HEAD ? 0 : 1;
-
+
if (gramFreq[pos] != -1) {
log.warn("Extra {} for {}, skipping", value.getType(), ngram);
if (value.getType() == Gram.Type.HEAD) {
- reporter.incrCounter(Skipped.EXTRA_HEAD, 1);
+ context.getCounter(Skipped.EXTRA_HEAD).increment(1);
} else {
- reporter.incrCounter(Skipped.EXTRA_TAIL, 1);
+ context.getCounter(Skipped.EXTRA_TAIL).increment(1);
}
return;
}
-
+
gram[pos] = value.getString();
gramFreq[pos] = value.getFrequency();
}
-
+
if (gramFreq[0] == -1) {
log.warn("Missing head for {}, skipping.", ngram);
- reporter.incrCounter(Skipped.MISSING_HEAD, 1);
+ context.getCounter(Skipped.MISSING_HEAD).increment(1);
return;
} else if (gramFreq[1] == -1) {
log.warn("Missing tail for {}, skipping", ngram);
- reporter.incrCounter(Skipped.MISSING_TAIL, 1);
+ context.getCounter(Skipped.MISSING_TAIL).increment(1);
return;
}
-
+
int k11 = ngram.getFrequency(); /* a&b */
int k12 = gramFreq[0] - ngram.getFrequency(); /* a&!b */
int k21 = gramFreq[1] - ngram.getFrequency(); /* !a&b */
int k22 = (int) (ngramTotal - (gramFreq[0] + gramFreq[1] - ngram.getFrequency())); /* !a&!b */
-
+
try {
double llr = ll.logLikelihoodRatio(k11, k12, k21, k22);
if (llr < minLLRValue) {
- reporter.incrCounter(Skipped.LESS_THAN_MIN_LLR, 1);
+ context.getCounter(Skipped.LESS_THAN_MIN_LLR).increment(1);
return;
}
DoubleWritable dd = new DoubleWritable(llr);
Text t = new Text(ngram.getString());
- output.collect(t, dd);
+ context.write(t, dd);
} catch (IllegalArgumentException ex) {
- reporter.incrCounter(Skipped.LLR_CALCULATION_ERROR, 1);
+ context.getCounter(Skipped.LLR_CALCULATION_ERROR).increment(1);
log.error("Problem calculating LLR ratio: " + ex.getMessage());
log.error("NGram: " + ngram);
log.error("HEAD: " + gram[0] + ':' + gramFreq[0]);
@@ -171,14 +131,51 @@ public class LLRReducer extends MapReduc
log.error("k11: " + k11 + " k12: " + k12 + " k21: " + k21 + " k22: " + k22);
}
}
-
+
+ /**
+ * Reads the ngram total, minimum LLR, and unigram-emission settings from the job configuration.
+ */
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ this.ngramTotal = conf.getLong(NGRAM_TOTAL, -1);
+ this.minLLRValue = conf.getFloat(MIN_LLR, DEFAULT_MIN_LLR);
+
+ this.emitUnigrams = conf.getBoolean(CollocDriver.EMIT_UNIGRAMS, CollocDriver.DEFAULT_EMIT_UNIGRAMS);
+
+ if (log.isInfoEnabled()) {
+ log.info("NGram Total is {}", ngramTotal);
+ log.info("Min LLR value is {}", minLLRValue);
+ log.info("Emit Unitgrams is {}", emitUnigrams);
+ }
+
+ if (ngramTotal == -1) {
+ throw new IllegalStateException("No NGRAM_TOTAL available in job config");
+ }
+ }
+
+ public LLRReducer() {
+ this.ll = new ConcreteLLCallback();
+ }
+
+ /**
+ * plug in an alternate LL implementation, used for testing
+ *
+ * @param ll
+ * the LL to use.
+ */
+ LLRReducer(LLCallback ll) {
+ this.ll = ll;
+ }
+
/**
* provide interface so the input to the llr calculation can be captured for validation in unit testing
*/
public interface LLCallback {
double logLikelihoodRatio(int k11, int k12, int k21, int k22);
}
-
+
/** concrete implementation delegates to LogLikelihood class */
public static final class ConcreteLLCallback implements LLCallback {
@Override
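
To make the contingency-table arithmetic in reduce() concrete: with A the head subgram and B the tail, the four cells count co-occurrence, head-only, tail-only, and neither. A standalone example with invented frequencies (only LogLikelihood.logLikelihoodRatio comes from the code above):

import org.apache.mahout.math.stats.LogLikelihood;

// Invented numbers, purely to illustrate the k11..k22 bookkeeping in LLRReducer.
public class LlrExample {
  public static void main(String[] args) {
    int ngramFreq = 80;      // times the bigram "new york" occurs
    int headFreq = 200;      // times "new" occurs as a head
    int tailFreq = 100;      // times "york" occurs as a tail
    long ngramTotal = 10000; // total ngrams in the corpus

    int k11 = ngramFreq;            /* a&b */
    int k12 = headFreq - ngramFreq; /* a&!b */
    int k21 = tailFreq - ngramFreq; /* !a&b */
    int k22 = (int) (ngramTotal - (headFreq + tailFreq - ngramFreq)); /* !a&!b */

    // 80, 120, 20, 9780 for the numbers above
    double llr = LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22);
    System.out.println("LLR(new york) = " + llr);
  }
}
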
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java Wed Jul 7 18:46:19 2010
@@ -26,12 +26,10 @@ import org.apache.commons.cli2.builder.D
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.utils.vectors.SequenceFileVectorIterable.SeqFileIterator;
@@ -104,9 +102,7 @@ public final class VectorDumper {
if (cmdLine.hasOption(seqOpt)) {
Path path = new Path(cmdLine.getValue(seqOpt).toString());
//System.out.println("Input Path: " + path); interferes with output?
- JobClient client = new JobClient();
- JobConf conf = new JobConf(Job.class);
- client.setConf(conf);
+ Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(path.toUri(), conf);
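
Dropping JobClient/JobConf here works because VectorDumper only reads files; a bare Configuration is enough to resolve a FileSystem. The access pattern it now relies on, sketched standalone (the input path is invented):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

// Illustrative: open and dump a SequenceFile with a plain Configuration,
// the same access pattern VectorDumper now uses.
public class SeqFileCat {
  public static void main(String[] args) throws Exception {
    Path path = new Path("part-00000"); // hypothetical input file
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    try {
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
    } finally {
      reader.close();
    }
  }
}
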
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java Wed Jul 7 18:46:19 2010
@@ -18,7 +18,6 @@
package org.apache.mahout.utils.vectors.arff;
import java.text.DateFormat;
-import java.text.SimpleDateFormat;
import java.util.Map;
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMergeReducer.java Wed Jul 7 18:46:19 2010
@@ -20,12 +20,9 @@ package org.apache.mahout.utils.vectors.
import java.io.IOException;
import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.SequentialAccessSparseVector;
@@ -35,22 +32,26 @@ import org.apache.mahout.math.VectorWrit
/**
* Merges partial vectors into a full sparse vector
*/
-public class PartialVectorMergeReducer extends MapReduceBase implements
- Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+public class PartialVectorMergeReducer extends
+ Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
private double normPower;
+
private int dimension;
+
private boolean sequentialAccess;
-
+
+ /**
+ * Sums all partial vectors for a key into one sparse vector, normalizing if requested.
+ */
@Override
- public void reduce(WritableComparable<?> key,
- Iterator<VectorWritable> values,
- OutputCollector<WritableComparable<?>,VectorWritable> output,
- Reporter reporter) throws IOException {
-
+ protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context) throws IOException,
+ InterruptedException {
+
Vector vector = new RandomAccessSparseVector(dimension, 10);
- while (values.hasNext()) {
- VectorWritable value = values.next();
+ Iterator<VectorWritable> it = values.iterator();
+ while (it.hasNext()) {
+ VectorWritable value = it.next();
value.get().addTo(vector);
}
if (normPower != PartialVectorMerger.NO_NORMALIZING) {
@@ -60,14 +61,19 @@ public class PartialVectorMergeReducer e
vector = new SequentialAccessSparseVector(vector);
}
VectorWritable vectorWritable = new VectorWritable(new NamedVector(vector, key.toString()));
- output.collect(key, vectorWritable);
+ context.write(key, vectorWritable);
}
-
+
+ /**
+ * Reads the normalization power, dimension, and access-order settings from the job configuration.
+ */
@Override
- public void configure(JobConf job) {
- super.configure(job);
- normPower = job.getFloat(PartialVectorMerger.NORMALIZATION_POWER, PartialVectorMerger.NO_NORMALIZING);
- dimension = job.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
- sequentialAccess = job.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ normPower = conf.getFloat(PartialVectorMerger.NORMALIZATION_POWER, PartialVectorMerger.NO_NORMALIZING);
+ dimension = conf.getInt(PartialVectorMerger.DIMENSION, Integer.MAX_VALUE);
+ sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
}
+
}
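
Stripped of the Hadoop plumbing, the reducer above is an accumulate-then-postprocess sequence over Mahout vectors: add each partial into one sparse vector, optionally normalize, optionally convert to sequential access. A standalone paraphrase using the same math calls (dimension and inputs invented):

import java.util.Arrays;
import java.util.List;

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.SequentialAccessSparseVector;
import org.apache.mahout.math.Vector;

// Standalone paraphrase of the merge logic above; dimension and inputs are invented.
public class MergeSketch {
  public static void main(String[] args) {
    int dimension = 100;
    double normPower = 2.0;          // stands in for the NORMALIZATION_POWER setting
    boolean sequentialAccess = true;

    Vector partial1 = new RandomAccessSparseVector(dimension, 10);
    partial1.set(3, 1.0);
    Vector partial2 = new RandomAccessSparseVector(dimension, 10);
    partial2.set(42, 2.0);
    List<Vector> partials = Arrays.asList(partial1, partial2);

    // accumulate every partial vector into one merged vector
    Vector merged = new RandomAccessSparseVector(dimension, 10);
    for (Vector partial : partials) {
      partial.addTo(merged);
    }
    if (normPower != -1.0) {         // -1 is PartialVectorMerger.NO_NORMALIZING
      merged = merged.normalize(normPower);
    }
    if (sequentialAccess) {
      merged = new SequentialAccessSparseVector(merged);
    }
    System.out.println(merged.get(3) + " " + merged.get(42));
  }
}
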
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java?rev=961476&r1=961475&r2=961476&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/common/PartialVectorMerger.java Wed Jul 7 18:46:19 2010
@@ -20,16 +20,15 @@ package org.apache.mahout.utils.vectors.
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.VectorWritable;
@@ -41,22 +40,22 @@ import org.apache.mahout.math.VectorWrit
*
*/
public final class PartialVectorMerger {
-
+
public static final float NO_NORMALIZING = -1.0f;
-
+
public static final String NORMALIZATION_POWER = "normalization.power";
-
+
public static final String DIMENSION = "vector.dimension";
-
+
public static final String SEQUENTIAL_ACCESS = "vector.sequentialAccess";
-
+
/**
* Cannot be instantiated. Use the static functions
*/
private PartialVectorMerger() {
}
-
+
/**
* Merge all the partial {@link org.apache.mahout.math.RandomAccessSparseVector}s into the complete Document
* {@link org.apache.mahout.math.RandomAccessSparseVector}
@@ -70,46 +69,48 @@ public final class PartialVectorMerger {
* @param numReducers
* The number of reducers to spawn
* @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
public static void mergePartialVectors(List<Path> partialVectorPaths,
Path output,
float normPower,
int dimension,
- boolean sequentialAccess,
- int numReducers) throws IOException {
+ boolean sequentialAccess,
+ int numReducers) throws IOException, InterruptedException, ClassNotFoundException {
if (normPower != NO_NORMALIZING && normPower < 0) {
throw new IllegalArgumentException("normPower must either be -1 or >= 0");
}
-
- Configurable client = new JobClient();
- JobConf conf = new JobConf(PartialVectorMerger.class);
- conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
- + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+ Configuration conf = new Configuration();
// this conf parameter needs to be set to enable serialisation of conf values
- conf.setJobName("PartialVectorMerger::MergePartialVectors");
+ conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
conf.setBoolean(SEQUENTIAL_ACCESS, sequentialAccess);
conf.setInt(DIMENSION, dimension);
conf.setFloat(NORMALIZATION_POWER, normPower);
-
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(VectorWritable.class);
-
- FileInputFormat.setInputPaths(conf, getCommaSeparatedPaths(partialVectorPaths));
-
- FileOutputFormat.setOutputPath(conf, output);
-
- conf.setMapperClass(IdentityMapper.class);
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setReducerClass(PartialVectorMergeReducer.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
- conf.setNumReduceTasks(numReducers);
-
+
+ Job job = new Job(conf);
+ job.setJobName("PartialVectorMerger::MergePartialVectors");
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(VectorWritable.class);
+
+ FileInputFormat.setInputPaths(job, getCommaSeparatedPaths(partialVectorPaths));
+
+ FileOutputFormat.setOutputPath(job, output);
+
+ job.setMapperClass(Mapper.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setReducerClass(PartialVectorMergeReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setNumReduceTasks(numReducers);
+
HadoopUtil.overwriteOutput(output);
- client.setConf(conf);
- JobClient.runJob(conf);
+ job.waitForCompletion(true);
}
-
+
private static String getCommaSeparatedPaths(List<Path> paths) {
StringBuilder commaSeparatedPaths = new StringBuilder();
String sep = "";