You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dr...@apache.org on 2010/08/09 04:33:25 UTC

svn commit: r983504 - in /mahout/trunk/core/src: main/java/org/apache/mahout/classifier/bayes/ main/java/org/apache/mahout/classifier/bayes/common/ main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/ main/java/org/apache/mahout/classifier/bay...

Author: drew
Date: Mon Aug  9 02:33:24 2010
New Revision: 983504

URL: http://svn.apache.org/viewvc?rev=983504&view=rev
Log:
MAHOUT-442: Simple feature reduction options for Bayes classifiers (--minDf and --minSupport)

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureCombiner.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/FeatureLabelComparator.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/FeaturePartitioner.java
    mahout/trunk/core/src/test/java/org/apache/mahout/classifier/bayes/BayesFeatureMapReduceTest.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/TrainClassifier.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/common/BayesParameters.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/TrainClassifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/TrainClassifier.java?rev=983504&r1=983503&r2=983504&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/TrainClassifier.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/TrainClassifier.java Mon Aug  9 02:33:24 2010
@@ -80,6 +80,14 @@ public final class TrainClassifier {
       abuilder.withName("gramSize").withMinimum(1).withMaximum(1).create()).withDescription(
       "Size of the n-gram. Default Value: 1 ").withShortName("ng").create();
     
+    Option minDfOpt = obuilder.withLongName("minDf").withRequired(false).withArgument(
+        abuilder.withName("minDf").withMinimum(1).withMaximum(1).create()).withDescription(
+        "Minimum Term Document Frequency: 1 ").withShortName("mf").create();
+    
+    Option minSupportOpt = obuilder.withLongName("minSupport").withRequired(false).withArgument(
+        abuilder.withName("minSupport").withMinimum(1).withMaximum(1).create()).withDescription(
+        "Minimum Support (Term Frequency): 1 ").withShortName("ms").create();
+    
     Option alphaOpt = obuilder.withLongName("alpha").withRequired(false).withArgument(
       abuilder.withName("a").withMinimum(1).withMaximum(1).create()).withDescription(
       "Smoothing parameter Default Value: 1.0").withShortName("a").create();
@@ -87,13 +95,17 @@ public final class TrainClassifier {
     Option typeOpt = obuilder.withLongName("classifierType").withRequired(true).withArgument(
       abuilder.withName("classifierType").withMinimum(1).withMaximum(1).create()).withDescription(
       "Type of classifier: bayes|cbayes. Default: bayes").withShortName("type").create();
+    
     Option dataSourceOpt = obuilder.withLongName("dataSource").withRequired(true).withArgument(
       abuilder.withName("dataSource").withMinimum(1).withMaximum(1).create()).withDescription(
       "Location of model: hdfs|hbase. Default Value: hdfs").withShortName("source").create();
     
+    Option skipCleanupOpt = obuilder.withLongName("skipCleanup").withRequired(false).withDescription(
+        "Skip cleanup of feature extraction output").withShortName("sc").create();
+    
     Group group = gbuilder.withName("Options").withOption(gramSizeOpt).withOption(helpOpt).withOption(
       inputDirOpt).withOption(outputOpt).withOption(typeOpt).withOption(dataSourceOpt).withOption(alphaOpt)
-        .create();
+        .withOption(minDfOpt).withOption(minSupportOpt).withOption(skipCleanupOpt).create();
     try {
       Parser parser = new Parser();
       
@@ -108,7 +120,18 @@ public final class TrainClassifier {
       String classifierType = (String) cmdLine.getValue(typeOpt);
       String dataSourceType = (String) cmdLine.getValue(dataSourceOpt);
       
-      BayesParameters params = new BayesParameters(Integer.parseInt((String) cmdLine.getValue(gramSizeOpt)));
+      BayesParameters params = new BayesParameters();
+      if (cmdLine.hasOption(gramSizeOpt)) 
+        params.setGramSize(Integer.parseInt((String) cmdLine.getValue(gramSizeOpt)));
+      
+      if (cmdLine.hasOption(minDfOpt))
+        params.setMinDF(Integer.parseInt((String) cmdLine.getValue(minDfOpt)));
+      
+      if (cmdLine.hasOption(minSupportOpt))
+        params.setMinSupport(Integer.parseInt((String) cmdLine.getValue(minSupportOpt)));
+      
+      if (cmdLine.hasOption(skipCleanupOpt))
+        params.setSkipCleanup(true);
       
       String alphaI = "1.0";
       if (cmdLine.hasOption(alphaOpt)) {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/common/BayesParameters.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/common/BayesParameters.java?rev=983504&r1=983503&r2=983504&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/common/BayesParameters.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/common/BayesParameters.java Mon Aug  9 02:33:24 2010
@@ -24,8 +24,33 @@ import org.apache.mahout.common.Paramete
  */
 public class BayesParameters extends Parameters {
   
+  
+  public BayesParameters() {
+    
+  }
+  
+  /** Create BayesParameters with the specified gram size
+   *  
+   * @param gramSize
+   * @deprecated use {@link #BayesParameters()} and {@link #setGramSize(int)} instead
+   */
   public BayesParameters(int gramSize) {
-    set("gramSize", Integer.toString(gramSize));
+    this.setGramSize(gramSize);
+  }
+  
+  public void setGramSize(int gramSize) {
+    set("gramSize", Integer.toBinaryString(gramSize));
   }
   
+  public void setMinSupport(int minSupport) {
+    set("minSupport", Integer.toString(minSupport));
+  }
+  
+  public void setMinDF(int minDf) {
+    set("minDf", Integer.toString(minDf)); 
+  }
+  
+  public void setSkipCleanup(boolean b) {
+    set("skipCleanup", Boolean.toString(b));
+  }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java?rev=983504&r1=983503&r2=983504&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java Mon Aug  9 02:33:24 2010
@@ -96,8 +96,8 @@ public class BayesClassifierMapper exten
   @Override
   public void configure(JobConf job) {
     try {
-      log.info("Bayes Parameter {}", job.get("bayes.parameters"));
       Parameters params = Parameters.fromString(job.get("bayes.parameters", ""));
+      log.info("Bayes Parameter {}", params.print());
       log.info("{}", params.print());
       Algorithm algorithm;
       Datastore datastore;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java?rev=983504&r1=983503&r2=983504&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java Mon Aug  9 02:33:24 2010
@@ -59,6 +59,8 @@ public class BayesDriver implements Baye
     BayesThetaNormalizerDriver normalizer = new BayesThetaNormalizerDriver();
     normalizer.runJob(input, output, params);
     
+    if (Boolean.parseBoolean(params.get("skipCleanup"))) return;
+    
     Path docCountOutPath = new Path(output, "trainer-docCount");
     HadoopUtil.overwriteOutput(docCountOutPath);
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java?rev=983504&r1=983503&r2=983504&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java Mon Aug  9 02:33:24 2010
@@ -59,6 +59,8 @@ public class CBayesDriver implements Bay
     CBayesThetaNormalizerDriver normalizer = new CBayesThetaNormalizerDriver();
     normalizer.runJob(input, output, params);
     
+    if (Boolean.parseBoolean(params.get("skipCleanup"))) return; // parse the stored flag, not a system property
+    
     Path docCountOutPath = new Path(output, "trainer-docCount");
     HadoopUtil.overwriteOutput(docCountOutPath);
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java?rev=983504&r1=983503&r2=983504&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java Mon Aug  9 02:33:24 2010
@@ -32,6 +32,8 @@ public final class BayesConstants {
   
   public static final String FEATURE_COUNT = "__FC"; // ,
   
+  public static final String FEATURE_TF = "__FF"; // ,
+  
   public static final String WEIGHT = "__WT";
   
   public static final String FEATURE_SET_SIZE = "__FS";

Added: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureCombiner.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureCombiner.java?rev=983504&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureCombiner.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureCombiner.java Mon Aug  9 02:33:24 2010
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.classifier.bayes.mapreduce.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.common.StringTuple;
+
+/** Can also be used as a local Combiner. A simple summing reducer */
+public class BayesFeatureCombiner extends MapReduceBase implements
+    Reducer<StringTuple,DoubleWritable,StringTuple,DoubleWritable> {
+  
+  @Override
+  public void reduce(StringTuple key,
+                     Iterator<DoubleWritable> values,
+                     OutputCollector<StringTuple,DoubleWritable> output,
+                     Reporter reporter) throws IOException {
+    // Key is label,word, value is the number of times we've seen this label
+    // word per local node. Output is the same
+
+    double sum = 0.0;
+    while (values.hasNext()) {
+      reporter.setStatus("Feature Combiner:" + key);
+      sum += values.next().get();
+    }
+    reporter.setStatus("Bayes Feature Combiner: " + key + " => " + sum);
+    output.collect(key, new DoubleWritable(sum));
+  }
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java?rev=983504&r1=983503&r2=983504&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java Mon Aug  9 02:33:24 2010
@@ -41,14 +41,15 @@ public class BayesFeatureDriver implemen
     conf.setJobName("Bayes Feature Driver running over input: " + input);
     conf.setOutputKeyClass(StringTuple.class);
     conf.setOutputValueClass(DoubleWritable.class);
-    
+    conf.setPartitionerClass(FeaturePartitioner.class);
+    conf.setOutputKeyComparatorClass(FeatureLabelComparator.class);
     FileInputFormat.setInputPaths(conf, input);
     FileOutputFormat.setOutputPath(conf, output);
     
     conf.setMapperClass(BayesFeatureMapper.class);
     
     conf.setInputFormat(KeyValueTextInputFormat.class);
-    conf.setCombinerClass(BayesFeatureReducer.class);
+    conf.setCombinerClass(BayesFeatureCombiner.class);
     conf.setReducerClass(BayesFeatureReducer.class);
     conf.setOutputFormat(BayesFeatureOutputFormat.class);
     conf.set("io.serializations",
@@ -62,4 +63,13 @@ public class BayesFeatureDriver implemen
     JobClient.runJob(conf);
     
   }
+  
+  public static void main(String[] args) throws IOException {
+    // test harness, delete me
+    BayesFeatureDriver driver = new BayesFeatureDriver();
+    BayesParameters p = new BayesParameters(1);
+    Path input = new Path("/home/drew/mahout/bayes/20news-input");
+    Path output = new Path("/home/drew/mahout/bayes/20-news-features");
+    driver.runJob(input, output, p);
+  }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java?rev=983504&r1=983503&r2=983504&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java Mon Aug  9 02:33:24 2010
@@ -132,9 +132,11 @@ public class BayesFeatureMapper extends 
     reporter.setStatus("Bayes Feature Mapper: Document Label: " + label);
     
     // Output Document Frequency per Word per Class
-    wordList.forEachKey(new ObjectProcedure<String>() {
+    // Corpus Document Frequency (FEATURE_COUNT)
+    // Corpus Term Frequency (FEATURE_TF)
+    wordList.forEachPair(new ObjectIntProcedure<String>() {
       @Override
-      public boolean apply(String token) {
+      public boolean apply(String token, int dKJ) {
         try {
           StringTuple dfTuple = new StringTuple();
           dfTuple.add(BayesConstants.DOCUMENT_FREQUENCY);
@@ -146,6 +148,11 @@ public class BayesFeatureMapper extends 
           tokenCountTuple.add(BayesConstants.FEATURE_COUNT);
           tokenCountTuple.add(token);
           output.collect(tokenCountTuple, ONE);
+          
+          StringTuple tokenTfTuple = new StringTuple();
+          tokenTfTuple.add(BayesConstants.FEATURE_TF);
+          tokenTfTuple.add(token);
+          output.collect(tokenTfTuple, new DoubleWritable(dKJ));
         } catch (IOException e) {
           throw new IllegalStateException(e);
         }
@@ -164,8 +171,8 @@ public class BayesFeatureMapper extends 
   @Override
   public void configure(JobConf job) {
     try {
-      log.info("Bayes Parameter {}", job.get("bayes.parameters"));
       Parameters params = Parameters.fromString(job.get("bayes.parameters", ""));
+      log.info("Bayes Parameter {}", params.print());
       gramSize = Integer.valueOf(params.get("gramSize"));
       
     } catch (IOException ex) {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java?rev=983504&r1=983503&r2=983504&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java Mon Aug  9 02:33:24 2010
@@ -20,31 +20,104 @@ package org.apache.mahout.classifier.bay
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.common.Parameters;
 import org.apache.mahout.common.StringTuple;
 
 /** Can also be used as a local Combiner. A simple summing reducer */
 public class BayesFeatureReducer extends MapReduceBase implements
     Reducer<StringTuple,DoubleWritable,StringTuple,DoubleWritable> {
   
+  private static final Logger log = LoggerFactory.getLogger(BayesFeatureReducer.class);
+  
+  private static final String DEFAULT_MIN_SUPPORT = "-1";
+  private static final String DEFAULT_MIN_DF = "-1";
+  
+  private double minSupport = -1;  
+  private double minDf      = -1;
+  
+  private String currentDfFeature;
+  private double currentCorpusDf;
+  private double currentCorpusTf;
+  
   @Override
   public void reduce(StringTuple key,
                      Iterator<DoubleWritable> values,
                      OutputCollector<StringTuple,DoubleWritable> output,
                      Reporter reporter) throws IOException {
-    // Key is label,word, value is the number of times we've seen this label
-    // word per local node. Output is the same
+    
+    // StringTuple key is either:
+    // type, word        for type=FEATURE_COUNT, FEATURE_TF or WEIGHT tuples
+    // type, label       for type=LABEL_COUNT_TUPLES
+    // type, label, word for type=DOCUMENT_FREQUENCY tuples
     
     double sum = 0.0;
-    while (values.hasNext()) {
+    while (values.hasNext()) {  
       reporter.setStatus("Feature Reducer:" + key);
       sum += values.next().get();
     }
     reporter.setStatus("Bayes Feature Reducer: " + key + " => " + sum);
+
+    if (2 > key.length() || key.length() > 3) {
+      throw new IllegalArgumentException("StringTuple length out of bounds, not (2 <= length <= 3)");
+    }
+    
+    int featureIndex = key.length() == 2 ? 1 : 2;
+    
+    // FeatureLabelComparator guarantees that for a given label, we will
+    // see FEATURE_TF items first, FEATURE_COUNT items second, 
+    // DOCUMENT_FREQUENCY items next and finally WEIGHT items, while
+    // the FeaturePartitioner guarantees that all tuples containing a given term
+    // will be handled by the same reducer.
+    if (key.stringAt(0).equals(BayesConstants.LABEL_COUNT)) {
+      /* no-op, just collect */
+    } else if (key.stringAt(0).equals(BayesConstants.FEATURE_TF)) {
+      currentDfFeature = key.stringAt(1);
+      currentCorpusTf = sum;
+      currentCorpusDf = -1;
+      
+      if (0 < minSupport && currentCorpusTf < minSupport) {
+        reporter.incrCounter("skipped", "less_than_minSupport", 1);
+      }
+      return; // never emit FEATURE_TF tuples.
+    } else if (!key.stringAt(featureIndex).equals(currentDfFeature)) {
+      throw new IllegalStateException("Found feature data " + key + " prior to feature tf");
+    } else if (0 < minSupport && currentCorpusTf < minSupport) {
+      reporter.incrCounter("skipped", "less_than_minSupport_label-term", 1);
+      return; // skip items that have less than a specified frequency.
+    } else if (key.stringAt(0).equals(BayesConstants.FEATURE_COUNT)) {
+      currentCorpusDf = sum;
+      
+      if (0 < minDf && currentCorpusDf < minDf) {
+        reporter.incrCounter("skipped", "less_than_minDf", 1);
+        return; // skip items that have less than the specified minSupport.
+      }
+    } else if (currentCorpusDf == -1) {
+      throw new IllegalStateException("Found feature data " + key + " prior to feature count");
+    } else if (0 < minDf && currentCorpusDf < minDf) {
+      reporter.incrCounter("skipped", "less_than_minDf_label-term", 1);
+      return; // skip items that have less than a specified frequency.
+    } 
     output.collect(key, new DoubleWritable(sum));
   }
+
+  @Override
+  public void configure(JobConf job) {
+    try {
+      Parameters params = Parameters.fromString(job.get("bayes.parameters", ""));
+      log.info("Bayes Parameter {}", params.print());
+      minSupport = Integer.valueOf(params.get("minSupport", DEFAULT_MIN_SUPPORT));
+      minDf      = Integer.valueOf(params.get("minDf", DEFAULT_MIN_DF));
+    } catch (IOException ex) {
+      log.warn(ex.toString(), ex);
+    };
+  }
 }

Added: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/FeatureLabelComparator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/FeatureLabelComparator.java?rev=983504&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/FeatureLabelComparator.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/FeatureLabelComparator.java Mon Aug  9 02:33:24 2010
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.classifier.bayes.mapreduce.common;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.mahout.common.StringTuple;
+
+/**
+ * 
+ */
+public class FeatureLabelComparator extends WritableComparator {
+  
+  /**
+   * @param keyClass
+   */
+  public FeatureLabelComparator() {
+    super(StringTuple.class, true);
+  }
+  
+  @Override
+  public int compare(WritableComparable a, WritableComparable b) {
+    StringTuple ta = (StringTuple) a;
+    StringTuple tb = (StringTuple) b;
+    
+    String tmpa, tmpb;
+    int cmp;
+    
+    if (ta.length() < 2 || ta.length() > 3 || tb.length() < 2
+        || tb.length() > 3) {
+      throw new IllegalArgumentException("StringTuple length out of bounds");
+    }
+    
+    // token
+    tmpa = ta.length() == 2 ? ta.stringAt(1) : ta.stringAt(2);
+    tmpb = tb.length() == 2 ? tb.stringAt(1) : tb.stringAt(2);
+    cmp = tmpa.compareTo(tmpb);
+    if (cmp != 0) return cmp;
+    
+    // type, FEATURE_TF first, then FEATURE_COUNT, then DF or anything else.
+    cmp = ta.stringAt(0).compareTo(tb.stringAt(0));
+    if (cmp != 0) {
+      if (ta.stringAt(0).equals(BayesConstants.FEATURE_TF)) {
+        return -1;
+      }
+      else if (tb.stringAt(0).equals(BayesConstants.FEATURE_TF)) {
+        return 1;
+      }
+      else if (ta.stringAt(0).equals(BayesConstants.FEATURE_COUNT)) {
+        return -1;
+      }
+      else if (tb.stringAt(0).equals(BayesConstants.FEATURE_COUNT)) {
+        return 1;
+      }
+      else {
+        return cmp;
+      }
+    }
+
+    // label or empty.
+    tmpa = ta.length() == 2 ? "" : ta.stringAt(1);
+    tmpb = tb.length() == 2 ? "" : tb.stringAt(1);
+    
+    cmp = tmpa.compareTo(tmpb);
+    return cmp;
+    
+  }
+  
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/FeaturePartitioner.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/FeaturePartitioner.java?rev=983504&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/FeaturePartitioner.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/FeaturePartitioner.java Mon Aug  9 02:33:24 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.classifier.bayes.mapreduce.common;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.mahout.common.StringTuple;
+
+/**
+ * ensure that features all make it into the same partition.
+ */
+public class FeaturePartitioner implements
+    Partitioner<StringTuple,DoubleWritable> {
+  
+  /**
+   * Picks the partition from the feature term alone so that every tuple
+   * mentioning a given term reaches the same reducer.
+   */
+  @Override
+  public int getPartition(StringTuple key, DoubleWritable value,
+      int numPartitions) {
+    if (key.length() < 2 || key.length() > 3) {
+      throw new IllegalArgumentException("StringTuple length out of bounds");
+    }
+    
+    // the term is the last element: (type, term) or (type, label, term)
+    String feature = key.length() == 2 ? key.stringAt(1) : key.stringAt(2);
+    
+    // hash every byte of the term; a short prefix sends many terms to one reducer
+    byte[] bytes = feature.getBytes();
+    int hash = WritableComparator.hashBytes(bytes, bytes.length);
+    return (hash & Integer.MAX_VALUE) % numPartitions;
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public void configure(JobConf job) {
+  /* no-op */
+  }
+  
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/classifier/bayes/BayesFeatureMapReduceTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/classifier/bayes/BayesFeatureMapReduceTest.java?rev=983504&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/classifier/bayes/BayesFeatureMapReduceTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/classifier/bayes/BayesFeatureMapReduceTest.java Mon Aug  9 02:33:24 2010
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.classifier.bayes;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.classifier.bayes.common.BayesParameters;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesFeatureMapper;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesFeatureReducer;
+import org.apache.mahout.classifier.bayes.mapreduce.common.FeatureLabelComparator;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.StringTuple;
+
+/**
+ * Drives BayesFeatureMapper and BayesFeatureReducer in-process and counts the emitted keys per type.
+ */
+public class BayesFeatureMapReduceTest extends MahoutTestCase {
+
+  /** Maps seven fixed documents under the keys foo, bar and baz, then reduces in comparator order. */
+  public DummyOutputCollector<StringTuple,DoubleWritable> runMapReduce(BayesParameters bp) throws IOException {
+    BayesFeatureMapper mapper = new BayesFeatureMapper();
+    JobConf conf = new JobConf();
+    conf.set("io.serializations",
+      "org.apache.hadoop.io.serializer.JavaSerialization,"
+          + "org.apache.hadoop.io.serializer.WritableSerialization");
+    // the parameters under test are serialized into the configuration given to mapper and reducer
+    conf.set("bayes.parameters", bp.toString());
+    mapper.configure(conf);
+
+    DummyOutputCollector<StringTuple,DoubleWritable> mapperOutput = new DummyOutputCollector<StringTuple,DoubleWritable>();
+    // as whitespace-separated unigrams: 17 distinct term/label pairs, 14 distinct terms; only 'big' and 'cool' repeat
+    mapper.map(new Text("foo"), new Text("big brown shoe"), mapperOutput, Reporter.NULL);
+    mapper.map(new Text("foo"), new Text("cool chuck taylors"), mapperOutput, Reporter.NULL);
+
+    mapper.map(new Text("bar"), new Text("big big dog"), mapperOutput, Reporter.NULL);
+    mapper.map(new Text("bar"), new Text("cool rain"), mapperOutput, Reporter.NULL);
+
+    mapper.map(new Text("baz"), new Text("red giant"), mapperOutput, Reporter.NULL);
+    mapper.map(new Text("baz"), new Text("white dwarf"), mapperOutput, Reporter.NULL);
+    mapper.map(new Text("baz"), new Text("cool black hole"), mapperOutput, Reporter.NULL);
+
+    BayesFeatureReducer reducer = new BayesFeatureReducer();
+    reducer.configure(conf);
+
+    DummyOutputCollector<StringTuple,DoubleWritable> reducerOutput = new DummyOutputCollector<StringTuple,DoubleWritable>();
+    Map<StringTuple, List<DoubleWritable>> outputData = mapperOutput.getData();
+
+    // emulate the shuffle: hand the reducer one key at a time in FeatureLabelComparator order
+    FeatureLabelComparator cmp = new FeatureLabelComparator();
+    TreeSet<StringTuple> keySet = new TreeSet<StringTuple>(cmp);
+    keySet.addAll(mapperOutput.getKeys());
+
+    for (StringTuple k: keySet) {
+      List<DoubleWritable> v = outputData.get(k);
+      reducer.reduce(k, v.iterator(), reducerOutput, Reporter.NULL);
+    }
+
+    return reducerOutput;
+  }
+
+  public void testNoFilters() throws IOException {
+    BayesParameters bp = new BayesParameters();
+    bp.setGramSize(1);
+    bp.setMinDF(1);
+    DummyOutputCollector<StringTuple,DoubleWritable> reduceOutput = runMapReduce(bp);
+    // nothing is expected to be filtered: all 17 term/label pairs and all 14 terms remain
+    assertCounts(reduceOutput,
+        17, /* df: 17 unique term/label pairs */
+        14, /* fc: 14 unique features across all labels */
+        3,  /* lc: 3 labels */
+        17  /* wt: 17 unique term/label pairs */);
+  }
+
+  public void testMinSupport() throws IOException {
+    BayesParameters bp = new BayesParameters();
+    bp.setGramSize(1);
+    bp.setMinSupport(2);
+    DummyOutputCollector<StringTuple,DoubleWritable> reduceOutput = runMapReduce(bp);
+    // 'big' and 'cool' occur 3 times each; every other term occurs once and should be dropped
+    assertCounts(reduceOutput,
+        5, /* df: big/foo, big/bar, cool/foo, cool/bar, cool/baz */
+        2, /* fc: only 'big' and 'cool' occur at least 2 times */
+        3, /* lc: 3 labels */
+        5  /* wt: same 5 term/label pairs */);
+  }
+
+  public void testMinDf() throws IOException {
+    BayesParameters bp = new BayesParameters();
+    bp.setGramSize(1);
+    bp.setMinDF(2);
+    DummyOutputCollector<StringTuple,DoubleWritable> reduceOutput = runMapReduce(bp);
+
+    // 'big' is found in 2 documents and 'cool' in 3; every other term is found in one document,
+    // so only the 5 term/label pairs of those two terms should remain. Labels are not filtered.
+    assertCounts(reduceOutput,
+        5, /* df: 5 term/label pairs whose term is in at least 2 documents */
+        2, /* fc: 'big' and 'cool' */
+        3,  /* lc */
+        5  /* wt */);
+  }
+
+  public void testMinBoth() throws IOException {
+    BayesParameters bp = new BayesParameters();
+    bp.setGramSize(1);
+    bp.setMinSupport(3);
+    bp.setMinDF(2);
+    DummyOutputCollector<StringTuple,DoubleWritable> reduceOutput = runMapReduce(bp);
+
+    // 'big' and 'cool' each occur 3 times and are each found in at least 2 documents, so the
+    // combined thresholds are expected to keep the same two terms as the single-filter tests
+    assertCounts(reduceOutput,
+        5, /* df: 5 term/label pairs whose term is in at least 2 documents */
+        2, /* fc: 'big' and 'cool' each appear 3 times */
+        3,  /* lc */
+        5  /* wt */);
+  }
+
+  /**
+   * Tallies the output keys by the type tag in their first element and checks each tally against
+   * its expected value; also requires every key to hold exactly one value.
+   */
+  public void assertCounts(DummyOutputCollector<StringTuple,DoubleWritable> output, 
+      int dfExpected, int fcExpected, int lcExpected, int wtExpected) {
+    int dfCount = 0;
+    int fcCount = 0;
+    int lcCount = 0;
+    int wtCount = 0;
+
+    Map<StringTuple, List<DoubleWritable>> outputData = output.getData();
+    for (Map.Entry<StringTuple, List<DoubleWritable>> entry: outputData.entrySet()) {
+      String type = entry.getKey().stringAt(0);
+      if (type.equals(BayesConstants.DOCUMENT_FREQUENCY)) {
+        dfCount++;
+      } else if (type.equals(BayesConstants.FEATURE_COUNT)) {
+        fcCount++;
+      } else if (type.equals(BayesConstants.LABEL_COUNT)) {
+        lcCount++;
+      } else if (type.equals(BayesConstants.WEIGHT)) {
+        wtCount++;
+      }
+      assertEquals("value size", 1, entry.getValue().size());
+    }
+
+    assertEquals("document frequency count", dfExpected, dfCount);
+    assertEquals("feature count", fcExpected, fcCount);
+    assertEquals("label count", lcExpected, lcCount);
+    assertEquals("feature weight count", wtExpected, wtCount);
+  }
+}