You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@mahout.apache.org by ro...@apache.org on 2010/01/06 03:46:23 UTC
svn commit: r896311 [2/4] - in /lucene/mahout/trunk:
core/src/main/java/org/apache/mahout/classifier/
core/src/main/java/org/apache/mahout/classifier/bayes/algorithm/
core/src/main/java/org/apache/mahout/classifier/bayes/common/
core/src/main/java/org/...
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,10 @@
package org.apache.mahout.classifier.bayes.io;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -30,75 +34,74 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* This Class reads the different interim files created during the Training
* stage as well as the Model File during testing.
*/
-public class SequenceFileModelReader {
-
+public final class SequenceFileModelReader {
+
private static final Logger log = LoggerFactory
.getLogger(SequenceFileModelReader.class);
-
- private SequenceFileModelReader() {
- }
-
- public static void loadModel(InMemoryBayesDatastore datastore, FileSystem fs,
- Parameters params, Configuration conf) throws IOException {
-
+
+ private SequenceFileModelReader() { }
+
+ public static void loadModel(InMemoryBayesDatastore datastore,
+ FileSystem fs,
+ Parameters params,
+ Configuration conf) throws IOException {
+
loadFeatureWeights(datastore, fs, new Path(params.get("sigma_j")), conf);
loadLabelWeights(datastore, fs, new Path(params.get("sigma_k")), conf);
loadSumWeight(datastore, fs, new Path(params.get("sigma_kSigma_j")), conf);
loadThetaNormalizer(datastore, fs, new Path(params.get("thetaNormalizer")),
- conf);
+ conf);
loadWeightMatrix(datastore, fs, new Path(params.get("weight")), conf);
-
+
}
-
+
public static void loadWeightMatrix(InMemoryBayesDatastore datastore,
- FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
-
+ FileSystem fs,
+ Path pathPattern,
+ Configuration conf) throws IOException {
+
StringTuple key = new StringTuple();
DoubleWritable value = new DoubleWritable();
-
+
FileStatus[] outputFiles = fs.globStatus(pathPattern);
for (FileStatus fileStatus : outputFiles) {
Path path = fileStatus.getPath();
log.info("{}", path);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-
+
// the key is label,feature
while (reader.next(key, value)) {
-
+
datastore.loadFeatureWeight(key.stringAt(2), key.stringAt(1), value
.get());
-
+
}
}
}
-
+
public static void loadFeatureWeights(InMemoryBayesDatastore datastore,
- FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
-
+ FileSystem fs,
+ Path pathPattern,
+ Configuration conf) throws IOException {
+
StringTuple key = new StringTuple();
DoubleWritable value = new DoubleWritable();
-
+
FileStatus[] outputFiles = fs.globStatus(pathPattern);
for (FileStatus fileStatus : outputFiles) {
Path path = fileStatus.getPath();
log.info("{}", path);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-
+
// the key is either _label_ or label,feature
long count = 0;
while (reader.next(key, value)) {
-
- if (key.stringAt(0).equals(BayesConstants.FEATURE_SUM)) { // Sum of
- // weights for
- // a Feature
+ // Sum of weights for a Feature
+ if (key.stringAt(0).equals(BayesConstants.FEATURE_SUM)) {
datastore.setSumFeatureWeight(key.stringAt(1), value.get());
count++;
if (count % 50000 == 0) {
@@ -108,24 +111,25 @@
}
}
}
-
+
public static void loadLabelWeights(InMemoryBayesDatastore datastore,
- FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
-
+ FileSystem fs,
+ Path pathPattern,
+ Configuration conf) throws IOException {
+
StringTuple key = new StringTuple();
DoubleWritable value = new DoubleWritable();
-
+
FileStatus[] outputFiles = fs.globStatus(pathPattern);
for (FileStatus fileStatus : outputFiles) {
Path path = fileStatus.getPath();
log.info("{}", path);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-
+
long count = 0;
while (reader.next(key, value)) {
- if (key.stringAt(0).equals(BayesConstants.LABEL_SUM)) { // Sum of
- // weights in a
- // Label
+ // Sum of weights in a Label
+ if (key.stringAt(0).equals(BayesConstants.LABEL_SUM)) {
datastore.setSumLabelWeight(key.stringAt(1), value.get());
count++;
if (count % 10000 == 0) {
@@ -135,27 +139,25 @@
}
}
}
-
+
public static void loadThetaNormalizer(InMemoryBayesDatastore datastore,
- FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
-
+ FileSystem fs,
+ Path pathPattern,
+ Configuration conf) throws IOException {
+
StringTuple key = new StringTuple();
DoubleWritable value = new DoubleWritable();
-
+
FileStatus[] outputFiles = fs.globStatus(pathPattern);
for (FileStatus fileStatus : outputFiles) {
Path path = fileStatus.getPath();
log.info("{}", path);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-
+
long count = 0;
while (reader.next(key, value)) {
- if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) { // Sum
- // of
- // weights
- // in
- // a
- // Label
+ // Sum of weights in a Label
+ if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) {
datastore.setThetaNormalizer(key.stringAt(1), value.get());
count++;
if (count % 50000 == 0) {
@@ -165,115 +167,118 @@
}
}
}
-
+
public static void loadSumWeight(InMemoryBayesDatastore datastore,
- FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
-
+ FileSystem fs,
+ Path pathPattern,
+ Configuration conf) throws IOException {
+
StringTuple key = new StringTuple();
DoubleWritable value = new DoubleWritable();
-
+
FileStatus[] outputFiles = fs.globStatus(pathPattern);
for (FileStatus fileStatus : outputFiles) {
Path path = fileStatus.getPath();
log.info("{}", path);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-
+
// the key is _label
while (reader.next(key, value)) {
-
+
if (key.stringAt(0).equals(BayesConstants.TOTAL_SUM)) { // Sum of
- // weights for
+ // weights for
// all Features and all Labels
- datastore.setSigma_jSigma_k(value.get());
+ datastore.setSigmaJSigmaK(value.get());
log.info("{}", value.get());
}
}
}
}
-
- public static Map<String, Double> readLabelSums(FileSystem fs,
- Path pathPattern, Configuration conf) throws IOException {
- Map<String, Double> labelSum = new HashMap<String, Double>();
+
+ public static Map<String,Double> readLabelSums(FileSystem fs,
+ Path pathPattern,
+ Configuration conf) throws IOException {
+ Map<String,Double> labelSum = new HashMap<String,Double>();
StringTuple key = new StringTuple();
DoubleWritable value = new DoubleWritable();
-
+
FileStatus[] outputFiles = fs.globStatus(pathPattern);
-
+
for (FileStatus fileStatus : outputFiles) {
Path path = fileStatus.getPath();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
// the key is either _label_ or label,feature
while (reader.next(key, value)) {
if (key.stringAt(0).equals(BayesConstants.LABEL_SUM)) { // Sum of counts
- // of labels
+ // of labels
labelSum.put(key.stringAt(1), value.get());
}
-
+
}
}
-
+
return labelSum;
}
-
- public static Map<String, Double> readLabelDocumentCounts(FileSystem fs,
- Path pathPattern, Configuration conf) throws IOException {
- Map<String, Double> labelDocumentCounts = new HashMap<String, Double>();
+
+ public static Map<String,Double> readLabelDocumentCounts(FileSystem fs,
+ Path pathPattern,
+ Configuration conf) throws IOException {
+ Map<String,Double> labelDocumentCounts = new HashMap<String,Double>();
StringTuple key = new StringTuple();
DoubleWritable value = new DoubleWritable();
-
+
FileStatus[] outputFiles = fs.globStatus(pathPattern);
for (FileStatus fileStatus : outputFiles) {
Path path = fileStatus.getPath();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
// the key is either _label_ or label,feature
while (reader.next(key, value)) {
- if (key.stringAt(0).equals(BayesConstants.LABEL_COUNT)) { // Count of
- // Documents
- // in a Label
+ // Count of Documents in a Label
+ if (key.stringAt(0).equals(BayesConstants.LABEL_COUNT)) {
labelDocumentCounts.put(key.stringAt(1), value.get());
}
-
+
}
}
-
+
return labelDocumentCounts;
}
-
- public static double readSigma_jSigma_k(FileSystem fs, Path pathPattern,
- Configuration conf) throws IOException {
- Map<String, Double> weightSum = new HashMap<String, Double>();
+
+ public static double readSigmaJSigmaK(FileSystem fs,
+ Path pathPattern,
+ Configuration conf) throws IOException {
+ Map<String,Double> weightSum = new HashMap<String,Double>();
StringTuple key = new StringTuple();
DoubleWritable value = new DoubleWritable();
-
+
FileStatus[] outputFiles = fs.globStatus(pathPattern);
for (FileStatus fileStatus : outputFiles) {
Path path = fileStatus.getPath();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- // the key is *
while (reader.next(key, value)) {
if (weightSum.size() > 1) {
throw new IOException("Incorrect Sum File");
} else if (key.stringAt(0).equals(BayesConstants.TOTAL_SUM)) {
weightSum.put(BayesConstants.TOTAL_SUM, value.get());
}
-
+
}
}
-
+
return weightSum.get(BayesConstants.TOTAL_SUM);
}
-
- public static double readVocabCount(FileSystem fs, Path pathPattern,
- Configuration conf) throws IOException {
- Map<String, Double> weightSum = new HashMap<String, Double>();
+
+ public static double readVocabCount(FileSystem fs,
+ Path pathPattern,
+ Configuration conf) throws IOException {
+ Map<String,Double> weightSum = new HashMap<String,Double>();
StringTuple key = new StringTuple();
DoubleWritable value = new DoubleWritable();
-
+
FileStatus[] outputFiles = fs.globStatus(pathPattern);
for (FileStatus fileStatus : outputFiles) {
Path path = fileStatus.getPath();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- // the key is *
while (reader.next(key, value)) {
if (weightSum.size() > 1) {
throw new IOException("Incorrect vocabCount File");
@@ -281,10 +286,10 @@
if (key.stringAt(0).equals(BayesConstants.FEATURE_SET_SIZE)) {
weightSum.put(BayesConstants.FEATURE_SET_SIZE, value.get());
}
-
+
}
}
-
+
return weightSum.get(BayesConstants.FEATURE_SET_SIZE);
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,10 @@
package org.apache.mahout.classifier.bayes.mapreduce.bayes;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -36,16 +40,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
/** Create and run the Bayes Classifier */
-public class BayesClassifierDriver {
+public final class BayesClassifierDriver {
- private static final Logger log = LoggerFactory.getLogger(BayesClassifierDriver.class);
- private BayesClassifierDriver() {
- }
+ private static final Logger log = LoggerFactory
+ .getLogger(BayesClassifierDriver.class);
+
+ private BayesClassifierDriver() { }
/**
* Run the job
@@ -70,9 +71,9 @@
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.set("io.serializations",
- "org.apache.hadoop.io.serializer.JavaSerialization," +
- "org.apache.hadoop.io.serializer.WritableSerialization");
-
+ "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
+
FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
if (dfs.exists(outPath)) {
dfs.delete(outPath, true);
@@ -86,12 +87,11 @@
ConfusionMatrix matrix = readResult(dfs, outputFiles, conf, params);
log.info("{}",matrix.summarize());
}
-
+
private static ConfusionMatrix readResult(FileSystem fs,
Path pathPattern,
Configuration conf,
- Parameters params)
- throws IOException {
+ Parameters params) throws IOException {
StringTuple key = new StringTuple();
DoubleWritable value = new DoubleWritable();
@@ -117,10 +117,9 @@
}
ConfusionMatrix matrix = new ConfusionMatrix(confusionMatrix.keySet(), defaultLabel);
- for(Map.Entry<String, Map<String, Integer>> correctLabelSet:confusionMatrix.entrySet())
- {
+ for (Map.Entry<String,Map<String,Integer>> correctLabelSet : confusionMatrix.entrySet()) {
Map<String, Integer> rowMatrix = correctLabelSet.getValue();
- for(Map.Entry<String, Integer> classifiedLabelSet : rowMatrix.entrySet())
+ for (Map.Entry<String, Integer> classifiedLabelSet : rowMatrix.entrySet())
{
matrix.addInstance(correctLabelSet.getKey(), classifiedLabelSet.getKey());
matrix.putCount(correctLabelSet.getKey(), classifiedLabelSet.getKey(), classifiedLabelSet.getValue());
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,9 @@
package org.apache.mahout.classifier.bayes.mapreduce.bayes;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -24,7 +27,6 @@
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.classifier.BayesFileFormatter;
import org.apache.mahout.classifier.ClassifierResult;
import org.apache.mahout.classifier.bayes.algorithm.BayesAlgorithm;
import org.apache.mahout.classifier.bayes.algorithm.CBayesAlgorithm;
@@ -41,46 +43,48 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
-
-/** Reads the input train set(preprocessed using the {@link BayesFileFormatter}). */
+/**
+ * Reads the input train set(preprocessed using the
+ * {@link org.apache.mahout.classifier.BayesFileFormatter}).
+ */
public class BayesClassifierMapper extends MapReduceBase implements
- Mapper<Text, Text, StringTuple, DoubleWritable> {
-
- private static final Logger log = LoggerFactory.getLogger(BayesClassifierMapper.class);
-
+ Mapper<Text,Text,StringTuple,DoubleWritable> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(BayesClassifierMapper.class);
+
private int gramSize = 1;
- private ClassifierContext classifier = null;
+ private ClassifierContext classifier;
+
+ private String defaultCategory;
- private String defaultCategory = null;
-
/**
* Parallel Classification
- *
- * @param key The label
- * @param value the features (all unique) associated w/ this label
- * @param output The OutputCollector to write the results to
- * @param reporter Reports status back to hadoop
+ *
+ * @param key
+ * The label
+ * @param value
+ * the features (all unique) associated w/ this label
+ * @param output
+ * The OutputCollector to write the results to
+ * @param reporter
+ * Reports status back to hadoop
*/
@Override
- public void map(Text key, Text value,
- OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
- throws IOException {
- //String line = value.toString();
+ public void map(Text key,
+ Text value,
+ OutputCollector<StringTuple,DoubleWritable> output,
+ Reporter reporter) throws IOException {
String label = key.toString();
-
-
- //StringBuilder builder = new StringBuilder(label);
- //builder.ensureCapacity(32);// make sure we have a reasonably size buffer to
- // begin with
- List<String> ngrams = new NGrams(value.toString(), gramSize).generateNGramsWithoutLabel();
+
+ List<String> ngrams = new NGrams(value.toString(), gramSize)
+ .generateNGramsWithoutLabel();
try {
- ClassifierResult result = classifier.classifyDocument( ngrams
+ ClassifierResult result = classifier.classifyDocument(ngrams
.toArray(new String[ngrams.size()]), defaultCategory);
-
+
String correctLabel = label;
String classifiedLabel = result.getLabel();
@@ -93,16 +97,16 @@
throw new IOException(e.toString());
}
}
-
+
@Override
public void configure(JobConf job) {
try {
log.info("Bayes Parameter" + job.get("bayes.parameters"));
- Parameters params = Parameters.fromString(job.get("bayes.parameters", ""));
+ Parameters params = Parameters
+ .fromString(job.get("bayes.parameters", ""));
log.info("{}", params.print());
Algorithm algorithm;
Datastore datastore;
-
if (params.get("dataSource").equals("hdfs")) {
if (params.get("classifierType").equalsIgnoreCase("bayes")) {
@@ -115,9 +119,9 @@
datastore = new InMemoryBayesDatastore(params);
} else {
throw new IllegalArgumentException("Unrecognized classifier type: "
- + params.get("classifierType"));
+ + params.get("classifierType"));
}
-
+
} else if (params.get("dataSource").equals("hbase")) {
if (params.get("classifierType").equalsIgnoreCase("bayes")) {
log.info("Testing Bayes Classifier");
@@ -129,17 +133,16 @@
datastore = new HBaseBayesDatastore(params.get("basePath"), params);
} else {
throw new IllegalArgumentException("Unrecognized classifier type: "
- + params.get("classifierType"));
+ + params.get("classifierType"));
}
-
+
} else {
throw new IllegalArgumentException("Unrecognized dataSource type: "
- + params.get("dataSource"));
+ + params.get("dataSource"));
}
classifier = new ClassifierContext(algorithm, datastore);
classifier.initialize();
-
defaultCategory = params.get("defaultCat");
gramSize = Integer.valueOf(params.get("gramSize"));
} catch (IOException ex) {
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierReducer.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierReducer.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,9 @@
package org.apache.mahout.classifier.bayes.mapreduce.bayes;
+import java.io.IOException;
+import java.util.Iterator;
+
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
@@ -24,9 +27,6 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.common.StringTuple;
-import java.io.IOException;
-import java.util.Iterator;
-
/** Can also be used as a local Combiner. A simple summing reducer */
public class BayesClassifierReducer extends MapReduceBase
implements Reducer<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
@@ -36,7 +36,7 @@
Iterator<DoubleWritable> values,
OutputCollector<StringTuple, DoubleWritable> output,
Reporter reporter) throws IOException {
- //Key is label,word, value is the number of times we've seen this label word per local node. Output is the same
+ // Key is label,word, value is the number of times we've seen this label word per local node. Output is the same
double sum = 0.0;
while (values.hasNext()) {
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java Wed Jan 6 02:46:22 2010
@@ -17,63 +17,67 @@
package org.apache.mahout.classifier.bayes.mapreduce.bayes;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.mahout.classifier.bayes.mapreduce.common.BayesFeatureDriver;
import org.apache.mahout.classifier.bayes.common.BayesParameters;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesFeatureDriver;
import org.apache.mahout.classifier.bayes.mapreduce.common.BayesJob;
import org.apache.mahout.classifier.bayes.mapreduce.common.BayesTfIdfDriver;
import org.apache.mahout.classifier.bayes.mapreduce.common.BayesWeightSummerDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
/** Create and run the Bayes Trainer. */
public class BayesDriver implements BayesJob {
-
+
private static final Logger log = LoggerFactory.getLogger(BayesDriver.class);
-
+
/**
* Run the job
- *
- * @param input the input pathname String
- * @param output the output pathname String
- * @throws ClassNotFoundException
- * @throws InterruptedException
+ *
+ * @param input
+ * the input pathname String
+ * @param output
+ * the output pathname String
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
@Override
- public void runJob(String input, String output, BayesParameters params)
- throws IOException, InterruptedException, ClassNotFoundException {
+ public void runJob(String input, String output, BayesParameters params) throws IOException,
+ InterruptedException,
+ ClassNotFoundException {
Configuration conf = new JobConf(BayesDriver.class);
Path outPath = new Path(output);
FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
if (dfs.exists(outPath)) {
dfs.delete(outPath, true);
}
-
+
log.info("Reading features...");
- //Read the features in each document normalized by length of each document
+ // Read the features in each document normalized by length of each document
BayesFeatureDriver feature = new BayesFeatureDriver();
feature.runJob(input, output, params);
-
+
log.info("Calculating Tf-Idf...");
- //Calculate the TfIdf for each word in each label
+ // Calculate the TfIdf for each word in each label
BayesTfIdfDriver tfidf = new BayesTfIdfDriver();
tfidf.runJob(input, output, params);
-
+
log.info("Calculating weight sums for labels and features...");
- //Calculate the Sums of weights for each label, for each feature and for each feature and for each label
+ // Calculate the Sums of weights for each label, for each feature and for
+ // each feature and for each label
BayesWeightSummerDriver summer = new BayesWeightSummerDriver();
summer.runJob(input, output, params);
-
+
log.info("Calculating the weight Normalisation factor for each class...");
- //Calculate the normalization factor Sigma_W_ij for each complement class.
+ // Calculate the normalization factor Sigma_W_ij for each complement class.
BayesThetaNormalizerDriver normalizer = new BayesThetaNormalizerDriver();
normalizer.runJob(input, output, params);
-
+
Path docCountOutPath = new Path(output + "/trainer-docCount");
if (dfs.exists(docCountOutPath)) {
dfs.delete(docCountOutPath, true);
@@ -98,6 +102,6 @@
if (dfs.exists(vocabCountOutPath)) {
dfs.delete(vocabCountOutPath, true);
}
-
+
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,9 @@
package org.apache.mahout.classifier.bayes.mapreduce.bayes;
+import java.io.IOException;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,74 +39,83 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-
/** Create and run the Bayes Theta Normalization Step. */
public class BayesThetaNormalizerDriver implements BayesJob {
-
- private static final Logger log = LoggerFactory.getLogger(BayesThetaNormalizerDriver.class);
-
+
+ private static final Logger log = LoggerFactory
+ .getLogger(BayesThetaNormalizerDriver.class);
+
/**
* Run the job
- *
- * @param input the input pathname String
- * @param output the output pathname String
+ *
+ * @param input
+ * the input pathname String
+ * @param output
+ * the output pathname String
*/
@Override
public void runJob(String input, String output, BayesParameters params) throws IOException {
Configurable client = new JobClient();
JobConf conf = new JobConf(BayesThetaNormalizerDriver.class);
-
- conf.setJobName("Bayes Theta Normalizer Driver running over input: " + input);
-
+
+ conf.setJobName("Bayes Theta Normalizer Driver running over input: "
+ + input);
+
conf.setOutputKeyClass(StringTuple.class);
conf.setOutputValueClass(DoubleWritable.class);
- FileInputFormat.addInputPath(conf, new Path(output + "/trainer-tfIdf/trainer-tfIdf"));
+ FileInputFormat.addInputPath(conf, new Path(
+ output + "/trainer-tfIdf/trainer-tfIdf"));
Path outPath = new Path(output + "/trainer-thetaNormalizer");
FileOutputFormat.setOutputPath(conf, outPath);
- //conf.setNumMapTasks(100);
- //conf.setNumReduceTasks(1);
+ // conf.setNumMapTasks(100);
+ // conf.setNumReduceTasks(1);
conf.setMapperClass(BayesThetaNormalizerMapper.class);
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setCombinerClass(BayesThetaNormalizerReducer.class);
conf.setReducerClass(BayesThetaNormalizerReducer.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.set("io.serializations",
- "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
- // Dont ever forget this. People should keep track of how hadoop conf parameters and make or break a piece of code
-
+ "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
+ // Dont ever forget this. People should keep track of how hadoop conf
+ // parameters and make or break a piece of code
+
FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
if (dfs.exists(outPath)) {
dfs.delete(outPath, true);
}
-
- Path Sigma_kFiles = new Path(output + "/trainer-weights/Sigma_k/*");
- Map<String, Double> labelWeightSum = SequenceFileModelReader.readLabelSums(dfs, Sigma_kFiles, conf);
- DefaultStringifier<Map<String, Double>> mapStringifier =
- new DefaultStringifier<Map<String, Double>>(conf, GenericsUtil.getClass(labelWeightSum));
+
+ Path sigmaKFiles = new Path(output + "/trainer-weights/Sigma_k/*");
+ Map<String,Double> labelWeightSum = SequenceFileModelReader.readLabelSums(
+ dfs, sigmaKFiles, conf);
+ DefaultStringifier<Map<String,Double>> mapStringifier = new DefaultStringifier<Map<String,Double>>(
+ conf, GenericsUtil.getClass(labelWeightSum));
String labelWeightSumString = mapStringifier.toString(labelWeightSum);
-
+
log.info("Sigma_k for Each Label");
- Map<String, Double> c = mapStringifier.fromString(labelWeightSumString);
+ Map<String,Double> c = mapStringifier.fromString(labelWeightSumString);
log.info("{}", c);
conf.set("cnaivebayes.sigma_k", labelWeightSumString);
-
-
- Path sigma_kSigma_jFile = new Path(output + "/trainer-weights/Sigma_kSigma_j/*");
- double sigma_jSigma_k = SequenceFileModelReader.readSigma_jSigma_k(dfs, sigma_kSigma_jFile, conf);
- DefaultStringifier<Double> stringifier = new DefaultStringifier<Double>(conf, Double.class);
- String sigma_jSigma_kString = stringifier.toString(sigma_jSigma_k);
-
+
+ Path sigmaJSigmaKFile = new Path(output
+ + "/trainer-weights/Sigma_kSigma_j/*");
+ double sigmaJSigmaK = SequenceFileModelReader.readSigmaJSigmaK(dfs,
+ sigmaJSigmaKFile, conf);
+ DefaultStringifier<Double> stringifier = new DefaultStringifier<Double>(
+ conf, Double.class);
+ String sigmaJSigmaKString = stringifier.toString(sigmaJSigmaK);
+
log.info("Sigma_kSigma_j for each Label and for each Features");
- double retSigma_jSigma_k = stringifier.fromString(sigma_jSigma_kString);
- log.info("{}", retSigma_jSigma_k);
- conf.set("cnaivebayes.sigma_jSigma_k", sigma_jSigma_kString);
-
- Path vocabCountFile = new Path(output + "/trainer-tfIdf/trainer-vocabCount/*");
- double vocabCount = SequenceFileModelReader.readVocabCount(dfs, vocabCountFile, conf);
+ double retSigmaJSigmaK = stringifier.fromString(sigmaJSigmaKString);
+ log.info("{}", retSigmaJSigmaK);
+ conf.set("cnaivebayes.sigma_jSigma_k", sigmaJSigmaKString);
+
+ Path vocabCountFile = new Path(output
+ + "/trainer-tfIdf/trainer-vocabCount/*");
+ double vocabCount = SequenceFileModelReader.readVocabCount(dfs,
+ vocabCountFile, conf);
String vocabCountString = stringifier.toString(vocabCount);
-
+
log.info("Vocabulary Count");
conf.set("cnaivebayes.vocabCount", vocabCountString);
double retvocabCount = stringifier.fromString(vocabCountString);
@@ -111,8 +123,8 @@
conf.set("bayes.parameters", params.toString());
conf.set("output.table", output);
client.setConf(conf);
-
+
JobClient.runJob(conf);
-
+
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerMapper.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerMapper.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,10 @@
package org.apache.mahout.classifier.bayes.mapreduce.bayes;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -31,73 +35,80 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
+/**
+ * Mapper for Calculating the ThetaNormalizer for a label in Naive Bayes Algorithm
+ *
+ */
public class BayesThetaNormalizerMapper extends MapReduceBase implements
- Mapper<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
-
- private static final Logger log = LoggerFactory.getLogger(BayesThetaNormalizerMapper.class);
-
- private Map<String, Double> labelWeightSum = null;
- private double sigma_jSigma_k = 0.0;
- private double vocabCount = 0.0;
- private double alpha_i = 1.0;
-
+ Mapper<StringTuple,DoubleWritable,StringTuple,DoubleWritable> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(BayesThetaNormalizerMapper.class);
+
+ private Map<String,Double> labelWeightSum;
+ private double sigmaJSigmaK;
+ private double vocabCount;
+ private double alphaI = 1.0;
+
/**
* We need to calculate the thetaNormalization factor of each label
- *
- * @param key The label,feature pair
- * @param value The tfIdf of the pair
+ *
+ * @param key
+ * The label,feature pair
+ * @param value
+ * The tfIdf of the pair
*/
@Override
- public void map(StringTuple key, DoubleWritable value,
- OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
- throws IOException {
-
+ public void map(StringTuple key,
+ DoubleWritable value,
+ OutputCollector<StringTuple,DoubleWritable> output,
+ Reporter reporter) throws IOException {
+
String label = key.stringAt(1);
-
+
reporter.setStatus("Bayes Theta Normalizer Mapper: " + label);
- double weight = Math.log((value.get() + alpha_i) / (labelWeightSum.get(label) + vocabCount));
- StringTuple thetaNormalizerTuple = new StringTuple(BayesConstants.LABEL_THETA_NORMALIZER);
+ double weight = Math.log((value.get() + alphaI)
+ / (labelWeightSum.get(label) + vocabCount));
+ StringTuple thetaNormalizerTuple = new StringTuple(
+ BayesConstants.LABEL_THETA_NORMALIZER);
thetaNormalizerTuple.add(label);
output.collect(thetaNormalizerTuple, new DoubleWritable(weight));
}
-
+
@Override
public void configure(JobConf job) {
try {
if (labelWeightSum == null) {
- labelWeightSum = new HashMap<String, Double>();
-
- DefaultStringifier<Map<String, Double>> mapStringifier = new DefaultStringifier<Map<String, Double>>(
+ labelWeightSum = new HashMap<String,Double>();
+
+ DefaultStringifier<Map<String,Double>> mapStringifier = new DefaultStringifier<Map<String,Double>>(
job, GenericsUtil.getClass(labelWeightSum));
-
+
String labelWeightSumString = mapStringifier.toString(labelWeightSum);
labelWeightSumString = job.get("cnaivebayes.sigma_k",
- labelWeightSumString);
+ labelWeightSumString);
labelWeightSum = mapStringifier.fromString(labelWeightSumString);
-
+
DefaultStringifier<Double> stringifier = new DefaultStringifier<Double>(
- job, GenericsUtil.getClass(sigma_jSigma_k));
- String sigma_jSigma_kString = stringifier.toString(sigma_jSigma_k);
- sigma_jSigma_kString = job.get("cnaivebayes.sigma_jSigma_k",
- sigma_jSigma_kString);
- sigma_jSigma_k = stringifier.fromString(sigma_jSigma_kString);
-
+ job, GenericsUtil.getClass(sigmaJSigmaK));
+ String sigmaJSigmaKString = stringifier.toString(sigmaJSigmaK);
+ sigmaJSigmaKString = job.get("cnaivebayes.sigma_jSigma_k",
+ sigmaJSigmaKString);
+ sigmaJSigmaK = stringifier.fromString(sigmaJSigmaKString);
+
String vocabCountString = stringifier.toString(vocabCount);
vocabCountString = job.get("cnaivebayes.vocabCount", vocabCountString);
vocabCount = stringifier.fromString(vocabCountString);
- Parameters params = Parameters.fromString(job.get("bayes.parameters", ""));
- alpha_i = Double.valueOf(params.get("alpha_i", "1.0"));
-
+ Parameters params = Parameters.fromString(job.get("bayes.parameters",
+ ""));
+ alphaI = Double.valueOf(params.get("alpha_i", "1.0"));
+
}
} catch (IOException ex) {
log.warn(ex.toString(), ex);
}
}
-
+
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerReducer.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerReducer.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,9 @@
package org.apache.mahout.classifier.bayes.mapreduce.bayes;
+import java.io.IOException;
+import java.util.Iterator;
+
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -33,69 +36,66 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Iterator;
-
/**
* Can also be used as a local Combiner beacuse only two values should be there
* inside the values
*/
public class BayesThetaNormalizerReducer extends MapReduceBase implements
- Reducer<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
-
- private static final Logger log = LoggerFactory.getLogger(BayesThetaNormalizerReducer.class);
-
+ Reducer<StringTuple,DoubleWritable,StringTuple,DoubleWritable> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(BayesThetaNormalizerReducer.class);
+
private HTable table;
-
- private boolean useHbase = false;
-
+
+ private boolean useHbase;
+
@Override
- public void reduce(StringTuple key, Iterator<DoubleWritable> values,
- OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
- throws IOException {
+ public void reduce(StringTuple key,
+ Iterator<DoubleWritable> values,
+ OutputCollector<StringTuple,DoubleWritable> output,
+ Reporter reporter) throws IOException {
// Key is label,word, value is the number of times we've seen this label
// word per local node. Output is the same
-
+
// String token = key.toString();
-
+
double weightSumPerLabel = 0.0;
-
+
while (values.hasNext()) {
reporter.setStatus("Bayes Theta Normalizer Reducer: " + key);
weightSumPerLabel += values.next().get();
}
reporter.setStatus("Bayes Theta Normalizer Reducer: " + key + " => "
- + weightSumPerLabel);
- if (useHbase) {
+ + weightSumPerLabel);
+ if (useHbase) {
if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) {
- String label = key.stringAt(1);
- Put bu = new Put(Bytes.toBytes(BayesConstants.LABEL_THETA_NORMALIZER));
- bu.add(Bytes.toBytes(BayesConstants.HBASE_COLUMN_FAMILY), Bytes.toBytes(label), Bytes
- .toBytes(weightSumPerLabel));
- table.put(bu);
- }
- }
+ String label = key.stringAt(1);
+ Put bu = new Put(Bytes.toBytes(BayesConstants.LABEL_THETA_NORMALIZER));
+ bu.add(Bytes.toBytes(BayesConstants.HBASE_COLUMN_FAMILY), Bytes
+ .toBytes(label), Bytes.toBytes(weightSumPerLabel));
+ table.put(bu);
+ }
+ }
output.collect(key, new DoubleWritable(weightSumPerLabel));
-
+
}
-
+
@Override
public void configure(JobConf job) {
try {
Parameters params = Parameters
.fromString(job.get("bayes.parameters", ""));
- if (params.get("dataSource").equals("hbase"))
- useHbase = true;
- else
- return;
-
- HBaseConfiguration HBconf = new HBaseConfiguration(job);
- table = new HTable(HBconf, job.get("output.table"));
+ if (params.get("dataSource").equals("hbase")) useHbase = true;
+ else return;
+
+ HBaseConfiguration hBconf = new HBaseConfiguration(job);
+ table = new HTable(hBconf, job.get("output.table"));
} catch (IOException e) {
log.error("Unexpected error during configuration", e);
}
}
-
+
@Override
public void close() throws IOException {
if (useHbase) {
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java Wed Jan 6 02:46:22 2010
@@ -17,62 +17,68 @@
package org.apache.mahout.classifier.bayes.mapreduce.cbayes;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.mahout.classifier.bayes.mapreduce.common.BayesFeatureDriver;
import org.apache.mahout.classifier.bayes.common.BayesParameters;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesFeatureDriver;
import org.apache.mahout.classifier.bayes.mapreduce.common.BayesJob;
import org.apache.mahout.classifier.bayes.mapreduce.common.BayesTfIdfDriver;
import org.apache.mahout.classifier.bayes.mapreduce.common.BayesWeightSummerDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
/** Create and run the Bayes Trainer. */
-public class CBayesDriver implements BayesJob{
-
+public class CBayesDriver implements BayesJob {
+
private static final Logger log = LoggerFactory.getLogger(CBayesDriver.class);
/**
* Run the job
- *
- * @param input the input pathname String
- * @param output the output pathname String
- * @throws ClassNotFoundException
- * @throws InterruptedException
+ *
+ * @param input
+ * the input pathname String
+ * @param output
+ * the output pathname String
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
@Override
- public void runJob(String input, String output, BayesParameters params) throws IOException, InterruptedException, ClassNotFoundException {
+ public void runJob(String input, String output, BayesParameters params) throws IOException,
+ InterruptedException,
+ ClassNotFoundException {
Configuration conf = new JobConf(CBayesDriver.class);
Path outPath = new Path(output);
FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
if (dfs.exists(outPath)) {
dfs.delete(outPath, true);
}
-
+
log.info("Reading features...");
- //Read the features in each document normalized by length of each document
+ // Read the features in each document normalized by length of each document
BayesFeatureDriver feature = new BayesFeatureDriver();
feature.runJob(input, output, params);
-
+
log.info("Calculating Tf-Idf...");
- //Calculate the TfIdf for each word in each label
+ // Calculate the TfIdf for each word in each label
BayesTfIdfDriver tfidf = new BayesTfIdfDriver();
- tfidf.runJob(input, output,params);
-
+ tfidf.runJob(input, output, params);
+
log.info("Calculating weight sums for labels and features...");
- //Calculate the Sums of weights for each label, for each feature and for each feature and for each label
+ // Calculate the Sums of weights for each label, for each feature and for
+ // each feature and for each label
BayesWeightSummerDriver summer = new BayesWeightSummerDriver();
summer.runJob(input, output, params);
-
- log.info("Calculating the weight Normalisation factor for each complement class...");
- //Calculate the normalization factor Sigma_W_ij for each complement class.
+
+ log
+ .info("Calculating the weight Normalisation factor for each complement class...");
+ // Calculate the normalization factor Sigma_W_ij for each complement class.
CBayesThetaNormalizerDriver normalizer = new CBayesThetaNormalizerDriver();
normalizer.runJob(input, output, params);
-
+
Path docCountOutPath = new Path(output + "/trainer-docCount");
if (dfs.exists(docCountOutPath)) {
dfs.delete(docCountOutPath, true);
@@ -97,7 +103,6 @@
if (dfs.exists(vocabCountOutPath)) {
dfs.delete(vocabCountOutPath, true);
}
-
-
+
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,9 @@
package org.apache.mahout.classifier.bayes.mapreduce.cbayes;
+import java.io.IOException;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,73 +39,84 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-
/** Create and run the Bayes Trainer. */
public class CBayesThetaNormalizerDriver implements BayesJob {
-
- private static final Logger log = LoggerFactory.getLogger(CBayesThetaNormalizerDriver.class);
-
+
+ private static final Logger log = LoggerFactory
+ .getLogger(CBayesThetaNormalizerDriver.class);
+
/**
* Run the job
- *
- * @param input the input pathname String
- * @param output the output pathname String
+ *
+ * @param input
+ * the input pathname String
+ * @param output
+ * the output pathname String
*/
@Override
public void runJob(String input, String output, BayesParameters params) throws IOException {
Configurable client = new JobClient();
JobConf conf = new JobConf(CBayesThetaNormalizerDriver.class);
- conf.setJobName("Complementary Bayes Theta Normalizer Driver running over input: " + input);
-
-
+ conf
+ .setJobName("Complementary Bayes Theta Normalizer Driver running over input: "
+ + input);
+
conf.setOutputKeyClass(StringTuple.class);
conf.setOutputValueClass(DoubleWritable.class);
- FileInputFormat.addInputPath(conf, new Path(output + "/trainer-weights/Sigma_j"));
- FileInputFormat.addInputPath(conf, new Path(output + "/trainer-tfIdf/trainer-tfIdf"));
+ FileInputFormat.addInputPath(conf, new Path(output
+ + "/trainer-weights/Sigma_j"));
+ FileInputFormat.addInputPath(conf, new Path(
+ output + "/trainer-tfIdf/trainer-tfIdf"));
Path outPath = new Path(output + "/trainer-thetaNormalizer");
FileOutputFormat.setOutputPath(conf, outPath);
- //conf.setNumMapTasks(100);
- //conf.setNumReduceTasks(1);
+ // conf.setNumMapTasks(100);
+ // conf.setNumReduceTasks(1);
conf.setMapperClass(CBayesThetaNormalizerMapper.class);
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setCombinerClass(CBayesThetaNormalizerReducer.class);
conf.setReducerClass(CBayesThetaNormalizerReducer.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
- conf.set("io.serializations",
- "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
- // Dont ever forget this. People should keep track of how hadoop conf parameters and make or break a piece of code
-
+ conf
+ .set(
+ "io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+ // Dont ever forget this. People should keep track of how hadoop conf
+ // parameters and make or break a piece of code
+
FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
if (dfs.exists(outPath)) {
dfs.delete(outPath, true);
}
-
- Path Sigma_kFiles = new Path(output + "/trainer-weights/Sigma_k/*");
- Map<String, Double> labelWeightSum = SequenceFileModelReader.readLabelSums(dfs, Sigma_kFiles, conf);
- DefaultStringifier<Map<String, Double>> mapStringifier =
- new DefaultStringifier<Map<String, Double>>(conf, GenericsUtil.getClass(labelWeightSum));
+
+ Path sigmaKFiles = new Path(output + "/trainer-weights/Sigma_k/*");
+ Map<String,Double> labelWeightSum = SequenceFileModelReader.readLabelSums(
+ dfs, sigmaKFiles, conf);
+ DefaultStringifier<Map<String,Double>> mapStringifier = new DefaultStringifier<Map<String,Double>>(
+ conf, GenericsUtil.getClass(labelWeightSum));
String labelWeightSumString = mapStringifier.toString(labelWeightSum);
-
+
log.info("Sigma_k for Each Label");
- Map<String, Double> c = mapStringifier.fromString(labelWeightSumString);
+ Map<String,Double> c = mapStringifier.fromString(labelWeightSumString);
log.info("{}", c);
conf.set("cnaivebayes.sigma_k", labelWeightSumString);
-
-
- Path sigma_kSigma_jFile = new Path(output + "/trainer-weights/Sigma_kSigma_j/*");
- double sigma_jSigma_k = SequenceFileModelReader.readSigma_jSigma_k(dfs, sigma_kSigma_jFile, conf);
- DefaultStringifier<Double> stringifier = new DefaultStringifier<Double>(conf, Double.class);
- String sigma_jSigma_kString = stringifier.toString(sigma_jSigma_k);
-
+
+ Path sigmaKSigmaJFile = new Path(output
+ + "/trainer-weights/Sigma_kSigma_j/*");
+ double sigmaJSigmaK = SequenceFileModelReader.readSigmaJSigmaK(dfs,
+ sigmaKSigmaJFile, conf);
+ DefaultStringifier<Double> stringifier = new DefaultStringifier<Double>(
+ conf, Double.class);
+ String sigmaJSigmaKString = stringifier.toString(sigmaJSigmaK);
+
log.info("Sigma_kSigma_j for each Label and for each Features");
- double retSigma_jSigma_k = stringifier.fromString(sigma_jSigma_kString);
- log.info("{}", retSigma_jSigma_k);
- conf.set("cnaivebayes.sigma_jSigma_k", sigma_jSigma_kString);
-
- Path vocabCountFile = new Path(output + "/trainer-tfIdf/trainer-vocabCount/*");
- double vocabCount = SequenceFileModelReader.readVocabCount(dfs, vocabCountFile, conf);
+ double retSigmaJSigmaK = stringifier.fromString(sigmaJSigmaKString);
+ log.info("{}", retSigmaJSigmaK);
+ conf.set("cnaivebayes.sigma_jSigma_k", sigmaJSigmaKString);
+
+ Path vocabCountFile = new Path(output
+ + "/trainer-tfIdf/trainer-vocabCount/*");
+ double vocabCount = SequenceFileModelReader.readVocabCount(dfs,
+ vocabCountFile, conf);
String vocabCountString = stringifier.toString(vocabCount);
log.info("Vocabulary Count");
@@ -112,8 +126,8 @@
conf.set("bayes.parameters", params.toString());
conf.set("output.table", output);
client.setConf(conf);
-
+
JobClient.runJob(conf);
-
+
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerMapper.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerMapper.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,10 @@
package org.apache.mahout.classifier.bayes.mapreduce.cbayes;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -31,94 +35,111 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
+/**
+ * Mapper for Calculating the ThetaNormalizer for a label in Naive Bayes
+ * Algorithm
+ *
+ */
public class CBayesThetaNormalizerMapper extends MapReduceBase implements
- Mapper<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
-
- private static final Logger log = LoggerFactory.getLogger(CBayesThetaNormalizerMapper.class);
-
- private Map<String, Double> labelWeightSum = null;
- private double sigma_jSigma_k = 0.0;
- private double vocabCount = 0.0;
- private double alpha_i = 1.0;
+ Mapper<StringTuple,DoubleWritable,StringTuple,DoubleWritable> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(CBayesThetaNormalizerMapper.class);
+
+ private Map<String,Double> labelWeightSum;
+ private double sigmaJSigmaK;
+ private double vocabCount;
+ private double alphaI = 1.0;
+
/**
* We need to calculate the idf of each feature in each label
- *
- * @param key The label,feature pair (can either be the freq Count or the term Document count
+ *
+ * @param key
+ * The label,feature pair (can either be the freq Count or the term
+ * Document count
*/
@Override
- public void map(StringTuple key, DoubleWritable value,
- OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
- throws IOException {
-
- if (key.stringAt(0).equals(BayesConstants.FEATURE_SUM)) { // if it is from the Sigma_j folder
-
+ public void map(StringTuple key,
+ DoubleWritable value,
+ OutputCollector<StringTuple,DoubleWritable> output,
+ Reporter reporter) throws IOException {
+
+ if (key.stringAt(0).equals(BayesConstants.FEATURE_SUM)) { // if it is from
+ // the Sigma_j
+ // folder
- for (Map.Entry<String, Double> stringDoubleEntry : labelWeightSum.entrySet()) {
+ for (Map.Entry<String,Double> stringDoubleEntry : labelWeightSum
+ .entrySet()) {
String label = stringDoubleEntry.getKey();
- double weight = Math.log((value.get() + alpha_i) / (sigma_jSigma_k - stringDoubleEntry.getValue() + vocabCount));
+ double weight = Math
+ .log((value.get() + alphaI)
+ / (sigmaJSigmaK - stringDoubleEntry.getValue() + vocabCount));
+
+ reporter.setStatus("Complementary Bayes Theta Normalizer Mapper: "
+ + stringDoubleEntry + " => " + weight);
+ StringTuple normalizerTuple = new StringTuple(
+ BayesConstants.LABEL_THETA_NORMALIZER);
+ normalizerTuple.add(label);
+ output.collect(normalizerTuple, new DoubleWritable(weight)); // output
+ // Sigma_j
- reporter.setStatus("Complementary Bayes Theta Normalizer Mapper: " + stringDoubleEntry + " => " + weight);
- StringTuple normalizerTuple = new StringTuple(BayesConstants.LABEL_THETA_NORMALIZER);
- normalizerTuple.add(label);
- output.collect(normalizerTuple, new DoubleWritable(weight)); //output Sigma_j
-
}
-
+
} else {
String label = key.stringAt(1);
-
- double D_ij = value.get();
- double denominator = 0.5 * ((sigma_jSigma_k / vocabCount) + (D_ij * this.labelWeightSum.size()));
- double weight = Math.log(1.0 - D_ij / denominator);
-
- reporter.setStatus("Complementary Bayes Theta Normalizer Mapper: " + label + " => " + weight);
-
- StringTuple normalizerTuple = new StringTuple(BayesConstants.LABEL_THETA_NORMALIZER);
- normalizerTuple.add(label);
- output.collect(normalizerTuple, new DoubleWritable(weight));//output -D_ij
-
-
+ double dIJ = value.get();
+ double denominator = 0.5 * ((sigmaJSigmaK / vocabCount) + (dIJ * this.labelWeightSum
+ .size()));
+ double weight = Math.log(1.0 - dIJ / denominator);
+
+ reporter.setStatus("Complementary Bayes Theta Normalizer Mapper: "
+ + label + " => " + weight);
+
+ StringTuple normalizerTuple = new StringTuple(
+ BayesConstants.LABEL_THETA_NORMALIZER);
+ normalizerTuple.add(label);
+
+ // output -D_ij
+ output.collect(normalizerTuple, new DoubleWritable(weight));
+
}
-
+
}
-
+
@Override
public void configure(JobConf job) {
try {
if (labelWeightSum == null) {
- labelWeightSum = new HashMap<String, Double>();
-
- DefaultStringifier<Map<String, Double>> mapStringifier = new DefaultStringifier<Map<String, Double>>(
+ labelWeightSum = new HashMap<String,Double>();
+
+ DefaultStringifier<Map<String,Double>> mapStringifier = new DefaultStringifier<Map<String,Double>>(
job, GenericsUtil.getClass(labelWeightSum));
-
+
String labelWeightSumString = mapStringifier.toString(labelWeightSum);
labelWeightSumString = job.get("cnaivebayes.sigma_k",
- labelWeightSumString);
+ labelWeightSumString);
labelWeightSum = mapStringifier.fromString(labelWeightSumString);
-
+
DefaultStringifier<Double> stringifier = new DefaultStringifier<Double>(
- job, GenericsUtil.getClass(sigma_jSigma_k));
- String sigma_jSigma_kString = stringifier.toString(sigma_jSigma_k);
- sigma_jSigma_kString = job.get("cnaivebayes.sigma_jSigma_k",
- sigma_jSigma_kString);
- sigma_jSigma_k = stringifier.fromString(sigma_jSigma_kString);
-
+ job, GenericsUtil.getClass(sigmaJSigmaK));
+ String sigmaJSigmaKString = stringifier.toString(sigmaJSigmaK);
+ sigmaJSigmaKString = job.get("cnaivebayes.sigma_jSigma_k",
+ sigmaJSigmaKString);
+ sigmaJSigmaK = stringifier.fromString(sigmaJSigmaKString);
+
String vocabCountString = stringifier.toString(vocabCount);
vocabCountString = job.get("cnaivebayes.vocabCount", vocabCountString);
vocabCount = stringifier.fromString(vocabCountString);
- Parameters params = Parameters.fromString(job.get("bayes.parameters", ""));
- alpha_i = Double.valueOf(params.get("alpha_i", "1.0"));
-
+ Parameters params = Parameters.fromString(job.get("bayes.parameters",
+ ""));
+ alphaI = Double.valueOf(params.get("alpha_i", "1.0"));
+
}
} catch (IOException ex) {
log.warn(ex.toString(), ex);
}
}
-
+
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerReducer.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerReducer.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,9 @@
package org.apache.mahout.classifier.bayes.mapreduce.cbayes;
+import java.io.IOException;
+import java.util.Iterator;
+
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -33,70 +36,73 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Iterator;
-
-/** Can also be used as a local Combiner beacuse only two values should be there inside the values */
+/**
+ * Can also be used as a local Combiner beacuse only two values should be there
+ * inside the values
+ */
public class CBayesThetaNormalizerReducer extends MapReduceBase implements
- Reducer<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
-
- private static final Logger log = LoggerFactory.getLogger(CBayesThetaNormalizerReducer.class);
-
+ Reducer<StringTuple,DoubleWritable,StringTuple,DoubleWritable> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(CBayesThetaNormalizerReducer.class);
+
private HTable table;
-
- private ThreadLocal<HBaseConfiguration> HBconf;
-
- private boolean useHbase = false;
-
+
+ private ThreadLocal<HBaseConfiguration> hBconf;
+
+ private boolean useHbase;
+
@Override
- public void reduce(StringTuple key, Iterator<DoubleWritable> values,
- OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
- throws IOException {
+ public void reduce(StringTuple key,
+ Iterator<DoubleWritable> values,
+ OutputCollector<StringTuple,DoubleWritable> output,
+ Reporter reporter) throws IOException {
// Key is label,word, value is the number of times we've seen this label
// word per local node. Output is the same
-
+
double weightSumPerLabel = 0.0;
-
+
while (values.hasNext()) {
- reporter.setStatus("Complementary Bayes Theta Normalizer Reducer: " + key);
+ reporter
+ .setStatus("Complementary Bayes Theta Normalizer Reducer: " + key);
weightSumPerLabel += values.next().get();
}
- reporter.setStatus("Complementary Bayes Theta Normalizer Reducer: " + key + " => " + weightSumPerLabel);
-
- if (useHbase) {
- if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) {
+ reporter.setStatus("Complementary Bayes Theta Normalizer Reducer: " + key
+ + " => " + weightSumPerLabel);
+
+ if (useHbase) {
+ if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) {
String label = key.stringAt(1);
Put bu = new Put(Bytes.toBytes(BayesConstants.LABEL_THETA_NORMALIZER));
- bu.add(Bytes.toBytes(BayesConstants.HBASE_COLUMN_FAMILY), Bytes.toBytes(label), Bytes
- .toBytes(weightSumPerLabel));
+ bu.add(Bytes.toBytes(BayesConstants.HBASE_COLUMN_FAMILY), Bytes
+ .toBytes(label), Bytes.toBytes(weightSumPerLabel));
table.put(bu);
}
}
output.collect(key, new DoubleWritable(weightSumPerLabel));
-
+
}
+
@Override
public void configure(JobConf job) {
try {
- Parameters params = Parameters.fromString(job.get(
- "bayes.parameters", ""));
- if (params.get("dataSource").equals("hbase"))
- useHbase = true;
- else
- return;
-
- HBconf.set(new HBaseConfiguration(job));
- table = new HTable(HBconf.get(), job.get("output.table"));
+ Parameters params = Parameters
+ .fromString(job.get("bayes.parameters", ""));
+ if (params.get("dataSource").equals("hbase")) useHbase = true;
+ else return;
+
+ hBconf.set(new HBaseConfiguration(job));
+ table = new HTable(hBconf.get(), job.get("output.table"));
} catch (IOException e) {
log.error("Unexpected error during configuration", e);
}
}
-
+
@Override
public void close() throws IOException {
if (useHbase) {
- table.close();
- }
+ table.close();
+ }
super.close();
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java Wed Jan 6 02:46:22 2010
@@ -17,33 +17,38 @@
package org.apache.mahout.classifier.bayes.mapreduce.common;
+/**
+ * Class containing Constants used by Naive Bayes classifier classes
+ *
+ */
public final class BayesConstants {
-
- private BayesConstants() {
- }
-
+
+ // Ensure all the strings are unique
+ public static final String ALPHA_SMOOTHING_FACTOR = "__SF"; // -
+
public static final String DOCUMENT_FREQUENCY = "__DF"; // -
-
+
public static final String LABEL_COUNT = "__LC"; // _
-
+
public static final String FEATURE_COUNT = "__FC"; // ,
-
+
public static final String WEIGHT = "__WT";
-
+
public static final String FEATURE_SET_SIZE = "__FS";
-
+
public static final String FEATURE_SUM = "__SJ";
-
+
public static final String LABEL_SUM = "__SK";
-
+
public static final String TOTAL_SUM = "_SJSK";
-
+
public static final String CLASSIFIER_TUPLE = "__CT";
-
+
public static final String LABEL_THETA_NORMALIZER = "_LTN";
-
+
public static final String HBASE_COUNTS_ROW = "_HBASE_COUNTS_ROW";
-
+
public static final String HBASE_COLUMN_FAMILY = "LABEL";
-
+
+ private BayesConstants() { }
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,8 @@
package org.apache.mahout.classifier.bayes.mapreduce.common;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -29,47 +31,49 @@
import org.apache.mahout.classifier.bayes.common.BayesParameters;
import org.apache.mahout.common.StringTuple;
-import java.io.IOException;
-
/** Create and run the Bayes Feature Reader Step. */
public class BayesFeatureDriver implements BayesJob {
-
+
/**
* Run the job
- *
- * @param input the input pathname String
- * @param output the output pathname String
+ *
+ * @param input
+ * the input pathname String
+ * @param output
+ * the output pathname String
*/
@Override
public void runJob(String input, String output, BayesParameters params) throws IOException {
Configurable client = new JobClient();
JobConf conf = new JobConf(BayesFeatureDriver.class);
- conf.setJobName("Bayes Feature Driver running over input: " + input);
+ conf.setJobName("Bayes Feature Driver running over input: " + input);
conf.setOutputKeyClass(StringTuple.class);
conf.setOutputValueClass(DoubleWritable.class);
-
+
FileInputFormat.setInputPaths(conf, new Path(input));
Path outPath = new Path(output);
FileOutputFormat.setOutputPath(conf, outPath);
conf.setMapperClass(BayesFeatureMapper.class);
-
+
conf.setInputFormat(KeyValueTextInputFormat.class);
conf.setCombinerClass(BayesFeatureReducer.class);
conf.setReducerClass(BayesFeatureReducer.class);
conf.setOutputFormat(BayesFeatureOutputFormat.class);
- conf.set("io.serializations",
- "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+ conf
+ .set(
+ "io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
// this conf parameter needs to be set to enable serialisation of conf values
-
+
FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
if (dfs.exists(outPath)) {
dfs.delete(outPath, true);
}
conf.set("bayes.parameters", params.toString());
-
+
client.setConf(conf);
JobClient.runJob(conf);
-
+
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,11 @@
package org.apache.mahout.classifier.bayes.mapreduce.common;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -24,50 +29,56 @@
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.classifier.BayesFileFormatter;
import org.apache.mahout.common.Parameters;
import org.apache.mahout.common.StringTuple;
import org.apache.mahout.common.nlp.NGrams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/** Reads the input train set(preprocessed using the {@link BayesFileFormatter}). */
+/**
+ * Reads the input training set (preprocessed using the {@link org.apache.mahout.classifier.BayesFileFormatter}).
+ */
public class BayesFeatureMapper extends MapReduceBase implements
- Mapper<Text, Text, StringTuple, DoubleWritable> {
-
- private static final Logger log = LoggerFactory.getLogger(BayesFeatureMapper.class);
-
- private static final DoubleWritable one = new DoubleWritable(1.0);
-
+ Mapper<Text,Text,StringTuple,DoubleWritable> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(BayesFeatureMapper.class);
+
+ private static final DoubleWritable ONE = new DoubleWritable(1.0);
+
private int gramSize = 1;
-
+
/**
- * We need to count the number of times we've seen a term with a given label and we need to output that. But this
- * Mapper does more than just outputing the count. It first does weight normalisation. Secondly, it outputs for each
- * unique word in a document value 1 for summing up as the Term Document Frequency. Which later is used to calculate
- * the Idf Thirdly, it outputs for each label the number of times a document was seen(Also used in Idf Calculation)
- *
- * @param key The label
- * @param value the features (all unique) associated w/ this label
- * @param output The OutputCollector to write the results to
- * @param reporter Not used
+ * We need to count the number of times we've seen a term with a given label
+ * and we need to output that. But this Mapper does more than just outputting
+ * the count. First, it does weight normalisation. Secondly, it outputs, for
+ * each unique word in a document, the value 1 to be summed up as the Term
+ * Document Frequency, which is later used to calculate the IDF. Thirdly, it
+ * outputs for each label the number of times a document was seen (also used
+ * in the IDF calculation).
+ *
+ * @param key
+ * The label
+ * @param value
+ * the document text; its n-grams are the features associated w/ this label
+ * @param output
+ * The OutputCollector to write the results to
+ * @param reporter
+ * Used to report the status of the map task
*/
@Override
- public void map(Text key, Text value,
- OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
- throws IOException {
- //String line = value.toString();
+ public void map(Text key,
+ Text value,
+ OutputCollector<StringTuple,DoubleWritable> output,
+ Reporter reporter) throws IOException {
+ // String line = value.toString();
String label = key.toString();
-
- Map<String, int[]> wordList = new HashMap<String, int[]>(1000);
-
- List<String> ngrams = new NGrams(value.toString(), gramSize).generateNGramsWithoutLabel();
-
+
+ Map<String,int[]> wordList = new HashMap<String,int[]>(1000);
+
+ List<String> ngrams = new NGrams(value.toString(), gramSize)
+ .generateNGramsWithoutLabel();
+
for (String ngram : ngrams) {
int[] count = wordList.get(ngram);
if (count == null) {
@@ -78,26 +89,27 @@
count[0]++;
}
double lengthNormalisation = 0.0;
- for (int[] D_kj : wordList.values()) {
+ for (int[] dKJ : wordList.values()) {
// each value holds the occurrence count of one n-gram in this document
- double dkjValue = (double) D_kj[0];
+ double dkjValue = (double) dKJ[0];
lengthNormalisation += dkjValue * dkjValue;
}
lengthNormalisation = Math.sqrt(lengthNormalisation);
-
+
// Output Length Normalized + TF Transformed Frequency per Word per Class
// Log(1 + D_ij)/SQRT( SIGMA(k, D_kj^2) )
- for (Map.Entry<String, int[]> entry : wordList.entrySet()) {
+ for (Map.Entry<String,int[]> entry : wordList.entrySet()) {
// key is the n-gram; the emitted tuple is (WEIGHT, label, n-gram)
String token = entry.getKey();
StringTuple tuple = new StringTuple();
tuple.add(BayesConstants.WEIGHT);
tuple.add(label);
tuple.add(token);
- DoubleWritable f = new DoubleWritable(Math.log(1.0 + entry.getValue()[0]) / lengthNormalisation);
+ DoubleWritable f = new DoubleWritable(Math.log(1.0 + entry.getValue()[0])
+ / lengthNormalisation);
output.collect(tuple, f);
}
- reporter.setStatus("Bayes Feature Mapper: Document Label: " + label);
+ reporter.setStatus("Bayes Feature Mapper: Document Label: " + label);
// Output Document Frequency per Word per Class
@@ -107,31 +119,32 @@
StringTuple dfTuple = new StringTuple();
dfTuple.add(BayesConstants.DOCUMENT_FREQUENCY);
dfTuple.add(label);
- dfTuple.add(token);
- output.collect(dfTuple, one);
+ dfTuple.add(token);
+ output.collect(dfTuple, ONE);
StringTuple tokenCountTuple = new StringTuple();
tokenCountTuple.add(BayesConstants.FEATURE_COUNT);
tokenCountTuple.add(token);
- output.collect(tokenCountTuple, one);
-
+ output.collect(tokenCountTuple, ONE);
+
}
-
+
// output that we have seen the label to calculate the Count of Document per
// class
StringTuple labelCountTuple = new StringTuple();
labelCountTuple.add(BayesConstants.LABEL_COUNT);
labelCountTuple.add(label);
- output.collect(labelCountTuple, one);
+ output.collect(labelCountTuple, ONE);
}
-
+
@Override
public void configure(JobConf job) {
try {
log.info("Bayes Parameter {}", job.get("bayes.parameters"));
- Parameters params = Parameters.fromString(job.get("bayes.parameters",""));
+ Parameters params = Parameters
+ .fromString(job.get("bayes.parameters", ""));
gramSize = Integer.valueOf(params.get("gramSize"));
-
+
} catch (IOException ex) {
log.warn(ex.toString(), ex);
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureOutputFormat.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureOutputFormat.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureOutputFormat.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,8 @@
package org.apache.mahout.classifier.bayes.mapreduce.common;
+import java.io.IOException;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -27,44 +29,45 @@
import org.apache.hadoop.util.Progressable;
import org.apache.mahout.common.StringTuple;
-import java.io.IOException;
-
/**
- * This class extends the MultipleOutputFormat, allowing to write the output data to different output files in sequence
- * file output format.
+ * This class extends MultipleOutputFormat, allowing the output data to be
+ * written to different output files in sequence file output format.
*/
-public class BayesFeatureOutputFormat extends MultipleOutputFormat<WritableComparable<?>, Writable> {
-
- private SequenceFileOutputFormat<WritableComparable<?>, Writable> theSequenceFileOutputFormat = null;
-
+public class BayesFeatureOutputFormat extends
+ MultipleOutputFormat<WritableComparable<?>,Writable> {
+
+ private SequenceFileOutputFormat<WritableComparable<?>,Writable> theSequenceFileOutputFormat;
+
@Override
- protected RecordWriter<WritableComparable<?>, Writable> getBaseRecordWriter(
- FileSystem fs, JobConf job, String name, Progressable arg3)
- throws IOException {
+ protected RecordWriter<WritableComparable<?>,Writable> getBaseRecordWriter(FileSystem fs,
+ JobConf job,
+ String name,
+ Progressable arg3) throws IOException {
if (theSequenceFileOutputFormat == null) {
- theSequenceFileOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>, Writable>();
+ theSequenceFileOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>,Writable>();
}
return theSequenceFileOutputFormat.getRecordWriter(fs, job, name, arg3);
}
-
+
@Override
- protected String generateFileNameForKeyValue(WritableComparable<?> k, Writable v, String name) {
+ protected String generateFileNameForKeyValue(WritableComparable<?> k,
+ Writable v,
+ String name) {
StringTuple key = (StringTuple) k;
- if(key.length() == 3)
- {
- if(key.stringAt(0).equals(BayesConstants.WEIGHT))
+ if (key.length() == 3) {
+ if (key.stringAt(0).equals(BayesConstants.WEIGHT)) {
return "trainer-wordFreq/" + name;
- else if(key.stringAt(0).equals(BayesConstants.DOCUMENT_FREQUENCY))
+ } else if (key.stringAt(0).equals(BayesConstants.DOCUMENT_FREQUENCY)) {
return "trainer-termDocCount/" + name;
- }
- else if(key.length() == 2)
- {
- if(key.stringAt(0).equals(BayesConstants.FEATURE_COUNT))
+ }
+ } else if (key.length() == 2) {
+ if (key.stringAt(0).equals(BayesConstants.FEATURE_COUNT)) {
return "trainer-featureCount/" + name;
- else if(key.stringAt(0).equals(BayesConstants.LABEL_COUNT))
+ } else if (key.stringAt(0).equals(BayesConstants.LABEL_COUNT)) {
return "trainer-docCount/" + name;
+ }
}
- throw new RuntimeException("Unrecognized Tuple: " + key);
+ throw new RuntimeException("Unrecognized Tuple: " + key);
}
-
+
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java?rev=896311&r1=896310&r2=896311&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java Wed Jan 6 02:46:22 2010
@@ -17,6 +17,9 @@
package org.apache.mahout.classifier.bayes.mapreduce.common;
+import java.io.IOException;
+import java.util.Iterator;
+
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
@@ -24,26 +27,24 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.common.StringTuple;
-import java.io.IOException;
-import java.util.Iterator;
-
/** Can also be used as a local Combiner. A simple summing reducer */
-public class BayesFeatureReducer extends MapReduceBase
- implements Reducer<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
-
+public class BayesFeatureReducer extends MapReduceBase implements
+ Reducer<StringTuple,DoubleWritable,StringTuple,DoubleWritable> {
+
@Override
public void reduce(StringTuple key,
Iterator<DoubleWritable> values,
- OutputCollector<StringTuple, DoubleWritable> output,
+ OutputCollector<StringTuple,DoubleWritable> output,
Reporter reporter) throws IOException {
- //Key is label,word, value is the number of times we've seen this label word per local node. Output is the same
-
+ // Key is a feature tuple (e.g. label,word); values are partial sums from
+ // the mappers or combiners. Output is the same key with the summed value.
+
double sum = 0.0;
while (values.hasNext()) {
- reporter.setStatus("Feature Reducer:" + key);
+ reporter.setStatus("Feature Reducer:" + key);
sum += values.next().get();
}
- reporter.setStatus("Bayes Feature Reducer: " + key + " => " + sum);
+ reporter.setStatus("Bayes Feature Reducer: " + key + " => " + sum);
output.collect(key, new DoubleWritable(sum));
}
}