Posted to commits@mahout.apache.org by gs...@apache.org on 2011/10/07 16:02:23 UTC

svn commit: r1180043 [1/3] - in /mahout/trunk: bin/ buildtools/ core/ core/src/main/java/org/apache/mahout/classifier/naivebayes/ core/src/main/java/org/apache/mahout/classifier/naivebayes/test/ core/src/main/java/org/apache/mahout/classifier/naivebaye...

Author: gsingers
Date: Fri Oct  7 14:02:20 2011
New Revision: 1180043

URL: http://svn.apache.org/viewvc?rev=1180043&view=rev
Log:
MAHOUT-798: add in examples for working with ASF email archive, plus various refactorings to clusterdumper, etc. for viewing results

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/TestNaiveBayesDriver.java
    mahout/trunk/examples/bin/build-asf-email.sh
    mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/
    mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/FromEmailToDictionaryMapper.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToDictionaryReducer.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/
    mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailMapper.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailReducer.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailVectorsDriver.java
    mahout/trunk/examples/src/test/java/org/apache/mahout/cf/
    mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/
    mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/
    mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/
    mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/MailToPrefsTest.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/email/
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/email/MailProcessor.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/io/
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWrapper.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWriter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/io/IOWriterWrapper.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/io/WrappedWriter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/io/AbstractClusterWriter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/io/CSVClusterWriter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/io/ClusterDumperWriter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/io/ClusterWriter.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/utils/email/
    mahout/trunk/integration/src/test/java/org/apache/mahout/utils/email/MailProcessorTest.java
    mahout/trunk/integration/src/test/resources/
    mahout/trunk/integration/src/test/resources/test.mbox
Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainUtils.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/bayes/SplitBayesInput.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java
Modified:
    mahout/trunk/bin/mahout
    mahout/trunk/buildtools/   (props changed)
    mahout/trunk/core/   (props changed)
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/AbstractNaiveBayesClassifier.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/ThetaMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainNaiveBayesJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
    mahout/trunk/core/src/main/java/org/apache/mahout/driver/MahoutDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java
    mahout/trunk/core/src/test/java/org/apache/mahout/classifier/naivebayes/NaiveBayesTest.java
    mahout/trunk/distribution/   (props changed)
    mahout/trunk/examples/   (props changed)
    mahout/trunk/examples/src/test/java/org/apache/mahout/classifier/bayes/SplitBayesInputTest.java
    mahout/trunk/integration/bin/prep_asf_mail_archives.sh
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
    mahout/trunk/src/conf/driver.classes.props

Modified: mahout/trunk/bin/mahout
URL: http://svn.apache.org/viewvc/mahout/trunk/bin/mahout?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/bin/mahout (original)
+++ mahout/trunk/bin/mahout Fri Oct  7 14:02:20 2011
@@ -105,15 +105,7 @@ if [ "$MAHOUT_HEAPSIZE" != "" ]; then
 fi
 
 if [ "x$MAHOUT_CONF_DIR" = "x" ]; then
-  if [ -d $MAHOUT_HOME/src/conf ]; then
-    MAHOUT_CONF_DIR=$MAHOUT_HOME/src/conf
-  else
-    if [ -d $MAHOUT_HOME/conf ]; then
-      MAHOUT_CONF_DIR=$MAHOUT_HOME/conf
-    else
-      echo No MAHOUT_CONF_DIR found
-    fi
-  fi
+  MAHOUT_CONF_DIR=$MAHOUT_HOME/src/conf
 fi
 
 # CLASSPATH initially contains $MAHOUT_CONF_DIR, or defaults to $MAHOUT_HOME/src/conf
@@ -205,13 +197,8 @@ if [ "$HADOOP_HOME" = "" ] || [ "$MAHOUT
   elif [ "$MAHOUT_LOCAL" != "" ] ; then
     echo "MAHOUT_LOCAL is set, running locally"
   fi
-    case $1 in
-    (classpath)
-      echo $CLASSPATH
-      ;;
-    (*)
-      CLASSPATH=$CLASSPATH exec "$JAVA" $JAVA_HEAP_MAX $MAHOUT_OPTS $CLASS "$@"
-    esac
+#  echo "CLASSPATH: $CLASSPATH"
+  exec "$JAVA" $JAVA_HEAP_MAX $MAHOUT_OPTS -classpath "$CLASSPATH" $CLASS "$@"
 else
   echo "Running on hadoop, using HADOOP_HOME=$HADOOP_HOME"
   if [ "$HADOOP_CONF_DIR" = "" ] ; then
@@ -225,18 +212,13 @@ else
     echo "ERROR: Could not find mahout-examples-*.job in $MAHOUT_HOME or $MAHOUT_HOME/examples/target, please run 'mvn install' to create the .job file"
     exit 1
   else
-    case "$1" in
-    (hadoop)
+    if [ "$1" = "hadoop" ]; then
       export HADOOP_CLASSPATH=$MAHOUT_CONF_DIR:${HADOOP_CLASSPATH}:$CLASSPATH
       exec "$HADOOP_HOME/bin/$@"
-      ;;
-    (classpath)
-      echo $CLASSPATH
-      ;;
-    (*)
+    else
       echo "MAHOUT-JOB: $MAHOUT_JOB"
       export HADOOP_CLASSPATH=$MAHOUT_CONF_DIR:${HADOOP_CLASSPATH}
       exec "$HADOOP_HOME/bin/hadoop" --config $HADOOP_CONF_DIR jar $MAHOUT_JOB $CLASS "$@"
-    esac
+    fi
   fi
 fi

Propchange: mahout/trunk/buildtools/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct  7 14:02:20 2011
@@ -1 +1,2 @@
 target
+*.iml

Propchange: mahout/trunk/core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct  7 14:02:20 2011
@@ -11,3 +11,5 @@ testdatatestdata
 target
 .checkstyle
 dist
+temp
+testdata

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/AbstractNaiveBayesClassifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/AbstractNaiveBayesClassifier.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/AbstractNaiveBayesClassifier.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/AbstractNaiveBayesClassifier.java Fri Oct  7 14:02:20 2011
@@ -24,7 +24,7 @@ import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.Vector.Element;
 
 /** Class implementing the Naive Bayes Classifier Algorithm */
-abstract class AbstractNaiveBayesClassifier extends AbstractVectorClassifier {
+public abstract class AbstractNaiveBayesClassifier extends AbstractVectorClassifier {
 
   private final NaiveBayesModel model;
   

Added: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.classifier.naivebayes;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.classifier.naivebayes.NaiveBayesModel;
+import org.apache.mahout.classifier.naivebayes.training.ThetaMapper;
+import org.apache.mahout.classifier.naivebayes.training.TrainNaiveBayesJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.SparseMatrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class BayesUtils {
+
+  private BayesUtils() {}
+
+
+  public static NaiveBayesModel readModelFromDir(Path base, Configuration conf) {
+
+    float alphaI = conf.getFloat(ThetaMapper.ALPHA_I, 1.0f);
+
+    // read feature sums and label sums
+    Vector scoresPerLabel = null;
+    Vector scoresPerFeature = null;
+    for (Pair<Text,VectorWritable> record : new SequenceFileDirIterable<Text, VectorWritable>(
+        new Path(base, TrainNaiveBayesJob.WEIGHTS), PathType.LIST, PathFilters.partFilter(), conf)) {
+      String key = record.getFirst().toString();
+      VectorWritable value = record.getSecond();
+      if (key.equals(TrainNaiveBayesJob.WEIGHTS_PER_FEATURE)) {
+        scoresPerFeature = value.get();
+      } else if (key.equals(TrainNaiveBayesJob.WEIGHTS_PER_LABEL)) {
+        scoresPerLabel = value.get();
+      }
+    }
+
+    Preconditions.checkNotNull(scoresPerFeature);
+    Preconditions.checkNotNull(scoresPerLabel);
+
+    Matrix scoresPerLabelAndFeature = new SparseMatrix(scoresPerLabel.size(), scoresPerFeature.size());
+    for (Pair<IntWritable,VectorWritable> entry : new SequenceFileDirIterable<IntWritable,VectorWritable>(
+        new Path(base, TrainNaiveBayesJob.SUMMED_OBSERVATIONS), PathType.LIST, PathFilters.partFilter(), conf)) {
+      scoresPerLabelAndFeature.assignRow(entry.getFirst().get(), entry.getSecond().get());
+    }
+
+    Vector perlabelThetaNormalizer = null;
+    for (Pair<Text,VectorWritable> entry : new SequenceFileDirIterable<Text,VectorWritable>(
+        new Path(base, TrainNaiveBayesJob.THETAS), PathType.LIST, PathFilters.partFilter(), conf)) {
+      if (entry.getFirst().toString().equals(TrainNaiveBayesJob.LABEL_THETA_NORMALIZER)) {
+        perlabelThetaNormalizer = entry.getSecond().get();
+      }
+    }
+
+    Preconditions.checkNotNull(perlabelThetaNormalizer);
+
+    return new NaiveBayesModel(scoresPerLabelAndFeature, scoresPerFeature, scoresPerLabel, perlabelThetaNormalizer,
+        alphaI);
+  }
+
+  /** Write the list of labels into a sequence file */
+  public static int writeLabelIndex(Configuration conf, Iterable<String> labels, Path indexPath)
+      throws IOException {
+    FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class);
+    int i = 0;
+    try {
+      for (String label : labels) {
+        writer.append(new Text(label), new IntWritable(i++));
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+    return i;
+  }
+
+  public static int writeLabelIndex(Configuration conf, Path indexPath, SequenceFileDirIterable labels) throws IOException {
+    FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class);
+    Set<String> seen = new HashSet<String>();
+    int i = 0;
+    try {
+      for (Object label : labels) {
+        String theLabel = ((Pair<?,?>) label).getFirst().toString();
+        if (!seen.contains(theLabel)) {
+          writer.append(new Text(theLabel), new IntWritable(i++));
+          seen.add(theLabel);
+        }
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+    return i;
+  }
+
+  public static Map<Integer, String> readLabelIndex(Configuration conf, Path indexPath) throws IOException {
+    Map<Integer, String> labelMap = new HashMap<Integer, String>();
+    SequenceFileIterable<Text, IntWritable> fileIterable = new SequenceFileIterable<Text, IntWritable>(indexPath, true, conf);
+    for (Pair<Text, IntWritable> pair : fileIterable) {
+      labelMap.put(pair.getSecond().get(), pair.getFirst().toString());
+    }
+    return labelMap;
+  }
+
+  public static OpenObjectIntHashMap<String> readIndexFromCache(Configuration conf) throws IOException {
+    OpenObjectIntHashMap<String> index = new OpenObjectIntHashMap<String>();
+    for (Pair<Writable,IntWritable> entry : new SequenceFileIterable<Writable,IntWritable>(HadoopUtil.cachedFile(conf), conf)) {
+      index.put(entry.getFirst().toString(), entry.getSecond().get());
+    }
+    return index;
+  }
+
+  public static Map<String,Vector> readScoresFromCache(Configuration conf) throws IOException {
+    Map<String,Vector> sumVectors = Maps.newHashMap();
+    for (Pair<Text,VectorWritable> entry : new SequenceFileDirIterable<Text,VectorWritable>(HadoopUtil.cachedFile(conf),
+        PathType.LIST, PathFilters.partFilter(), conf)) {
+      sumVectors.put(entry.getFirst().toString(), entry.getSecond().get());
+    }
+    return sumVectors;
+  }
+
+
+}
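
For reference, a minimal sketch of the label-index round trip these helpers support
(the class name and paths are hypothetical, chosen for illustration):

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.classifier.naivebayes.BayesUtils;

    public class LabelIndexExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path indexPath = new Path("/tmp/labelIndex");  // hypothetical location
        // write: assigns 0 to "stolen", 1 to "not_stolen" and returns the label count
        int numLabels = BayesUtils.writeLabelIndex(conf, Arrays.asList("stolen", "not_stolen"), indexPath);
        // read: recovers the index -> label mapping used when interpreting classifier output
        Map<Integer, String> labelMap = BayesUtils.readLabelIndex(conf, indexPath);
        System.out.println(numLabels + " labels: " + labelMap);
      }
    }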

Added: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,62 @@
+package org.apache.mahout.classifier.naivebayes.test;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.classifier.naivebayes.AbstractNaiveBayesClassifier;
+import org.apache.mahout.classifier.naivebayes.ComplementaryNaiveBayesClassifier;
+import org.apache.mahout.classifier.naivebayes.NaiveBayesModel;
+import org.apache.mahout.classifier.naivebayes.StandardNaiveBayesClassifier;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Run the input through the model and emit the resulting scores.
+ * <p/>
+ * The output key is the expected (true) label; the output value is the vector of
+ * scores the classifier assigned to each label.
+ */
+public class BayesTestMapper extends Mapper<Text, VectorWritable, Text, VectorWritable> {
+  private AbstractNaiveBayesClassifier classifier;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    Path modelPath = HadoopUtil.cachedFile(conf);
+    NaiveBayesModel model = NaiveBayesModel.materialize(modelPath, conf);
+    boolean compl = Boolean.parseBoolean(conf.get(TestNaiveBayesDriver.COMPLEMENTARY));
+    if (compl) {
+      classifier = new ComplementaryNaiveBayesClassifier(model);
+    } else {
+      classifier = new StandardNaiveBayesClassifier(model);
+    }
+  }
+
+  @Override
+  protected void map(Text key, VectorWritable value, Context context) throws IOException, InterruptedException {
+    Vector result = classifier.classify(value.get());
+    //the key is the expected value
+    context.write(key, new VectorWritable(result));
+
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/TestNaiveBayesDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/TestNaiveBayesDriver.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/TestNaiveBayesDriver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/TestNaiveBayesDriver.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,130 @@
+package org.apache.mahout.classifier.naivebayes.test;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.classifier.ClassifierResult;
+import org.apache.mahout.classifier.ResultAnalyzer;
+import org.apache.mahout.classifier.naivebayes.BayesUtils;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Test the (Complementary) Naive Bayes model that was built during training
+ * by iterating over the test set, classifying each instance and comparing
+ * the result to the expected label
+ */
+public class TestNaiveBayesDriver extends AbstractJob {
+  private static final Logger log = LoggerFactory.getLogger(TestNaiveBayesDriver.class);
+  public static final String LABEL_KEY = "labels";
+  public static final String COMPLEMENTARY = "class"; //conf key: "true" to test the complementary model, "false" for standard
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new TestNaiveBayesDriver(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    int result = 0;
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption("model", "m", "The path to the model built during training", true);
+    addOption(buildOption("testComplementary", "c", "test complementary?", false, false, String.valueOf(false)));
+    addOption("labelIndex", "l", "The path to the location of the label index", true);
+    Map<String, String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), getOutputPath());
+    }
+    Path model = new Path(parsedArgs.get("--model"));
+    HadoopUtil.cacheFiles(model, getConf());
+    //the output key is the expected value, the output value are the scores for all the labels
+    Job testJob = prepareJob(getInputPath(), getOutputPath(), SequenceFileInputFormat.class, BayesTestMapper.class,
+            Text.class, VectorWritable.class, SequenceFileOutputFormat.class);
+    //testJob.getConfiguration().set(LABEL_KEY, parsedArgs.get("--labels"));
+    boolean complementary = parsedArgs.containsKey("--testComplementary");
+    testJob.getConfiguration().set(COMPLEMENTARY, String.valueOf(complementary));
+    testJob.waitForCompletion(true);
+    //load the labels
+    Map<Integer, String> labelMap = BayesUtils.readLabelIndex(getConf(), new Path(parsedArgs.get("--labelIndex")));
+
+    //loop over the results and create the confusion matrix
+    SequenceFileDirIterable<Text, VectorWritable> dirIterable = new SequenceFileDirIterable<Text, VectorWritable>(getOutputPath(), PathType.LIST, PathFilters.partFilter(), getConf());
+    ResultAnalyzer analyzer = new ResultAnalyzer(labelMap.values(), "DEFAULT");
+    analyzeResults(labelMap, dirIterable, analyzer, complementary);
+
+    log.info((complementary ? "Complementary NB" : "Standard NB") + " Results: {}", analyzer);
+    return result;
+  }
+
+  private void analyzeResults(Map<Integer, String> labelMap, SequenceFileDirIterable<Text, VectorWritable> dirIterable, ResultAnalyzer analyzer, boolean complementary) {
+    double bestScore;
+    int bestIdx;
+    if (complementary){
+      for (Pair<Text, VectorWritable> pair : dirIterable) {
+        bestIdx = Integer.MIN_VALUE;
+        bestScore = Long.MIN_VALUE;
+        for (Vector.Element element : pair.getSecond().get()) {
+          if (element.get() > bestScore) {
+            bestScore = element.get();
+            bestIdx = element.index();
+          }
+        }
+        if (bestIdx != Integer.MIN_VALUE) {
+          ClassifierResult classifierResult = new ClassifierResult(labelMap.get(bestIdx), bestScore);
+          analyzer.addInstance(pair.getFirst().toString(), classifierResult);
+        }
+      }
+    } else {
+      for (Pair<Text, VectorWritable> pair : dirIterable) {
+        bestIdx = Integer.MIN_VALUE;
+        bestScore = Long.MIN_VALUE;
+        for (Vector.Element element : pair.getSecond().get()) {
+          if (element.get() > bestScore) {
+            bestScore = element.get();
+            bestIdx = element.index();
+          }
+        }
+        if (bestIdx != Integer.MIN_VALUE) {
+          ClassifierResult classifierResult = new ClassifierResult(labelMap.get(bestIdx), bestScore);
+          analyzer.addInstance(pair.getFirst().toString(), classifierResult);
+        }
+      }
+    }
+
+  }
+}
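
The driver is invoked the same way as the trainer; a minimal sketch (hypothetical
paths and class name) mirroring the testnb call in examples/bin/build-asf-email.sh:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.classifier.naivebayes.test.TestNaiveBayesDriver;

    public class TestNBExample {
      public static void main(String[] args) throws Exception {
        // --model and --labelIndex must point at the trainer's output
        ToolRunner.run(new Configuration(), new TestNaiveBayesDriver(), new String[] {
            "--input", "/tmp/asf-mail/splits/test",    // hypothetical paths
            "--output", "/tmp/asf-mail/test-results",
            "--model", "/tmp/asf-mail/model",
            "--labelIndex", "/tmp/asf-mail/labels",
            "--overwrite"
        });
      }
    }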

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java Fri Oct  7 14:02:20 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.classifier.naivebayes.BayesUtils;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.map.OpenObjectIntHashMap;
 
@@ -33,7 +34,7 @@ public class IndexInstancesMapper extend
 
   @Override
   protected void setup(Context ctx) throws IOException, InterruptedException {
-    labelIndex = TrainUtils.readIndexFromCache(ctx.getConfiguration());
+    labelIndex = BayesUtils.readIndexFromCache(ctx.getConfiguration());
   }
 
   @Override

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/ThetaMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/ThetaMapper.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/ThetaMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/ThetaMapper.java Fri Oct  7 14:02:20 2011
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.classifier.naivebayes.BayesUtils;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
@@ -40,7 +41,7 @@ public class ThetaMapper extends Mapper<
 
     float alphaI = conf.getFloat(ALPHA_I, 1.0f);
     boolean trainComplementary = conf.getBoolean(TRAIN_COMPLEMENTARY, false);
-    Map<String,Vector> scores = TrainUtils.readScoresFromCache(conf);
+    Map<String,Vector> scores = BayesUtils.readScoresFromCache(conf);
 
     if (!trainComplementary) {
       trainer = new StandardThetaTrainer(scores.get(TrainNaiveBayesJob.WEIGHTS_PER_FEATURE),

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainNaiveBayesJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainNaiveBayesJob.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainNaiveBayesJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainNaiveBayesJob.java Fri Oct  7 14:02:20 2011
@@ -17,17 +17,26 @@
 
 package org.apache.mahout.classifier.naivebayes.training;
 
+import java.io.IOException;
 import java.util.Map;
 
 import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.classifier.naivebayes.NaiveBayesModel;
+import org.apache.mahout.classifier.naivebayes.BayesUtils;
 import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
 import org.apache.mahout.common.mapreduce.VectorSumReducer;
 import org.apache.mahout.math.VectorWritable;
 
@@ -42,27 +51,44 @@ public final class TrainNaiveBayesJob ex
   public static final String WEIGHTS = "weights";
   public static final String THETAS = "thetas";
 
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new TrainNaiveBayesJob(), args);
+  }
+
   @Override
   public int run(String[] args) throws Exception {
 
     addInputOption();
     addOutputOption();
-    addOption("labels", "l", "comma-separated list of labels to include in training", true);
-    addOption("alphaI", "a", "smoothing parameter", String.valueOf(1.0f));
-    addOption("trainComplementary", "c", "train complementary?", String.valueOf(false));
+    addOption("labels", "l", "comma-separated list of labels to include in training", false);
 
+    addOption(buildOption("extractLabels", "el", "Extract the labels from the input", false, false, ""));
+    addOption("alphaI", "a", "smoothing parameter", String.valueOf(1.0f));
+    addOption(buildOption("trainComplementary", "c", "train complementary?", false, false, String.valueOf(false)));
+    addOption("labelIndex", "li", "The path to store the label index in", false);
+    addOption(DefaultOptionCreator.overwriteOption().create());
     Map<String,String> parsedArgs = parseArguments(args);
     if (parsedArgs == null) {
       return -1;
     }
-
-    Iterable<String> labels = Splitter.on(",").split(parsedArgs.get("--labels"));
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), getOutputPath());
+      HadoopUtil.delete(getConf(), getTempPath());
+    }
+    Path labPath;
+    String labPathStr = parsedArgs.get("--labelIndex");
+    if (labPathStr != null){
+      labPath = new Path(labPathStr);
+    } else {
+      labPath = getTempPath("labelIndex");
+    }
+    long labelSize = createLabelIndex(parsedArgs, labPath);
     float alphaI = Float.parseFloat(parsedArgs.get("--alphaI"));
     boolean trainComplementary = Boolean.parseBoolean(parsedArgs.get("--trainComplementary"));
 
-    TrainUtils.writeLabelIndex(getConf(), labels, getTempPath("labelIndex"));
-    TrainUtils.setSerializations(getConf());
-    TrainUtils.cacheFiles(getTempPath("labelIndex"), getConf());
+
+    HadoopUtil.setSerializations(getConf());
+    HadoopUtil.cacheFiles(labPath, getConf());
 
     Job indexInstances = prepareJob(getInputPath(), getTempPath(SUMMED_OBSERVATIONS), SequenceFileInputFormat.class,
         IndexInstancesMapper.class, IntWritable.class, VectorWritable.class, VectorSumReducer.class, IntWritable.class,
@@ -73,11 +99,11 @@ public final class TrainNaiveBayesJob ex
     Job weightSummer = prepareJob(getTempPath(SUMMED_OBSERVATIONS), getTempPath(WEIGHTS),
         SequenceFileInputFormat.class, WeightsMapper.class, Text.class, VectorWritable.class, VectorSumReducer.class,
         Text.class, VectorWritable.class, SequenceFileOutputFormat.class);
-    weightSummer.getConfiguration().set(WeightsMapper.NUM_LABELS, String.valueOf(Iterables.size(labels)));
+    weightSummer.getConfiguration().set(WeightsMapper.NUM_LABELS, String.valueOf(labelSize));
     weightSummer.setCombinerClass(VectorSumReducer.class);
     weightSummer.waitForCompletion(true);
 
-    TrainUtils.cacheFiles(getTempPath(WEIGHTS), getConf());
+    HadoopUtil.cacheFiles(getTempPath(WEIGHTS), getConf());
 
     Job thetaSummer = prepareJob(getTempPath(SUMMED_OBSERVATIONS), getTempPath(THETAS),
         SequenceFileInputFormat.class, ThetaMapper.class, Text.class, VectorWritable.class, VectorSumReducer.class,
@@ -87,11 +113,24 @@ public final class TrainNaiveBayesJob ex
     thetaSummer.getConfiguration().setBoolean(ThetaMapper.TRAIN_COMPLEMENTARY, trainComplementary);
     thetaSummer.waitForCompletion(true);
 
-    NaiveBayesModel naiveBayesModel = TrainUtils.readModelFromTempDir(getTempPath(), getConf());
+    NaiveBayesModel naiveBayesModel = BayesUtils.readModelFromDir(getTempPath(), getConf());
     naiveBayesModel.validate();
     naiveBayesModel.serialize(getOutputPath(), getConf());
 
     return 0;
   }
 
+  private long createLabelIndex(Map<String, String> parsedArgs, Path labPath) throws IOException {
+    long labelSize = 0;
+    if (parsedArgs.containsKey("--labels")){
+      Iterable<String> labels;
+      labels = Splitter.on(",").split(parsedArgs.get("--labels"));
+      labelSize = BayesUtils.writeLabelIndex(getConf(), labels, labPath);
+    } else if (parsedArgs.containsKey("--extractLabels")){
+      SequenceFileDirIterable iterable = new SequenceFileDirIterable(getInputPath(), PathType.LIST, PathFilters.logsCRCFilter(), getConf());
+      labelSize = BayesUtils.writeLabelIndex(getConf(), labPath, iterable);
+    }
+    return labelSize;
+  }
+
 }
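
With the new main() and options, the trainer can also be driven programmatically; a
minimal sketch (hypothetical paths and class name) mirroring the trainnb call in
examples/bin/build-asf-email.sh:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.classifier.naivebayes.training.TrainNaiveBayesJob;

    public class TrainNBExample {
      public static void main(String[] args) throws Exception {
        // --extractLabels derives the label set from the input keys instead of --labels
        ToolRunner.run(new Configuration(), new TrainNaiveBayesJob(), new String[] {
            "--input", "/tmp/asf-mail/splits/train",   // hypothetical paths
            "--output", "/tmp/asf-mail/model",
            "--extractLabels",
            "--labelIndex", "/tmp/asf-mail/labels",
            "--overwrite"
        });
      }
    }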

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Fri Oct  7 14:02:20 2011
@@ -422,7 +422,8 @@ public abstract class AbstractJob extend
     job.setMapperClass(mapper);
     job.setMapOutputKeyClass(mapperKey);
     job.setMapOutputValueClass(mapperValue);
-
+    job.setOutputKeyClass(mapperKey);
+    job.setOutputValueClass(mapperValue);
     jobConf.setBoolean("mapred.compress.map.output", true);
     job.setNumReduceTasks(0);
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java Fri Oct  7 14:02:20 2011
@@ -19,10 +19,14 @@ package org.apache.mahout.common;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -34,9 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public final class HadoopUtil {
-  
+
   private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);
-  
+
   private HadoopUtil() { }
 
   public static void delete(Configuration conf, Iterable<Path> paths) throws IOException {
@@ -51,7 +55,7 @@ public final class HadoopUtil {
       }
     }
   }
-  
+
   public static void delete(Configuration conf, Path... paths) throws IOException {
     delete(conf, Arrays.asList(paths));
   }
@@ -90,4 +94,30 @@ public final class HadoopUtil {
     return fs.open(path.makeQualified(fs));
   }
 
+  public static FileStatus[] getFileStatus(Path path, PathType pathType, PathFilter filter, Comparator<FileStatus> ordering, Configuration conf) throws IOException {
+    FileStatus[] statuses;
+    FileSystem fs = path.getFileSystem(conf);
+    if (filter == null) {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
+    } else {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : fs.listStatus(path, filter);
+    }
+    if (ordering != null) {
+      Arrays.sort(statuses, ordering);
+    }
+    return statuses;
+  }
+
+  public static void cacheFiles(Path fileToCache, Configuration conf) {
+    DistributedCache.setCacheFiles(new URI[]{fileToCache.toUri()}, conf);
+  }
+
+  public static Path cachedFile(Configuration conf) throws IOException {
+    return new Path(DistributedCache.getCacheFiles(conf)[0].getPath());
+  }
+
+  public static void setSerializations(Configuration conf) {
+    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+        + "org.apache.hadoop.io.serializer.WritableSerialization");
+  }
 }
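
The cacheFiles/cachedFile pair encodes the single-file DistributedCache convention the
drivers and mappers above rely on; a minimal sketch (hypothetical path and class name):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.common.HadoopUtil;

    public class CachePatternExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // driver side: stage exactly one file (e.g. a label index) for the tasks
        HadoopUtil.cacheFiles(new Path("/tmp/labelIndex"), conf);  // hypothetical path
        // task side (e.g. BayesTestMapper.setup()): read back the first cached file
        Path cached = HadoopUtil.cachedFile(conf);
        System.out.println("cached file: " + cached);
      }
    }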

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java Fri Oct  7 14:02:20 2011
@@ -19,7 +19,6 @@ package org.apache.mahout.common.iterato
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -31,10 +30,10 @@ import com.google.common.collect.Iterato
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.IOUtils;
 import org.apache.mahout.common.Pair;
 
@@ -57,16 +56,7 @@ public final class SequenceFileDirIterat
                                  final Configuration conf) throws IOException {
 
 
-    FileStatus[] statuses;
-    FileSystem fs = path.getFileSystem(conf);
-    if (filter == null) {
-      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
-    } else {
-      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : fs.listStatus(path, filter);
-    }
-    if (ordering != null) {
-      Arrays.sort(statuses, ordering);
-    }
+    FileStatus[] statuses = HadoopUtil.getFileStatus(path, pathType, filter, ordering, conf);
     Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
 
     iterators = Lists.newArrayList();

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/driver/MahoutDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/driver/MahoutDriver.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/driver/MahoutDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/driver/MahoutDriver.java Fri Oct  7 14:02:20 2011
@@ -188,7 +188,7 @@ public final class MahoutDriver {
     programDriver.driver(argsList.toArray(new String[argsList.size()]));
 
     if (log.isInfoEnabled()) {
-      log.info("Program took {} ms", System.currentTimeMillis() - start);
+      log.info("Program took {} ms (Minutes: {})", System.currentTimeMillis() - start, (System.currentTimeMillis() - start)/60000);
     }
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java Fri Oct  7 14:02:20 2011
@@ -54,8 +54,7 @@ import com.google.common.io.Closeables;
  * 
  */
 @SuppressWarnings("deprecation")
-public class QRFirstStep implements Closeable,
-    OutputCollector<Writable, Vector> {
+public class QRFirstStep implements Closeable, OutputCollector<Writable, Vector> {
 
   public static final String PROP_K = "ssvd.k";
   public static final String PROP_P = "ssvd.p";
@@ -259,10 +258,11 @@ public class QRFirstStep implements Clos
       // good enough,
       // then at least it is always sequential.
       String taskTmpDir = System.getProperty("java.io.tmpdir");
+
       FileSystem localFs = FileSystem.getLocal(jobConf);
-      tempQPath =
-        new Path(new Path(taskTmpDir),
-                 String.format("q-temp-%d.seq", System.currentTimeMillis()));
+      Path parent = new Path(taskTmpDir);
+      Path sub = new Path(parent, "qw_" + System.currentTimeMillis());
+      tempQPath = new Path(sub, "q-temp.seq");
       tempQw =
         SequenceFile.createWriter(localFs,
                                   jobConf,
@@ -280,6 +280,7 @@ public class QRFirstStep implements Clos
   @Override
   public void collect(Writable key, Vector vw) throws IOException {
     map(key, vw);
+
   }
 
 }

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/classifier/naivebayes/NaiveBayesTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/classifier/naivebayes/NaiveBayesTest.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/classifier/naivebayes/NaiveBayesTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/classifier/naivebayes/NaiveBayesTest.java Fri Oct  7 14:02:20 2011
@@ -108,7 +108,7 @@ public class NaiveBayesTest extends Maho
     TrainNaiveBayesJob trainNaiveBayes = new TrainNaiveBayesJob();
     trainNaiveBayes.setConf(conf);
     trainNaiveBayes.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
-        "--labels", "stolen,not_stolen", "--trainComplementary", String.valueOf(true),
+        "--labels", "stolen,not_stolen", "--trainComplementary",
         "--tempDir", tempDir.getAbsolutePath() });
 
     NaiveBayesModel naiveBayesModel = NaiveBayesModel.materialize(new Path(outputDir.getAbsolutePath()), conf);

Propchange: mahout/trunk/distribution/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct  7 14:02:20 2011
@@ -1 +1,2 @@
 target
+*.iml

Propchange: mahout/trunk/examples/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct  7 14:02:20 2011
@@ -12,3 +12,4 @@ output
 .checkstyle
 testdata
 dist
+target

Added: mahout/trunk/examples/bin/build-asf-email.sh
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/bin/build-asf-email.sh?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/bin/build-asf-email.sh (added)
+++ mahout/trunk/examples/bin/build-asf-email.sh Fri Oct  7 14:02:20 2011
@@ -0,0 +1,152 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+MAHOUT="../../bin/mahout"
+ASF_ARCHIVES=$1
+OUT=$2
+OVER=$3
+export MAHOUT_HEAPSIZE=2048
+
+if [ "$1" = "-ni" ]; then
+  alg=recommender
+else
+  algorithm=( recommender clustering classification )
+
+  echo "Please select a number to choose the corresponding algorithm to run"
+  echo "1. ${algorithm[0]}"
+  echo "2. ${algorithm[1]}"
+  echo "3. ${algorithm[2]}"
+  read -p "Enter your choice : " choice
+
+  echo "ok. You chose $choice and we'll use ${algorithm[$choice-1]}"
+  alg=${algorithm[$choice-1]}
+fi
+
+
+if [ "x$alg" == "xrecommender" ]; then
+  # convert the mail to seq files
+  MAIL_OUT="$OUT/prefs/seq-files"
+  if [ "x$OVER" == "xover" ] || [ ! -e "$MAIL_OUT/chunk-0" ]; then
+    echo "Converting Mail files to Sequence Files"
+    $MAHOUT org.apache.mahout.text.SequenceFilesFromMailArchives --charset "UTF-8" --from --references --input $ASF_ARCHIVES --output $MAIL_OUT --separator " ::: "
+  fi
+  PREFS="$OUT/prefs/input"
+  PREFS_TMP="$OUT/prefs/tmp"
+  PREFS_REC_INPUT="$OUT/prefs/input/recInput"
+RECS_OUT="$OUT/prefs/recommendations"
+  # prep for recs
+  if [ "x$OVER" == "xover" ] || [ ! -e "$PREFS/fromIds-dictionary-0" ]; then
+    echo "Prepping Sequence files for Recommender"
+    $MAHOUT org.apache.mahout.cf.taste.example.email.MailToPrefsDriver --input $MAIL_OUT --output $PREFS --overwrite --separator " ::: "
+  fi
+  # run the recs
+  echo "Run the recommender"
+  $MAHOUT recommenditembased --input $PREFS_REC_INPUT --output $RECS_OUT --tempDir $PREFS_TMP --similarityClassname SIMILARITY_LOGLIKELIHOOD
+
+#clustering
+elif [ "x$alg" == "xclustering" ]; then
+  MAIL_OUT="$OUT/clustering/seq-files"
+  SEQ2SP="$OUT/clustering/seq2sparse"
+  algorithm=( kmeans dirichlet )
+
+  echo "Please select a number to choose the corresponding algorithm to run"
+  echo "1. ${algorithm[0]}"
+  echo "2. ${algorithm[1]}"
+  read -p "Enter your choice : " choice
+
+  echo "ok. You chose $choice and we'll use ${algorithm[$choice-1]}"
+  nbalg=${algorithm[$choice-1]}
+  if [ "x$OVER" == "xover" ] || [ ! -e "$MAIL_OUT/chunk-0" ]; then
+    echo "Converting Mail files to Sequence Files"
+    $MAHOUT org.apache.mahout.text.SequenceFilesFromMailArchives --charset "UTF-8" --subject --body --input $ASF_ARCHIVES --output $MAIL_OUT
+  fi
+
+  #convert to sparse vectors -- use the 2 norm (Euclidean distance) and lop off some of the common terms
+
+  if [ "x$OVER" == "xover" ] || [ ! -e "$SEQ2SP/dictionary.file-0" ]; then
+    echo "Converting the files to sparse vectors"
+    $MAHOUT seq2sparse --input $MAIL_OUT --output $SEQ2SP --norm 2 --weight TFIDF --namedVector --maxDFPercent 90 --minSupport 2 --analyzerName org.apache.mahout.text.MailArchivesClusteringAnalyzer
+  fi
+  if [ "x$nbalg" == "xkmeans" ]; then
+    CLUST_OUT="$OUT/clustering/kmeans"
+    echo "Running K-Means"
+    $MAHOUT kmeans --input "$SEQ2SP/tfidf-vectors" --output $CLUST_OUT -k 50 --maxIter 20 --distanceMeasure org.apache.mahout.common.distance.CosineDistanceMeasure --clustering --method mapreduce --clusters "$CLUST_OUT/clusters"
+  elif [ "x$nbalg" == "xdirichlet"  ]; then
+    CLUST_OUT="$OUT/clustering/dirichlet"
+    echo "Running Dirichlet"
+    $MAHOUT dirichlet --input "$SEQ2SP/tfidf-vectors" --output $CLUST_OUT -k 50 --maxIter 20 --distanceMeasure org.apache.mahout.common.distance.CosineDistanceMeasure --method mapreduce
+  fi
+
+#classification
+elif [ "x$alg" == "xclassification" ]; then
+  algorithm=( standard complementary )
+
+  echo "Please select a number to choose the corresponding algorithm to run"
+  echo "1. ${algorithm[0]}"
+  echo "2. ${algorithm[1]}"
+  read -p "Enter your choice : " choice
+
+  echo "ok. You chose $choice and we'll use ${algorithm[$choice-1]}"
+  nbalg=${algorithm[$choice-1]}
+
+  CLASS="$OUT/classification/"
+  MAIL_OUT="$CLASS/seq-files"
+  SEQ2SP="$CLASS/seq2sparse"
+  SEQ2SPLABEL="$CLASS/labeled"
+  SPLIT="$CLASS/splits"
+  TRAIN="$SPLIT/train"
+  TEST="$SPLIT/test"
+  TEST_OUT="$CLASS/test-results"
+  LABEL="$SPLIT/labels"
+  #Convert mail to be formatted as:
+  # label\ttext
+  # One per line
+  # the label is the project_name_mailing_list, as in tomcat.apache.org_dev
+  if [ "x$OVER" == "xover" ] || [ ! -e "$MAIL_OUT/chunk-0" ]; then
+    echo "Converting Mail files to Sequence Files"
+    $MAHOUT org.apache.mahout.text.SequenceFilesFromMailArchives --charset "UTF-8" --subject --body --input $ASF_ARCHIVES --output $MAIL_OUT
+  fi
+  #Convert to vectors
+  if [ "x$OVER" == "xover" ] || [ ! -e "$SEQ2SP/dictionary.file-0" ]; then
+    echo "Converting the files to sparse vectors"
+    $MAHOUT seq2sparse --input $MAIL_OUT --output $SEQ2SP --norm 2 --weight TFIDF --namedVector --maxDFPercent 90 --minSupport 2 --analyzerName org.apache.mahout.text.MailArchivesClusteringAnalyzer
+    #We need to modify the vectors to have a better label
+    echo "Converting vector labels"
+    $MAHOUT org.apache.mahout.classifier.email.PrepEmailVectorsDriver --input "$SEQ2SP/tfidf-vectors" --output $SEQ2SPLABEL --overwrite
+  fi
+  if [ "x$OVER" == "xover" ] || [ ! -e "$TRAIN/part-m-00000" ]; then
+    #setup train/test files
+    echo "Creating training and test inputs"
+    $MAHOUT split --input $SEQ2SPLABEL --trainingOutput $TRAIN --testOutput $TEST --randomSelectionPct 20 --overwrite --sequenceFiles
+  fi
+  MODEL="$CLASS/model"
+  if [ "x$nbalg" == "xstandard" ]; then
+    echo "Running Standard Training"
+    $MAHOUT trainnb -i $TRAIN -o $MODEL --extractLabels --labelIndex $LABEL --overwrite
+    echo "Running Test"
+    $MAHOUT testnb -i $TEST -o $TEST_OUT -m $MODEL --labelIndex $LABEL --overwrite
+
+  elif [ "x$nbalg" == "xcomplementary"  ]; then
+    echo "Running Complementary Training"
+    $MAHOUT trainnb -i $TRAIN -o $MODEL --extractLabels --labelIndex $LABEL --overwrite --trainComplementary
+    echo "Running Complementary Test"
+    $MAHOUT testnb -i $TEST -o $TEST_OUT -m $MODEL --labelIndex $LABEL --overwrite --testComplementary
+  fi
+
+fi
+
+

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,89 @@
+package org.apache.mahout.cf.taste.example.email;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Constants and helper methods shared by the email example jobs: email address
+ * cleanup, loading dictionaries from the {@link DistributedCache} and parsing
+ * References headers.
+ **/
+public final class EmailUtility {
+  public static final String SEPARATOR = "separator";
+  public static final String MSG_IDS_PREFIX = "msgIdsPrefix";
+  public static final String FROM_PREFIX = "fromPrefix";
+  public static final String MSG_ID_DIMENSION = "msgIdDim";
+
+  private EmailUtility() {
+
+  }
+
+  /**
+   * Strip off some spurious characters that make it harder to dedup
+   *
+   * @param address the raw email address
+   * @return the address with mailto:, angle/square brackets and =20 artifacts removed
+   */
+  public static String cleanUpEmailAddress(String address) {
+    //do some cleanup to normalize some things, like: Key: karthik ananth <ka...@gmail.com>: Value: 178
+    //Key: karthik ananth [mailto:karthik.jcecs@gmail.com]=20: Value: 179
+    //TODO: is there more to clean up here?
+    return address.replaceAll("mailto:|<|>|\\[|\\]|\\=20", "");
+  }
+
+
+  public static void loadDictionaries(Configuration conf, String fromPrefix,
+                                      OpenObjectIntHashMap<String> fromDictionary,
+                                      String msgIdPrefix,
+                                      OpenObjectIntHashMap<String> msgIdDictionary) throws IOException {
+
+    URI[] localFiles = DistributedCache.getCacheFiles(conf);
+    Preconditions.checkArgument(localFiles != null,
+            "missing paths from the DistributedCache");
+    for (URI localFile : localFiles) {
+      Path dictionaryFile = new Path(localFile.getPath());
+      // key is word value is id
+
+      OpenObjectIntHashMap<String> dictionary = null;
+      if (dictionaryFile.getName().startsWith(fromPrefix)) {
+        dictionary = fromDictionary;
+      } else if (dictionaryFile.getName().startsWith(msgIdPrefix)) {
+        dictionary = msgIdDictionary;
+      }
+      if (dictionary != null) {
+        for (Pair<Writable, IntWritable> record
+                : new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) {
+          dictionary.put(record.getFirst().toString(), record.getSecond().get());
+        }
+      }
+    }
+
+  }
+
+  private static final String [] EMPTY = new String[0];
+
+  public static String[] parseReferences(String rawRefs) {
+    if (rawRefs == null || rawRefs.length() == 0) {
+      return EMPTY;
+    }
+    String[] splits = rawRefs.split(">|\\s+");
+    for (int i = 0; i < splits.length; i++) {
+      splits[i] = splits[i].replaceAll("<|>", "");
+    }
+    return splits;
+  }
+}
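
For reference, a minimal standalone sketch of how the two string helpers above behave; it is illustrative only (not part of this commit) and the sample address and message ids are invented:

    import java.util.Arrays;

    import org.apache.mahout.cf.taste.example.email.EmailUtility;

    public class EmailUtilityDemo {
      public static void main(String[] args) {
        // "mailto:", angle/square brackets and a trailing "=20" are stripped
        System.out.println(EmailUtility.cleanUpEmailAddress(
            "jane doe [mailto:jane.doe@example.com]=20"));
        // prints: jane doe jane.doe@example.com

        // a References header is split on '>' or runs of whitespace, and brackets are removed;
        // MailToRecMapper only uses element 0, the original message in the thread
        System.out.println(Arrays.toString(
            EmailUtility.parseReferences("<id1@example.com><id2@example.com>")));
        // prints: [id1@example.com, id2@example.com]
      }
    }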

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/FromEmailToDictionaryMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/FromEmailToDictionaryMapper.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/FromEmailToDictionaryMapper.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/FromEmailToDictionaryMapper.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,41 @@
+package org.apache.mahout.cf.taste.example.email;
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+
+import java.io.IOException;
+
+/**
+ * Builds a dictionary of 'from' email addresses.  Assumes the input is in the format created by
+ * {@link org.apache.mahout.text.SequenceFilesFromMailArchives}.
+ **/
+public class FromEmailToDictionaryMapper extends
+        Mapper<Text, Text, Text, VarIntWritable> {
+  private String separator = "\n";
+
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    separator = context.getConfiguration().get(EmailUtility.SEPARATOR, "\n");
+  }
+
+  @Override
+  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    //From is in the value
+    String valStr = value.toString();
+    int idx = valStr.indexOf(separator);
+    if (idx != -1) {
+      String full = valStr.substring(0, idx);
+      //normalize the address; see EmailUtility.cleanUpEmailAddress for the details
+      full = EmailUtility.cleanUpEmailAddress(full);
+      context.write(new Text(full), new VarIntWritable(1));
+    }
+  }
+}

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToDictionaryReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToDictionaryReducer.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToDictionaryReducer.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToDictionaryReducer.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,29 @@
+package org.apache.mahout.cf.taste.example.email;
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VarIntWritable;
+
+import java.io.IOException;
+
+/**
+ * Sums the counts emitted by the dictionary mappers.
+ * Key: the string id.  Value: a count.
+ * Out Key: the string id.  Out Value: the sum of the counts.
+ **/
+public class MailToDictionaryReducer extends
+        Reducer<Text, VarIntWritable, Text, VarIntWritable> {
+
+  @Override
+  protected void reduce(Text key, Iterable<VarIntWritable> values, Context context) throws IOException, InterruptedException {
+    int sum = 0;
+    for (VarIntWritable value : values) {
+      sum += value.get();
+    }
+    //the key is already a Text, so it can be written as-is
+    context.write(key, new VarIntWritable(sum));
+  }
+}
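
Because the reduce above is a pure sum and its input and output types both are (Text, VarIntWritable), the same class can double as a combiner to cut shuffle volume. A sketch of the extra wiring, assuming the rest of the job is configured as MailToPrefsDriver does below; this is a suggestion, not something the commit does:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.mahout.cf.taste.example.email.MailToDictionaryReducer;

    public class CombinerHint {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "from-dictionary");
        // safe because the reduce is associative and commutative, and the types line up
        job.setCombinerClass(MailToDictionaryReducer.class);
        // mapper, reducer and IO formats would be wired as prepareJob does in MailToPrefsDriver
      }
    }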

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,245 @@
+package org.apache.mahout.cf.taste.example.email;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.VarIntWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Convert the Mail archives (see {@link org.apache.mahout.text.SequenceFilesFromMailArchives}) to a preference
+ * file that can be consumed by the {@link org.apache.mahout.cf.taste.hadoop.pseudo.RecommenderJob}.
+ * <p/>
+ * This assumes the input is a Sequence File whose key is filename/message id and whose value is a list
+ * (separated by a character of the user's choosing) of: from, to, subject.
+ * <p/>
+ * The output is a matrix where either the from or to addresses are the rows (represented as longs) and the columns are
+ * the message ids that the user has interacted with (as a VectorWritable).  This class currently does not account for
+ * thread hijacking.
+ * <p/>
+ * It also outputs side tables mapping the row ids to their original addresses and the message ids to the message
+ * thread ids.
+ */
+public class MailToPrefsDriver extends AbstractJob {
+  private static final Logger log = LoggerFactory.getLogger(MailToPrefsDriver.class);
+
+  private static final String OUTPUT_FILES_PATTERN = "part-*";
+  private static final int DICTIONARY_BYTE_OVERHEAD = 4;
+
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new MailToPrefsDriver(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    int result = 0;
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption("chunkSize", "cs", "The size of chunks to write.  Default is 100 mb", "100");
+    addOption("separator", "sep", "The separator used in the input file to separate to, from, subject.  Default is \\n", "\n");
+    Map<String, String> parsedArgs = parseArguments(args);
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    int chunkSize = Integer.parseInt(parsedArgs.get("--chunkSize"));
+    String separator = parsedArgs.get("--separator");
+    Configuration conf = getConf();
+    if (conf == null) {
+      setConf(new Configuration());
+      conf = getConf();
+    }
+
+    AtomicInteger currentPhase = new AtomicInteger();
+    int[] msgDim = new int[1];
+    int[] fromDim = new int[1];
+    //TODO: mod this to not do so many passes over the data.  Dictionary creation could probably be a chain mapper
+    List<Path> msgIdChunks = null, fromChunks = null;
+    boolean overwrite = hasOption(DefaultOptionCreator.OVERWRITE_OPTION);
+    // create the dictionary between message ids and longs
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      //TODO: there seems to be a pattern emerging for dictionary creation -- sparse vectors from seq files also has this.
+      Path msgIdsPath = new Path(output, "msgIds");
+      if (overwrite) {
+        HadoopUtil.delete(conf, msgIdsPath);
+      }
+      log.info("Creating Msg Id Dictionary");
+      Job createMsgIdDictionary = prepareJob(input,
+              msgIdsPath,
+              SequenceFileInputFormat.class,
+              MsgIdToDictionaryMapper.class,
+              Text.class,
+              VarIntWritable.class,
+              MailToDictionaryReducer.class,
+              Text.class,
+              VarIntWritable.class,
+              SequenceFileOutputFormat.class);
+      createMsgIdDictionary.waitForCompletion(true);
+      //write out the dictionary at the top level
+      msgIdChunks = createDictionaryChunks(msgIdsPath, output, "msgIds-dictionary-", createMsgIdDictionary.getConfiguration(), chunkSize, msgDim);
+    }
+    //create the dictionary between from email addresses and longs
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Path fromIdsPath = new Path(output, "fromIds");
+      if (overwrite) {
+        HadoopUtil.delete(conf, fromIdsPath);
+      }
+      log.info("Creating From Id Dictionary");
+      Job createFromIdDictionary = prepareJob(input,
+              fromIdsPath,
+              SequenceFileInputFormat.class,
+              FromEmailToDictionaryMapper.class,
+              Text.class,
+              VarIntWritable.class,
+              MailToDictionaryReducer.class,
+              Text.class,
+              VarIntWritable.class,
+              SequenceFileOutputFormat.class);
+      createFromIdDictionary.getConfiguration().set(EmailUtility.SEPARATOR, separator);
+      createFromIdDictionary.waitForCompletion(true);
+      //write out the dictionary at the top level
+      fromChunks = createDictionaryChunks(fromIdsPath, output, "fromIds-dictionary-", createFromIdDictionary.getConfiguration(), chunkSize, fromDim);
+    }
+    //OK, we have our dictionaries, let's output the real thing we need: <from_id -> <msgId, msgId, msgId, ...>>
+    if (shouldRunNextPhase(parsedArgs, currentPhase) && fromChunks != null && msgIdChunks != null) {
+      //TODO: there may be a way to load the from ids in memory, if they are small enough, and avoid the double loop over chunks
+      log.info("Creating recommendation matrix");
+      int i = 0, j = 0;
+      Path vecPath = new Path(output, "recInput");
+      if (overwrite) {
+        HadoopUtil.delete(conf, vecPath);
+      }
+      conf.set(EmailUtility.MSG_ID_DIMENSION, String.valueOf(msgDim[0]));
+      conf.set(EmailUtility.FROM_PREFIX, "fromIds-dictionary-");
+      conf.set(EmailUtility.MSG_IDS_PREFIX, "msgIds-dictionary-");
+      conf.set(EmailUtility.SEPARATOR, separator);
+      for (Path fromChunk : fromChunks) {
+        for (Path idChunk : msgIdChunks) {
+          Path out = new Path(vecPath, "tmp-" + i + "-" + j);
+          DistributedCache.setCacheFiles(new URI[]{fromChunk.toUri(), idChunk.toUri()}, conf);
+          Job createRecMatrix = prepareJob(input, out, SequenceFileInputFormat.class,
+                  MailToRecMapper.class, NullWritable.class, Text.class,
+                  TextOutputFormat.class);
+          createRecMatrix.getConfiguration().set("mapred.output.compress", "false");
+          createRecMatrix.waitForCompletion(true);
+          //copy the results up a level
+          FileStatus[] statuses = HadoopUtil.getFileStatus(new Path(out, "*"), PathType.GLOB, PathFilters.partFilter(), null, conf);
+          for (int k = 0; k < statuses.length; k++) {
+            Path outPath = new Path(vecPath, "chunk-" + i + "-" + j + "-" + k);
+            FileUtil.copy(statuses[k].getPath().getFileSystem(conf), statuses[k].getPath(), outPath.getFileSystem(conf), outPath, true, overwrite, conf);
+          }
+          HadoopUtil.delete(conf, out);
+          j++;
+        }
+        i++;
+      }
+      //TODO: optionally merge the chunk files into a single vectors.dat under the output dir (see HadoopUtil.copyMergeSeqFiles)
+    }
+
+    return result;
+  }
+
+  private static List<Path> createDictionaryChunks(Path inputPath,
+                                                   Path dictionaryPathBase,
+                                                   String name,
+                                                   Configuration baseConf,
+                                                   int chunkSizeInMegabytes, int[] maxTermDimension) throws IOException {
+    List<Path> chunkPaths = Lists.newArrayList();
+
+    Configuration conf = new Configuration(baseConf);
+
+    FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+
+    long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
+    int chunkIndex = 0;
+    Path chunkPath = new Path(dictionaryPathBase, name + chunkIndex);
+    chunkPaths.add(chunkPath);
+
+    SequenceFile.Writer dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
+
+    try {
+      long currentChunkSize = 0;
+      Path filesPattern = new Path(inputPath, OUTPUT_FILES_PATTERN);
+      int i = 0;
+      for (Pair<Writable, Writable> record
+              : new SequenceFileDirIterable<Writable, Writable>(filesPattern, PathType.GLOB, null, null, true, conf)) {
+        if (currentChunkSize > chunkSizeLimit) {
+          Closeables.closeQuietly(dictWriter);
+          chunkIndex++;
+
+          chunkPath = new Path(dictionaryPathBase, name + chunkIndex);
+          chunkPaths.add(chunkPath);
+
+          dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
+          currentChunkSize = 0;
+        }
+
+        Writable key = record.getFirst();
+        int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
+        currentChunkSize += fieldSize;
+        dictWriter.append(key, new IntWritable(i++));
+      }
+      maxTermDimension[0] = i;
+    } finally {
+      Closeables.closeQuietly(dictWriter);
+    }
+
+    return chunkPaths;
+  }
+
+}
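
A hypothetical end-to-end invocation of the driver; the paths are placeholders, and the flags mirror the options registered in run() above (-i/-o come from addInputOption/addOutputOption):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.cf.taste.example.email.MailToPrefsDriver;

    public class RunMailToPrefs {
      public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new MailToPrefsDriver(), new String[] {
            "-i", "mail-seqfiles",   // output of SequenceFilesFromMailArchives (assumed path)
            "-o", "prefs-out",       // gets the dictionaries plus the recInput CSV chunks
            "--separator", "\n",     // must match the separator used when extracting the mail
            "--overwrite"
        });
      }
    }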

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,99 @@
+package org.apache.mahout.cf.taste.example.email;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Emits one preference triple per message (from id, message or thread id, and a 1) as a comma-separated line,
+ * using the dictionaries created by {@link MailToPrefsDriver}.
+ **/
+public class MailToRecMapper extends
+        Mapper<Text, Text, NullWritable, Text> {
+  private static final Logger log = LoggerFactory.getLogger(MailToRecMapper.class);
+  private final OpenObjectIntHashMap<String> fromDictionary = new OpenObjectIntHashMap<String>();
+  private final OpenObjectIntHashMap<String> msgIdDictionary = new OpenObjectIntHashMap<String>();
+  private String separator = "\n";
+
+  public enum Counters {
+    REFERENCE, ORIGINAL
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    String fromPrefix = conf.get(EmailUtility.FROM_PREFIX);
+    String msgPrefix = conf.get(EmailUtility.MSG_IDS_PREFIX);
+    EmailUtility.loadDictionaries(conf, fromPrefix, fromDictionary, msgPrefix, msgIdDictionary);
+    log.info("From Dictionary size: {} Msg Id Dictionary size: {}", fromDictionary.size(), msgIdDictionary.size());
+    separator = conf.get(EmailUtility.SEPARATOR, "\n");
+  }
+
+  @Override
+  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+
+    int msgIdKey = Integer.MIN_VALUE;
+    int fromKey = Integer.MIN_VALUE;
+    String valStr = value.toString();
+    String[] splits = StringUtils.splitByWholeSeparatorPreserveAllTokens(valStr, separator);
+    //format is:  from, to, refs, subject, body
+
+    if (splits != null && splits.length > 0) {
+      String from = EmailUtility.cleanUpEmailAddress(splits[0]);
+      //OpenObjectIntHashMap.get returns 0 (a valid id) on a miss, so check containsKey first
+      if (fromDictionary.containsKey(from)) {
+        fromKey = fromDictionary.get(from);
+      }
+      //get the references; the first one is the original message id, so map to it if we have it
+      if (splits.length > 2) {
+        String[] theRefs = EmailUtility.parseReferences(splits[2]);
+        if (theRefs.length > 0 && msgIdDictionary.containsKey(theRefs[0])) {
+          msgIdKey = msgIdDictionary.get(theRefs[0]);
+          context.getCounter(Counters.REFERENCE).increment(1);
+        }
+      }
+    }
+    if (msgIdKey == Integer.MIN_VALUE) {
+      //we don't have any references, so use the msg id from the key
+      String keyStr = key.toString();
+      int idx = keyStr.lastIndexOf('/');
+      if (idx != -1) {
+        String msgId = keyStr.substring(idx + 1);
+        if (msgIdDictionary.containsKey(msgId)) {
+          msgIdKey = msgIdDictionary.get(msgId);
+          context.getCounter(Counters.ORIGINAL).increment(1);
+        }
+      }
+    }
+
+    if (msgIdKey != Integer.MIN_VALUE && fromKey != Integer.MIN_VALUE) {
+      context.write(NullWritable.get(), new Text(fromKey + "," + msgIdKey + ",1"));
+    }
+  }
+}
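
The mapper's output lines are plain comma-separated preference triples. A tiny sketch of reading one back; the ids in the sample line are invented:

    public class PrefLineDemo {
      public static void main(String[] args) {
        String line = "42,1337,1"; // fromKey,msgIdKey,1 as written by MailToRecMapper
        String[] fields = line.split(",");
        long userId = Long.parseLong(fields[0]);   // dictionary id of the 'from' address
        long itemId = Long.parseLong(fields[1]);   // dictionary id of the message/thread
        float value = Float.parseFloat(fields[2]); // always 1: an interaction, not a rating
        System.out.printf("user=%d item=%d value=%.1f%n", userId, itemId, value);
      }
    }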

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,32 @@
+package org.apache.mahout.cf.taste.example.email;
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+
+import java.io.IOException;
+
+/**
+ * Assumes the input is in the format created by {@link org.apache.mahout.text.SequenceFilesFromMailArchives}
+ */
+public class MsgIdToDictionaryMapper extends
+        Mapper<Text, Text, Text, VarIntWritable> {
+  public enum Counters {
+    NO_MESSAGE_ID
+  }
+
+  @Override
+  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    //the message id is in the key: /201008/AANLkTikvVnhNH+Y5AGEwqd2=u0CFv2mCm0ce6E6oBnj1@mail.gmail.com
+    String keyStr = key.toString();
+    int idx = keyStr.lastIndexOf('/');
+    if (idx != -1) {
+      context.write(new Text(keyStr.substring(idx + 1)), new VarIntWritable(1));
+    } else {
+      context.getCounter(Counters.NO_MESSAGE_ID).increment(1);
+    }
+  }
+}

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailMapper.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailMapper.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailMapper.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,52 @@
+package org.apache.mahout.classifier.email;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Convert the labels created by the {@link org.apache.mahout.utils.email.MailProcessor} to one consumable by the classifiers
+ */
+public class PrepEmailMapper extends Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> {
+  private boolean useListName = false;//if true, use the project name and the list name in label creation
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    useListName = Boolean.parseBoolean(context.getConfiguration().get(PrepEmailVectorsDriver.USE_LIST_NAME));
+  }
+
+  @Override
+  protected void map(WritableComparable<?> key, VectorWritable value, Context context) throws IOException, InterruptedException {
+    String input = key.toString();
+    //Example: /cocoon.apache.org/dev/200307.gz/001401c3414f$8394e160$1e01a8c0@WRPO
+    String[] splits = input.split("/");
+    //we need the project name (splits[1]) and optionally the list name (splits[2]); splits[0] is empty
+    if (splits.length >= 3) {
+      StringBuilder bldr = new StringBuilder(splits[1].replaceAll("-|\\.", "_").toLowerCase());
+      if (useListName) {
+        bldr.append('_').append(splits[2].replaceAll("-|\\.", "_").toLowerCase());
+      }
+      context.write(new Text(bldr.toString()), value);
+    }
+  }
+}
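
A quick worked example of the label derivation above, run on the path from the code comment; the printed values follow from the split/replace/lowercase rules:

    public class LabelDemo {
      public static void main(String[] args) {
        String key = "/cocoon.apache.org/dev/200307.gz/001401c3414f$8394e160$1e01a8c0@WRPO";
        String[] splits = key.split("/");   // splits[0] is empty: the key starts with '/'
        String label = splits[1].replaceAll("-|\\.", "_").toLowerCase();
        System.out.println(label);          // cocoon_apache_org
        label += "_" + splits[2].replaceAll("-|\\.", "_").toLowerCase();
        System.out.println(label);          // cocoon_apache_org_dev (when useListName is set)
      }
    }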

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailReducer.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailReducer.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailReducer.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,48 @@
+package org.apache.mahout.classifier.email;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Caps the number of vectors written per label so that the training sets can be kept roughly the same size.
+ **/
+public class PrepEmailReducer extends Reducer<Text, VectorWritable, Text, VectorWritable> {
+  private long maxItemsPerLabel = 10000;
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    maxItemsPerLabel = context.getConfiguration().getLong(PrepEmailVectorsDriver.ITEMS_PER_CLASS, maxItemsPerLabel);
+  }
+
+  @Override
+  protected void reduce(Text key, Iterable<VectorWritable> values, Context context) throws IOException, InterruptedException {
+    //TODO: support randomization?  Likely not needed due to the SplitInput utility which does random selection
+    long i = 0;
+    Iterator<VectorWritable> iterator = values.iterator();
+    while (i < maxItemsPerLabel && iterator.hasNext()){
+      context.write(key, iterator.next());
+      i++;
+    }
+  }
+}

Added: mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailVectorsDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailVectorsDriver.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailVectorsDriver.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailVectorsDriver.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,73 @@
+package org.apache.mahout.classifier.email;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.math.VectorWritable;
+
+import java.util.Map;
+
+/**
+ * Convert the labels generated by {@link org.apache.mahout.text.SequenceFilesFromMailArchives} and
+ * {@link org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles} to ones consumable by the classifiers.  We do this
+ * here because doing it during the creation of the sparse vectors would cause the Reducer to collapse all the vectors.
+ */
+public class PrepEmailVectorsDriver extends AbstractJob {
+
+  public static final String ITEMS_PER_CLASS = "itemsPerClass";
+  public static final String USE_LIST_NAME = "USE_LIST_NAME";
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new PrepEmailVectorsDriver(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    int result = 0;
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption("maxItemsPerLabel", "mipl", "The maximum number of items per label.  Can be useful for making the training sets the same size", String.valueOf(100000));
+    addOption(buildOption("useListName", "ul", "Use the name of the list as part of the label.  If not set, then just use the project name", false, false, "false"));
+    Map<String,String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    Job convertJob = prepareJob(input, output, SequenceFileInputFormat.class, PrepEmailMapper.class,
+            Text.class, VectorWritable.class, PrepEmailReducer.class, Text.class, VectorWritable.class, SequenceFileOutputFormat.class);
+    convertJob.getConfiguration().set(ITEMS_PER_CLASS, parsedArgs.get("--maxItemsPerLabel"));
+    convertJob.getConfiguration().set(USE_LIST_NAME, String.valueOf(parsedArgs.containsKey("--useListName")));
+    convertJob.waitForCompletion(true);
+    return result;
+  }
+}
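
A hypothetical invocation of this driver as well; the paths are placeholders and the flags mirror the options registered in run() above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.classifier.email.PrepEmailVectorsDriver;

    public class RunPrepEmailVectors {
      public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new PrepEmailVectorsDriver(), new String[] {
            "-i", "seq2sparse-vectors",    // output of SparseVectorsFromSequenceFiles (assumed path)
            "-o", "training-vectors",
            "--overwrite",
            "--maxItemsPerLabel", "50000", // cap each class to keep training sets comparable
            "--useListName"                // label becomes project + list, e.g. cocoon_apache_org_dev
        });
      }
    }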

Added: mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/MailToPrefsTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/MailToPrefsTest.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/MailToPrefsTest.java (added)
+++ mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/MailToPrefsTest.java Fri Oct  7 14:02:20 2011
@@ -0,0 +1,16 @@
+package org.apache.mahout.cf.taste.example.email;
+
+
+import org.apache.mahout.examples.MahoutTestCase;
+import org.junit.Test;
+
+/**
+ * Placeholder for tests of the mail-to-preferences conversion in {@link MailToPrefsDriver}.
+ **/
+public class MailToPrefsTest extends MahoutTestCase {
+
+  @Test
+  public void test() throws Exception {
+    //TODO: exercise MailToPrefsDriver on a small sample archive and assert on the output
+  }
+
+}