You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by gs...@apache.org on 2011/10/07 16:02:23 UTC
svn commit: r1180043 [1/3] - in /mahout/trunk: bin/ buildtools/ core/
core/src/main/java/org/apache/mahout/classifier/naivebayes/
core/src/main/java/org/apache/mahout/classifier/naivebayes/test/
core/src/main/java/org/apache/mahout/classifier/naivebaye...
Author: gsingers
Date: Fri Oct 7 14:02:20 2011
New Revision: 1180043
URL: http://svn.apache.org/viewvc?rev=1180043&view=rev
Log:
MAHOUT-798: add in examples for working with ASF email archive, plus various refactorings to clusterdumper, etc. for viewing results
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/TestNaiveBayesDriver.java
mahout/trunk/examples/bin/build-asf-email.sh
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/FromEmailToDictionaryMapper.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToDictionaryReducer.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java
mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/
mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailMapper.java
mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailReducer.java
mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailVectorsDriver.java
mahout/trunk/examples/src/test/java/org/apache/mahout/cf/
mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/
mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/
mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/
mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/MailToPrefsTest.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/email/
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/email/MailProcessor.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/io/
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWrapper.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWriter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/io/IOWriterWrapper.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/io/WrappedWriter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/io/AbstractClusterWriter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/io/CSVClusterWriter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/io/ClusterDumperWriter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/io/ClusterWriter.java
mahout/trunk/integration/src/test/java/org/apache/mahout/utils/email/
mahout/trunk/integration/src/test/java/org/apache/mahout/utils/email/MailProcessorTest.java
mahout/trunk/integration/src/test/resources/
mahout/trunk/integration/src/test/resources/test.mbox
Removed:
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainUtils.java
mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/bayes/SplitBayesInput.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java
Modified:
mahout/trunk/bin/mahout
mahout/trunk/buildtools/ (props changed)
mahout/trunk/core/ (props changed)
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/AbstractNaiveBayesClassifier.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/ThetaMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainNaiveBayesJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/driver/MahoutDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java
mahout/trunk/core/src/test/java/org/apache/mahout/classifier/naivebayes/NaiveBayesTest.java
mahout/trunk/distribution/ (props changed)
mahout/trunk/examples/ (props changed)
mahout/trunk/examples/src/test/java/org/apache/mahout/classifier/bayes/SplitBayesInputTest.java
mahout/trunk/integration/bin/prep_asf_mail_archives.sh
mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
mahout/trunk/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
mahout/trunk/src/conf/driver.classes.props
Modified: mahout/trunk/bin/mahout
URL: http://svn.apache.org/viewvc/mahout/trunk/bin/mahout?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/bin/mahout (original)
+++ mahout/trunk/bin/mahout Fri Oct 7 14:02:20 2011
@@ -105,15 +105,7 @@ if [ "$MAHOUT_HEAPSIZE" != "" ]; then
fi
if [ "x$MAHOUT_CONF_DIR" = "x" ]; then
- if [ -d $MAHOUT_HOME/src/conf ]; then
- MAHOUT_CONF_DIR=$MAHOUT_HOME/src/conf
- else
- if [ -d $MAHOUT_HOME/conf ]; then
- MAHOUT_CONF_DIR=$MAHOUT_HOME/conf
- else
- echo No MAHOUT_CONF_DIR found
- fi
- fi
+ MAHOUT_CONF_DIR=$MAHOUT_HOME/src/conf
fi
# CLASSPATH initially contains $MAHOUT_CONF_DIR, or defaults to $MAHOUT_HOME/src/conf
@@ -205,13 +197,8 @@ if [ "$HADOOP_HOME" = "" ] || [ "$MAHOUT
elif [ "$MAHOUT_LOCAL" != "" ] ; then
echo "MAHOUT_LOCAL is set, running locally"
fi
- case $1 in
- (classpath)
- echo $CLASSPATH
- ;;
- (*)
- CLASSPATH=$CLASSPATH exec "$JAVA" $JAVA_HEAP_MAX $MAHOUT_OPTS $CLASS "$@"
- esac
+# echo "CLASSPATH: $CLASSPATH"
+ exec "$JAVA" $JAVA_HEAP_MAX $MAHOUT_OPTS -classpath "$CLASSPATH" $CLASS "$@"
else
echo "Running on hadoop, using HADOOP_HOME=$HADOOP_HOME"
if [ "$HADOOP_CONF_DIR" = "" ] ; then
@@ -225,18 +212,13 @@ else
echo "ERROR: Could not find mahout-examples-*.job in $MAHOUT_HOME or $MAHOUT_HOME/examples/target, please run 'mvn install' to create the .job file"
exit 1
else
- case "$1" in
- (hadoop)
+ if [ "$1" = "hadoop" ]; then
export HADOOP_CLASSPATH=$MAHOUT_CONF_DIR:${HADOOP_CLASSPATH}:$CLASSPATH
exec "$HADOOP_HOME/bin/$@"
- ;;
- (classpath)
- echo $CLASSPATH
- ;;
- (*)
+ else
echo "MAHOUT-JOB: $MAHOUT_JOB"
export HADOOP_CLASSPATH=$MAHOUT_CONF_DIR:${HADOOP_CLASSPATH}
exec "$HADOOP_HOME/bin/hadoop" --config $HADOOP_CONF_DIR jar $MAHOUT_JOB $CLASS "$@"
- esac
+ fi
fi
fi
Propchange: mahout/trunk/buildtools/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct 7 14:02:20 2011
@@ -1 +1,2 @@
target
+*.iml
Propchange: mahout/trunk/core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct 7 14:02:20 2011
@@ -11,3 +11,5 @@ testdatatestdata
target
.checkstyle
dist
+temp
+testdata
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/AbstractNaiveBayesClassifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/AbstractNaiveBayesClassifier.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/AbstractNaiveBayesClassifier.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/AbstractNaiveBayesClassifier.java Fri Oct 7 14:02:20 2011
@@ -24,7 +24,7 @@ import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
/** Class implementing the Naive Bayes Classifier Algorithm */
-abstract class AbstractNaiveBayesClassifier extends AbstractVectorClassifier {
+public abstract class AbstractNaiveBayesClassifier extends AbstractVectorClassifier {
private final NaiveBayesModel model;
Added: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.classifier.naivebayes;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.classifier.naivebayes.NaiveBayesModel;
+import org.apache.mahout.classifier.naivebayes.training.ThetaMapper;
+import org.apache.mahout.classifier.naivebayes.training.TrainNaiveBayesJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.SparseMatrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class BayesUtils {
+  private BayesUtils() {}
+
+  /**
+   * Assemble a {@link NaiveBayesModel} from the training output stored under {@code base}.
+   *
+   * @throws NullPointerException if a required vector is missing from the training output
+   */
+  public static NaiveBayesModel readModelFromDir(Path base, Configuration conf) {
+    float alphaI = conf.getFloat(ThetaMapper.ALPHA_I, 1.0f);
+    // read feature sums and label sums
+    Vector scoresPerLabel = null;
+    Vector scoresPerFeature = null;
+    for (Pair<Text,VectorWritable> record : new SequenceFileDirIterable<Text, VectorWritable>(
+        new Path(base, TrainNaiveBayesJob.WEIGHTS), PathType.LIST, PathFilters.partFilter(), conf)) {
+      String key = record.getFirst().toString();
+      VectorWritable value = record.getSecond();
+      if (key.equals(TrainNaiveBayesJob.WEIGHTS_PER_FEATURE)) {
+        scoresPerFeature = value.get();
+      } else if (key.equals(TrainNaiveBayesJob.WEIGHTS_PER_LABEL)) {
+        scoresPerLabel = value.get();
+      }
+    }
+    Preconditions.checkNotNull(scoresPerFeature, "no per-feature weights found under %s", base);
+    Preconditions.checkNotNull(scoresPerLabel, "no per-label weights found under %s", base);
+
+    Matrix scoresPerLabelAndFeature = new SparseMatrix(scoresPerLabel.size(), scoresPerFeature.size());
+    for (Pair<IntWritable,VectorWritable> entry : new SequenceFileDirIterable<IntWritable,VectorWritable>(
+        new Path(base, TrainNaiveBayesJob.SUMMED_OBSERVATIONS), PathType.LIST, PathFilters.partFilter(), conf)) {
+      scoresPerLabelAndFeature.assignRow(entry.getFirst().get(), entry.getSecond().get());
+    }
+
+    Vector perlabelThetaNormalizer = null;
+    for (Pair<Text,VectorWritable> entry : new SequenceFileDirIterable<Text,VectorWritable>(
+        new Path(base, TrainNaiveBayesJob.THETAS), PathType.LIST, PathFilters.partFilter(), conf)) {
+      if (entry.getFirst().toString().equals(TrainNaiveBayesJob.LABEL_THETA_NORMALIZER)) {
+        perlabelThetaNormalizer = entry.getSecond().get();
+      }
+    }
+    Preconditions.checkNotNull(perlabelThetaNormalizer, "no label theta normalizer found under %s", base);
+
+    return new NaiveBayesModel(scoresPerLabelAndFeature, scoresPerFeature, scoresPerLabel, perlabelThetaNormalizer,
+        alphaI);
+  }
+
+  /** Write the labels to a sequence file, numbered from 0 in iteration order; returns the number written. */
+  public static int writeLabelIndex(Configuration conf, Iterable<String> labels, Path indexPath)
+      throws IOException {
+    FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class);
+    int i = 0;
+    try {
+      for (String label : labels) {
+        writer.append(new Text(label), new IntWritable(i++));
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+    return i;
+  }
+
+  /** Write the distinct keys of {@code labels}, numbered by first appearance; returns how many were written. */
+  public static int writeLabelIndex(Configuration conf, Path indexPath, SequenceFileDirIterable<?, ?> labels) throws IOException {
+    FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class);
+    Set<String> seen = new HashSet<String>();
+    int i = 0;
+    try {
+      for (Pair<?, ?> label : labels) {
+        String theLabel = label.getFirst().toString();
+        // Set.add reports whether the label is new, so no separate contains() lookup is needed
+        if (seen.add(theLabel)) {
+          writer.append(new Text(theLabel), new IntWritable(i++));
+        }
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+    return i;
+  }
+
+  /** Read a label index back as a map from label id to label name. */
+  public static Map<Integer, String> readLabelIndex(Configuration conf, Path indexPath) throws IOException {
+    Map<Integer, String> labelMap = new HashMap<Integer, String>();
+    for (Pair<Text, IntWritable> pair : new SequenceFileIterable<Text, IntWritable>(indexPath, true, conf)) {
+      labelMap.put(pair.getSecond().get(), pair.getFirst().toString());
+    }
+    return labelMap;
+  }
+
+  /** Read the label index from the file returned by {@code HadoopUtil.cachedFile(conf)}, keyed by label name. */
+  public static OpenObjectIntHashMap<String> readIndexFromCache(Configuration conf) throws IOException {
+    OpenObjectIntHashMap<String> index = new OpenObjectIntHashMap<String>();
+    for (Pair<Writable,IntWritable> entry : new SequenceFileIterable<Writable,IntWritable>(HadoopUtil.cachedFile(conf), conf)) {
+      index.put(entry.getFirst().toString(), entry.getSecond().get());
+    }
+    return index;
+  }
+
+  /** Read the vectors under the path returned by {@code HadoopUtil.cachedFile(conf)}, keyed by record key. */
+  public static Map<String,Vector> readScoresFromCache(Configuration conf) throws IOException {
+    Map<String,Vector> sumVectors = Maps.newHashMap();
+    for (Pair<Text,VectorWritable> entry : new SequenceFileDirIterable<Text,VectorWritable>(HadoopUtil.cachedFile(conf),
+        PathType.LIST, PathFilters.partFilter(), conf)) {
+      sumVectors.put(entry.getFirst().toString(), entry.getSecond().get());
+    }
+    return sumVectors;
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/BayesTestMapper.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,62 @@
+package org.apache.mahout.classifier.naivebayes.test;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.classifier.naivebayes.AbstractNaiveBayesClassifier;
+import org.apache.mahout.classifier.naivebayes.ComplementaryNaiveBayesClassifier;
+import org.apache.mahout.classifier.naivebayes.NaiveBayesModel;
+import org.apache.mahout.classifier.naivebayes.StandardNaiveBayesClassifier;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Scores each input vector with the trained Naive Bayes model.
+ * <p/>
+ * The output key is the input key (the expected label); the output value is the vector returned by the classifier.
+ */
+public class BayesTestMapper extends Mapper<Text, VectorWritable, Text, VectorWritable> {
+  /** Classifier created in setup; standard or complementary depending on the job configuration. */
+  AbstractNaiveBayesClassifier classifier;
+
+  /** Load the model from the cached file and pick the classifier implementation. */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Configuration jobConf = context.getConfiguration();
+    Path cachedModel = HadoopUtil.cachedFile(jobConf);
+    NaiveBayesModel trainedModel = NaiveBayesModel.materialize(cachedModel, jobConf);
+    boolean useComplementary = Boolean.parseBoolean(jobConf.get(TestNaiveBayesDriver.COMPLEMENTARY));
+    classifier = useComplementary
+        ? new ComplementaryNaiveBayesClassifier(trainedModel)
+        : new StandardNaiveBayesClassifier(trainedModel);
+  }
+
+  /** Classify one vector and emit the classifier's result under the unchanged key. */
+  @Override
+  protected void map(Text key, VectorWritable value, Context context) throws IOException, InterruptedException {
+    // the key is the expected value
+    Vector scores = classifier.classify(value.get());
+    context.write(key, new VectorWritable(scores));
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/TestNaiveBayesDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/TestNaiveBayesDriver.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/TestNaiveBayesDriver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/test/TestNaiveBayesDriver.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,130 @@
+package org.apache.mahout.classifier.naivebayes.test;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.classifier.ClassifierResult;
+import org.apache.mahout.classifier.ResultAnalyzer;
+import org.apache.mahout.classifier.naivebayes.BayesUtils;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Test the (Complementary) Naive Bayes model that was built during training
+ * by iterating over the test set and comparing the predicted labels to the expected ones.
+ */
+public class TestNaiveBayesDriver extends AbstractJob {
+  private static final Logger log = LoggerFactory.getLogger(TestNaiveBayesDriver.class);
+  public static final String LABEL_KEY = "labels";
+  /** Configuration key; run() stores "true" or "false" under it to select the complementary classifier. */
+  public static final String COMPLEMENTARY = "class";
+
+  /** Run the driver via {@link ToolRunner}. */
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new TestNaiveBayesDriver(), args);
+  }
+
+  /**
+   * Classify the input with the given model and log the resulting {@link ResultAnalyzer} summary.
+   *
+   * @return 0 on success, -1 if the arguments could not be parsed or the classification job failed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    // register the overwrite option exactly once
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption("model", "m", "The path to the model built during training", true);
+    addOption(buildOption("testComplementary", "c", "test complementary?", false, false, String.valueOf(false)));
+    addOption("labelIndex", "l", "The path to the location of the label index", true);
+    Map<String, String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), getOutputPath());
+    }
+    Path model = new Path(parsedArgs.get("--model"));
+    HadoopUtil.cacheFiles(model, getConf());
+
+    //the output key is the expected value, the output value are the scores for all the labels
+    Job testJob = prepareJob(getInputPath(), getOutputPath(), SequenceFileInputFormat.class, BayesTestMapper.class,
+        Text.class, VectorWritable.class, SequenceFileOutputFormat.class);
+    boolean complementary = parsedArgs.containsKey("--testComplementary");
+    testJob.getConfiguration().set(COMPLEMENTARY, String.valueOf(complementary));
+    if (!testJob.waitForCompletion(true)) {
+      // without a successful job there is no output worth analyzing
+      return -1;
+    }
+
+    //load the labels
+    Map<Integer, String> labelMap = BayesUtils.readLabelIndex(getConf(), new Path(parsedArgs.get("--labelIndex")));
+
+    //loop over the results and create the confusion matrix
+    SequenceFileDirIterable<Text, VectorWritable> dirIterable =
+        new SequenceFileDirIterable<Text, VectorWritable>(getOutputPath(), PathType.LIST, PathFilters.partFilter(), getConf());
+    ResultAnalyzer analyzer = new ResultAnalyzer(labelMap.values(), "DEFAULT");
+    analyzeResults(labelMap, dirIterable, analyzer);
+
+    log.info((complementary ? "Complementary" : "Standard NB") + " Results: {}", analyzer);
+    return 0;
+  }
+
+  /**
+   * Feed the analyzer one result per record: the label with the highest score versus the expected label (the key).
+   * Records for which no score is selected (e.g. an empty score vector) are skipped.
+   */
+  private void analyzeResults(Map<Integer, String> labelMap,
+                              SequenceFileDirIterable<Text, VectorWritable> dirIterable,
+                              ResultAnalyzer analyzer) {
+    for (Pair<Text, VectorWritable> pair : dirIterable) {
+      int bestIdx = Integer.MIN_VALUE;
+      // start below every finite double so that any real score can win
+      double bestScore = Double.NEGATIVE_INFINITY;
+      for (Vector.Element element : pair.getSecond().get()) {
+        if (element.get() > bestScore) {
+          bestScore = element.get();
+          bestIdx = element.index();
+        }
+      }
+      // MIN_VALUE is the "nothing selected" sentinel, regardless of classifier variant
+      if (bestIdx != Integer.MIN_VALUE) {
+        ClassifierResult classifierResult = new ClassifierResult(labelMap.get(bestIdx), bestScore);
+        analyzer.addInstance(pair.getFirst().toString(), classifierResult);
+      }
+    }
+  }
+}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java Fri Oct 7 14:02:20 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.classifier.naivebayes.BayesUtils;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenObjectIntHashMap;
@@ -33,7 +34,7 @@ public class IndexInstancesMapper extend
@Override
protected void setup(Context ctx) throws IOException, InterruptedException {
- labelIndex = TrainUtils.readIndexFromCache(ctx.getConfiguration());
+ labelIndex = BayesUtils.readIndexFromCache(ctx.getConfiguration());
}
@Override
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/ThetaMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/ThetaMapper.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/ThetaMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/ThetaMapper.java Fri Oct 7 14:02:20 2011
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.classifier.naivebayes.BayesUtils;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
@@ -40,7 +41,7 @@ public class ThetaMapper extends Mapper<
float alphaI = conf.getFloat(ALPHA_I, 1.0f);
boolean trainComplemenary = conf.getBoolean(TRAIN_COMPLEMENTARY, false);
- Map<String,Vector> scores = TrainUtils.readScoresFromCache(conf);
+ Map<String,Vector> scores = BayesUtils.readScoresFromCache(conf);
if (!trainComplemenary) {
trainer = new StandardThetaTrainer(scores.get(TrainNaiveBayesJob.WEIGHTS_PER_FEATURE),
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainNaiveBayesJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainNaiveBayesJob.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainNaiveBayesJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/classifier/naivebayes/training/TrainNaiveBayesJob.java Fri Oct 7 14:02:20 2011
@@ -17,17 +17,26 @@
package org.apache.mahout.classifier.naivebayes.training;
+import java.io.IOException;
import java.util.Map;
import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.classifier.naivebayes.NaiveBayesModel;
+import org.apache.mahout.classifier.naivebayes.BayesUtils;
import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.apache.mahout.common.mapreduce.VectorSumReducer;
import org.apache.mahout.math.VectorWritable;
@@ -42,27 +51,44 @@ public final class TrainNaiveBayesJob ex
public static final String WEIGHTS = "weights";
public static final String THETAS = "thetas";
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new Configuration(), new TrainNaiveBayesJob(), args);
+ }
+
@Override
public int run(String[] args) throws Exception {
addInputOption();
addOutputOption();
- addOption("labels", "l", "comma-separated list of labels to include in training", true);
- addOption("alphaI", "a", "smoothing parameter", String.valueOf(1.0f));
- addOption("trainComplementary", "c", "train complementary?", String.valueOf(false));
+ addOption("labels", "l", "comma-separated list of labels to include in training", false);
+ addOption(buildOption("extractLabels", "el", "Extract the labels from the input", false, false, ""));
+ addOption("alphaI", "a", "smoothing parameter", String.valueOf(1.0f));
+ addOption(buildOption("trainComplementary", "c", "train complementary?", false, false, String.valueOf(false)));
+ addOption("labelIndex", "li", "The path to store the label index in", false);
+ addOption(DefaultOptionCreator.overwriteOption().create());
Map<String,String> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
return -1;
}
-
- Iterable<String> labels = Splitter.on(",").split(parsedArgs.get("--labels"));
+ if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+ HadoopUtil.delete(getConf(), getOutputPath());
+ HadoopUtil.delete(getConf(), getTempPath());
+ }
+ Path labPath;
+ String labPathStr = parsedArgs.get("--labelIndex");
+ if (labPathStr != null){
+ labPath = new Path(labPathStr);
+ } else {
+ labPath = getTempPath("labelIndex");
+ }
+ long labelSize = createLabelIndex(parsedArgs, labPath);
float alphaI = Float.parseFloat(parsedArgs.get("--alphaI"));
boolean trainComplementary = Boolean.parseBoolean(parsedArgs.get("--trainComplementary"));
- TrainUtils.writeLabelIndex(getConf(), labels, getTempPath("labelIndex"));
- TrainUtils.setSerializations(getConf());
- TrainUtils.cacheFiles(getTempPath("labelIndex"), getConf());
+
+ HadoopUtil.setSerializations(getConf());
+ HadoopUtil.cacheFiles(labPath, getConf());
Job indexInstances = prepareJob(getInputPath(), getTempPath(SUMMED_OBSERVATIONS), SequenceFileInputFormat.class,
IndexInstancesMapper.class, IntWritable.class, VectorWritable.class, VectorSumReducer.class, IntWritable.class,
@@ -73,11 +99,11 @@ public final class TrainNaiveBayesJob ex
Job weightSummer = prepareJob(getTempPath(SUMMED_OBSERVATIONS), getTempPath(WEIGHTS),
SequenceFileInputFormat.class, WeightsMapper.class, Text.class, VectorWritable.class, VectorSumReducer.class,
Text.class, VectorWritable.class, SequenceFileOutputFormat.class);
- weightSummer.getConfiguration().set(WeightsMapper.NUM_LABELS, String.valueOf(Iterables.size(labels)));
+ weightSummer.getConfiguration().set(WeightsMapper.NUM_LABELS, String.valueOf(labelSize));
weightSummer.setCombinerClass(VectorSumReducer.class);
weightSummer.waitForCompletion(true);
- TrainUtils.cacheFiles(getTempPath(WEIGHTS), getConf());
+ HadoopUtil.cacheFiles(getTempPath(WEIGHTS), getConf());
Job thetaSummer = prepareJob(getTempPath(SUMMED_OBSERVATIONS), getTempPath(THETAS),
SequenceFileInputFormat.class, ThetaMapper.class, Text.class, VectorWritable.class, VectorSumReducer.class,
@@ -87,11 +113,24 @@ public final class TrainNaiveBayesJob ex
thetaSummer.getConfiguration().setBoolean(ThetaMapper.TRAIN_COMPLEMENTARY, trainComplementary);
thetaSummer.waitForCompletion(true);
- NaiveBayesModel naiveBayesModel = TrainUtils.readModelFromTempDir(getTempPath(), getConf());
+ NaiveBayesModel naiveBayesModel = BayesUtils.readModelFromDir(getTempPath(), getConf());
naiveBayesModel.validate();
naiveBayesModel.serialize(getOutputPath(), getConf());
return 0;
}
+ /**
+  * Writes the label index to {@code labPath} and reports the value returned by
+  * {@code BayesUtils.writeLabelIndex}.
+  * <p>
+  * When {@code --labels} is present its comma-separated value supplies the labels. Otherwise, when
+  * {@code --extractLabels} is present, the sequence files under the job's input path are handed to
+  * {@code BayesUtils} instead. When neither option is present nothing is written and 0 is returned.
+  *
+  * @param parsedArgs the parsed command-line arguments, keyed by {@code --optionName}
+  * @param labPath where the label index is written
+  * @return the value reported by {@code BayesUtils.writeLabelIndex}, or 0 if no index was written
+  * @throws IOException if reading the input or writing the index fails
+  */
+ private long createLabelIndex(Map<String, String> parsedArgs, Path labPath) throws IOException {
+   if (parsedArgs.containsKey("--labels")) {
+     // explicit list given on the command line
+     Iterable<String> requested = Splitter.on(",").split(parsedArgs.get("--labels"));
+     return BayesUtils.writeLabelIndex(getConf(), requested, labPath);
+   }
+   if (parsedArgs.containsKey("--extractLabels")) {
+     // no explicit list: let BayesUtils derive the labels from the input sequence files
+     SequenceFileDirIterable input =
+         new SequenceFileDirIterable(getInputPath(), PathType.LIST, PathFilters.logsCRCFilter(), getConf());
+     return BayesUtils.writeLabelIndex(getConf(), labPath, input);
+   }
+   return 0;
+ }
+
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Fri Oct 7 14:02:20 2011
@@ -422,7 +422,8 @@ public abstract class AbstractJob extend
job.setMapperClass(mapper);
job.setMapOutputKeyClass(mapperKey);
job.setMapOutputValueClass(mapperValue);
-
+ job.setOutputKeyClass(mapperKey);
+ job.setOutputValueClass(mapperValue);
jobConf.setBoolean("mapred.compress.map.output", true);
job.setNumReduceTasks(0);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java Fri Oct 7 14:02:20 2011
@@ -19,10 +19,14 @@ package org.apache.mahout.common;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -34,9 +38,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class HadoopUtil {
-
+
private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);
-
+
private HadoopUtil() { }
public static void delete(Configuration conf, Iterable<Path> paths) throws IOException {
@@ -51,7 +55,7 @@ public final class HadoopUtil {
}
}
}
-
+
public static void delete(Configuration conf, Path... paths) throws IOException {
delete(conf, Arrays.asList(paths));
}
@@ -90,4 +94,30 @@ public final class HadoopUtil {
return fs.open(path.makeQualified(fs));
}
+ /**
+  * Collects the {@link FileStatus} entries for {@code path}, either by glob expansion or by listing.
+  *
+  * @param path the path (or glob pattern) to resolve against its own {@link FileSystem}
+  * @param pathType {@link PathType#GLOB} selects {@code globStatus}; any other value selects {@code listStatus}
+  * @param filter optional filter passed to the file system call; {@code null} means no filtering
+  * @param ordering optional comparator used to sort the result in place; {@code null} leaves the order as is
+  * @param conf configuration used to look up the file system for {@code path}
+  * @return the statuses as returned by the file system, sorted if {@code ordering} was given
+  * @throws IOException if the file system cannot be obtained or queried
+  */
+ public static FileStatus[] getFileStatus(Path path,
+                                          PathType pathType,
+                                          PathFilter filter,
+                                          Comparator<FileStatus> ordering,
+                                          Configuration conf) throws IOException {
+   FileSystem fileSystem = path.getFileSystem(conf);
+   boolean useGlob = pathType == PathType.GLOB;
+   FileStatus[] found;
+   if (useGlob) {
+     found = filter == null ? fileSystem.globStatus(path) : fileSystem.globStatus(path, filter);
+   } else {
+     found = filter == null ? fileSystem.listStatus(path) : fileSystem.listStatus(path, filter);
+   }
+   if (ordering != null) {
+     Arrays.sort(found, ordering);
+   }
+   return found;
+ }
+
+ /**
+  * Registers {@code fileToCache} as the cache file list of {@code conf}. This uses
+  * {@code DistributedCache.setCacheFiles}, so the list is set to this single entry rather than appended to.
+  *
+  * @param fileToCache the file to register
+  * @param conf the configuration to update
+  */
+ public static void cacheFiles(Path fileToCache, Configuration conf) {
+   URI[] singleEntry = {fileToCache.toUri()};
+   DistributedCache.setCacheFiles(singleEntry, conf);
+ }
+
+ /**
+  * Returns the first file registered in the {@link DistributedCache} for {@code conf}, as a {@link Path}
+  * built from the path component of its URI.
+  *
+  * @param conf configuration whose cache file list is read
+  * @return path of the first cached file
+  * @throws IOException if no cache files are registered, or if reading the list fails
+  */
+ public static Path cachedFile(Configuration conf) throws IOException {
+   URI[] cached = DistributedCache.getCacheFiles(conf);
+   // Report a missing cache entry explicitly instead of failing with a
+   // NullPointerException or ArrayIndexOutOfBoundsException on the [0] access.
+   if (cached == null || cached.length == 0) {
+     throw new IOException("No files registered in the DistributedCache for this configuration");
+   }
+   return new Path(cached[0].getPath());
+ }
+
+ /**
+  * Sets {@code io.serializations} on {@code conf} to Java serialization followed by Writable
+  * serialization, replacing any value already present under that key.
+  *
+  * @param conf the configuration to update
+  */
+ public static void setSerializations(Configuration conf) {
+   String javaFirst = "org.apache.hadoop.io.serializer.JavaSerialization,";
+   String thenWritable = "org.apache.hadoop.io.serializer.WritableSerialization";
+   conf.set("io.serializations", javaFirst + thenWritable);
+ }
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java Fri Oct 7 14:02:20 2011
@@ -19,7 +19,6 @@ package org.apache.mahout.common.iterato
import java.io.Closeable;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
@@ -31,10 +30,10 @@ import com.google.common.collect.Iterato
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.IOUtils;
import org.apache.mahout.common.Pair;
@@ -57,16 +56,7 @@ public final class SequenceFileDirIterat
final Configuration conf) throws IOException {
- FileStatus[] statuses;
- FileSystem fs = path.getFileSystem(conf);
- if (filter == null) {
- statuses = pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
- } else {
- statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : fs.listStatus(path, filter);
- }
- if (ordering != null) {
- Arrays.sort(statuses, ordering);
- }
+ FileStatus[] statuses = HadoopUtil.getFileStatus(path, pathType, filter, ordering, conf);
Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
iterators = Lists.newArrayList();
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/driver/MahoutDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/driver/MahoutDriver.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/driver/MahoutDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/driver/MahoutDriver.java Fri Oct 7 14:02:20 2011
@@ -188,7 +188,7 @@ public final class MahoutDriver {
programDriver.driver(argsList.toArray(new String[argsList.size()]));
if (log.isInfoEnabled()) {
- log.info("Program took {} ms", System.currentTimeMillis() - start);
+ log.info("Program took {} ms (Minutes: {})", System.currentTimeMillis() - start, (System.currentTimeMillis() - start)/60000);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java Fri Oct 7 14:02:20 2011
@@ -54,8 +54,7 @@ import com.google.common.io.Closeables;
*
*/
@SuppressWarnings("deprecation")
-public class QRFirstStep implements Closeable,
- OutputCollector<Writable, Vector> {
+public class QRFirstStep implements Closeable, OutputCollector<Writable, Vector> {
public static final String PROP_K = "ssvd.k";
public static final String PROP_P = "ssvd.p";
@@ -259,10 +258,11 @@ public class QRFirstStep implements Clos
// good enough,
// then at least it is always sequential.
String taskTmpDir = System.getProperty("java.io.tmpdir");
+
FileSystem localFs = FileSystem.getLocal(jobConf);
- tempQPath =
- new Path(new Path(taskTmpDir),
- String.format("q-temp-%d.seq", System.currentTimeMillis()));
+ Path parent = new Path(taskTmpDir);
+ Path sub = new Path(parent, "qw_" + System.currentTimeMillis());
+ tempQPath = new Path(sub, "q-temp.seq");
tempQw =
SequenceFile.createWriter(localFs,
jobConf,
@@ -280,6 +280,7 @@ public class QRFirstStep implements Clos
@Override
public void collect(Writable key, Vector vw) throws IOException {
map(key, vw);
+
}
}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/classifier/naivebayes/NaiveBayesTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/classifier/naivebayes/NaiveBayesTest.java?rev=1180043&r1=1180042&r2=1180043&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/classifier/naivebayes/NaiveBayesTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/classifier/naivebayes/NaiveBayesTest.java Fri Oct 7 14:02:20 2011
@@ -108,7 +108,7 @@ public class NaiveBayesTest extends Maho
TrainNaiveBayesJob trainNaiveBayes = new TrainNaiveBayesJob();
trainNaiveBayes.setConf(conf);
trainNaiveBayes.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
- "--labels", "stolen,not_stolen", "--trainComplementary", String.valueOf(true),
+ "--labels", "stolen,not_stolen", "--trainComplementary",
"--tempDir", tempDir.getAbsolutePath() });
NaiveBayesModel naiveBayesModel = NaiveBayesModel.materialize(new Path(outputDir.getAbsolutePath()), conf);
Propchange: mahout/trunk/distribution/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct 7 14:02:20 2011
@@ -1 +1,2 @@
target
+*.iml
Propchange: mahout/trunk/examples/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct 7 14:02:20 2011
@@ -12,3 +12,4 @@ output
.checkstyle
testdata
dist
+target
Added: mahout/trunk/examples/bin/build-asf-email.sh
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/bin/build-asf-email.sh?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/bin/build-asf-email.sh (added)
+++ mahout/trunk/examples/bin/build-asf-email.sh Fri Oct 7 14:02:20 2011
@@ -0,0 +1,152 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Runs one of the Mahout examples (recommender, clustering or classification)
+# against a directory of ASF mail archives.
+#
+# Usage: build-asf-email.sh [-ni] <asf-archives-dir> <output-dir> [over]
+#   -ni   non-interactive: skip the top-level menu and run the recommender
+#   over  redo the conversion steps even if their output already exists
+#
+
+# Handle -ni before reading the positional arguments, so that the archive and
+# output directories are taken from the arguments that follow the flag.
+alg=""
+if [ "$1" = "-ni" ]; then
+  alg=recommender
+  shift
+fi
+
+MAHOUT="../../bin/mahout"
+ASF_ARCHIVES=$1
+OUT=$2
+OVER=$3
+export MAHOUT_HEAPSIZE=2048
+
+if [ -z "$ASF_ARCHIVES" ] || [ -z "$OUT" ]; then
+  echo "Usage: $0 [-ni] <asf-archives-dir> <output-dir> [over]" >&2
+  exit 1
+fi
+
+if [ -z "$alg" ]; then
+  algorithm=( recommender clustering classification )
+
+  echo "Please select a number to choose the corresponding algorithm to run"
+  echo "1. ${algorithm[0]}"
+  echo "2. ${algorithm[1]}"
+  echo "3. ${algorithm[2]}"
+  read -p "Enter your choice : " choice
+
+  echo "ok. You chose $choice and we'll use ${algorithm[$choice-1]}"
+  alg=${algorithm[$choice-1]}
+fi
+
+
+if [ "x$alg" == "xrecommender" ]; then
+  # convert the mail to seq files
+  MAIL_OUT="$OUT/prefs/seq-files"
+  if [ "x$OVER" == "xover" ] || [ ! -e "$MAIL_OUT/chunk-0" ]; then
+    echo "Converting Mail files to Sequence Files"
+    "$MAHOUT" org.apache.mahout.text.SequenceFilesFromMailArchives --charset "UTF-8" --from --references --input "$ASF_ARCHIVES" --output "$MAIL_OUT" --separator " ::: "
+  fi
+  PREFS="$OUT/prefs/input"
+  PREFS_TMP="$OUT/prefs/tmp"
+  PREFS_REC_INPUT="$OUT/prefs/input/recInput"
+  RECS_OUT="$OUT/prefs/recommendations"
+  # prep for recs
+  if [ "x$OVER" == "xover" ] || [ ! -e "$PREFS/fromIds-dictionary-0" ]; then
+    echo "Prepping Sequence files for Recommender"
+    "$MAHOUT" org.apache.mahout.cf.taste.example.email.MailToPrefsDriver --input "$MAIL_OUT" --output "$PREFS" --overwrite --separator " ::: "
+  fi
+  # run the recs
+  echo "Run the recommender"
+  "$MAHOUT" recommenditembased --input "$PREFS_REC_INPUT" --output "$RECS_OUT" --tempDir "$PREFS_TMP" --similarityClassname SIMILARITY_LOGLIKELIHOOD
+
+#clustering
+elif [ "x$alg" == "xclustering" ]; then
+  MAIL_OUT="$OUT/clustering/seq-files"
+  SEQ2SP="$OUT/clustering/seq2sparse"
+  algorithm=( kmeans dirichlet )
+
+  echo "Please select a number to choose the corresponding algorithm to run"
+  echo "1. ${algorithm[0]}"
+  echo "2. ${algorithm[1]}"
+  read -p "Enter your choice : " choice
+
+  echo "ok. You chose $choice and we'll use ${algorithm[$choice-1]}"
+  nbalg=${algorithm[$choice-1]}
+  if [ "x$OVER" == "xover" ] || [ ! -e "$MAIL_OUT/chunk-0" ]; then
+    echo "Converting Mail files to Sequence Files"
+    "$MAHOUT" org.apache.mahout.text.SequenceFilesFromMailArchives --charset "UTF-8" --subject --body --input "$ASF_ARCHIVES" --output "$MAIL_OUT"
+  fi
+
+  #convert to sparse vectors -- use the 2 norm (Euclidean distance) and lop off some of the common terms
+
+  if [ "x$OVER" == "xover" ] || [ ! -e "$SEQ2SP/dictionary.file-0" ]; then
+    echo "Converting the files to sparse vectors"
+    "$MAHOUT" seq2sparse --input "$MAIL_OUT" --output "$SEQ2SP" --norm 2 --weight TFIDF --namedVector --maxDFPercent 90 --minSupport 2 --analyzerName org.apache.mahout.text.MailArchivesClusteringAnalyzer
+  fi
+  if [ "x$nbalg" == "xkmeans" ]; then
+    CLUST_OUT="$OUT/clustering/kmeans"
+    echo "Running K-Means"
+    "$MAHOUT" kmeans --input "$SEQ2SP/tfidf-vectors" --output "$CLUST_OUT" -k 50 --maxIter 20 --distanceMeasure org.apache.mahout.common.distance.CosineDistanceMeasure --clustering --method mapreduce --clusters "$CLUST_OUT/clusters"
+  elif [ "x$nbalg" == "xdirichlet" ]; then
+    CLUST_OUT="$OUT/clustering/dirichlet"
+    echo "Running Dirichlet"
+    "$MAHOUT" dirichlet --input "$SEQ2SP/tfidf-vectors" --output "$CLUST_OUT" -k 50 --maxIter 20 --distanceMeasure org.apache.mahout.common.distance.CosineDistanceMeasure --method mapreduce
+  fi
+
+#classification
+elif [ "x$alg" == "xclassification" ]; then
+  algorithm=( standard complementary )
+
+  echo "Please select a number to choose the corresponding algorithm to run"
+  echo "1. ${algorithm[0]}"
+  echo "2. ${algorithm[1]}"
+  read -p "Enter your choice : " choice
+
+  echo "ok. You chose $choice and we'll use ${algorithm[$choice-1]}"
+  nbalg=${algorithm[$choice-1]}
+
+  CLASS="$OUT/classification/"
+  MAIL_OUT="$CLASS/seq-files"
+  SEQ2SP="$CLASS/seq2sparse"
+  SEQ2SPLABEL="$CLASS/labeled"
+  SPLIT="$CLASS/splits"
+  TRAIN="$SPLIT/train"
+  TEST="$SPLIT/test"
+  TEST_OUT="$CLASS/test-results"
+  LABEL="$SPLIT/labels"
+  #Convert mail to be formatted as:
+  # label\ttext
+  # One per line
+  # the label is the project_name_mailing_list, as in tomcat.apache.org_dev
+  if [ "x$OVER" == "xover" ] || [ ! -e "$MAIL_OUT/chunk-0" ]; then
+    echo "Converting Mail files to Sequence Files"
+    "$MAHOUT" org.apache.mahout.text.SequenceFilesFromMailArchives --charset "UTF-8" --subject --body --input "$ASF_ARCHIVES" --output "$MAIL_OUT"
+  fi
+  #Convert to vectors
+  if [ "x$OVER" == "xover" ] || [ ! -e "$SEQ2SP/dictionary.file-0" ]; then
+    echo "Converting the files to sparse vectors"
+    "$MAHOUT" seq2sparse --input "$MAIL_OUT" --output "$SEQ2SP" --norm 2 --weight TFIDF --namedVector --maxDFPercent 90 --minSupport 2 --analyzerName org.apache.mahout.text.MailArchivesClusteringAnalyzer
+    #We need to modify the vectors to have a better label
+    echo "Converting vector labels"
+    "$MAHOUT" org.apache.mahout.classifier.email.PrepEmailVectorsDriver --input "$SEQ2SP/tfidf-vectors" --output "$SEQ2SPLABEL" --overwrite
+  fi
+  if [ "x$OVER" == "xover" ] || [ ! -e "$TRAIN/part-m-00000" ]; then
+    #setup train/test files
+    echo "Creating training and test inputs"
+    "$MAHOUT" split --input "$SEQ2SPLABEL" --trainingOutput "$TRAIN" --testOutput "$TEST" --randomSelectionPct 20 --overwrite --sequenceFiles
+  fi
+  MODEL="$CLASS/model"
+  if [ "x$nbalg" == "xstandard" ]; then
+    echo "Running Standard Training"
+    "$MAHOUT" trainnb -i "$TRAIN" -o "$MODEL" --extractLabels --labelIndex "$LABEL" --overwrite
+    echo "Running Test"
+    "$MAHOUT" testnb -i "$TEST" -o "$TEST_OUT" -m "$MODEL" --labelIndex "$LABEL" --overwrite
+
+  elif [ "x$nbalg" == "xcomplementary" ]; then
+    echo "Running Complementary Training"
+    "$MAHOUT" trainnb -i "$TRAIN" -o "$MODEL" --extractLabels --labelIndex "$LABEL" --overwrite --trainComplementary
+    echo "Running Complementary Test"
+    "$MAHOUT" testnb -i "$TEST" -o "$TEST_OUT" -m "$MODEL" --labelIndex "$LABEL" --overwrite --testComplementary
+  fi
+
+fi
+
+
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/EmailUtility.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,89 @@
+package org.apache.mahout.cf.taste.example.email;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Static helpers shared by the email example jobs: address clean-up, loading of the id
+ * dictionaries from the {@link DistributedCache}, and splitting of raw reference strings.
+ **/
+public final class EmailUtility {
+  /** Configuration key under which the mappers look up the field separator. */
+  public static final String SEPARATOR = "separator";
+  public static final String MSG_IDS_PREFIX = "msgIdsPrefix";
+  public static final String FROM_PREFIX = "fromPrefix";
+  public static final String MSG_ID_DIMENSION = "msgIdDim";
+
+  /** Returned by {@link #parseReferences(String)} when there is nothing to parse. */
+  private static final String[] EMPTY = new String[0];
+
+  private EmailUtility() {
+    // static utility holder, never instantiated
+  }
+
+  /**
+   * Strips spurious characters that make it harder to dedup addresses: every occurrence of
+   * {@code mailto:}, {@code <}, {@code >}, {@code [}, {@code ]} and {@code =20} is removed.
+   * For example {@code "[mailto:someone@example.com]=20"} becomes {@code "someone@example.com"}.
+   *
+   * @param address the raw address text; must not be null
+   * @return the address with those character sequences removed
+   */
+  public static String cleanUpEmailAddress(String address) {
+    //TODO: is there more to clean up here?
+    return address.replaceAll("mailto:|<|>|\\[|\\]|\\=20", "");
+  }
+
+  /**
+   * Fills the two dictionaries from the sequence files listed in the {@link DistributedCache}.
+   * A cached file whose name starts with {@code fromPrefix} is read into {@code fromDictionary};
+   * otherwise, one whose name starts with {@code msgIdPrefix} is read into {@code msgIdDictionary}.
+   * All other cached files are skipped. Each record contributes its key's string form mapped to its
+   * integer value.
+   *
+   * @param conf configuration holding the cache file list; also used to read the files
+   * @param fromPrefix file name prefix selecting the "from" dictionary files
+   * @param fromDictionary map receiving the "from" entries
+   * @param msgIdPrefix file name prefix selecting the message id dictionary files
+   * @param msgIdDictionary map receiving the message id entries
+   * @throws IOException if the cache file list or a dictionary file cannot be read
+   * @throws IllegalArgumentException if no cache files are registered in {@code conf}
+   */
+  public static void loadDictionaries(Configuration conf, String fromPrefix,
+                                      OpenObjectIntHashMap<String> fromDictionary,
+                                      String msgIdPrefix,
+                                      OpenObjectIntHashMap<String> msgIdDictionary) throws IOException {
+    URI[] cachedFiles = DistributedCache.getCacheFiles(conf);
+    Preconditions.checkArgument(cachedFiles != null,
+        "missing paths from the DistributedCache");
+    for (URI cachedFile : cachedFiles) {
+      Path dictionaryFile = new Path(cachedFile.getPath());
+      String fileName = dictionaryFile.getName();
+
+      // pick the target map by file name prefix; the "from" prefix wins if both match
+      OpenObjectIntHashMap<String> target;
+      if (fileName.startsWith(fromPrefix)) {
+        target = fromDictionary;
+      } else if (fileName.startsWith(msgIdPrefix)) {
+        target = msgIdDictionary;
+      } else {
+        continue;
+      }
+      if (target == null) {
+        continue;
+      }
+
+      // key is word, value is id
+      SequenceFileIterable<Writable, IntWritable> records =
+          new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf);
+      for (Pair<Writable, IntWritable> entry : records) {
+        target.put(entry.getFirst().toString(), entry.getSecond().get());
+      }
+    }
+  }
+
+  /**
+   * Splits a raw references string on {@code >} or runs of whitespace and removes any remaining
+   * {@code <} and {@code >} from each piece.
+   *
+   * @param rawRefs the raw references text; may be null or empty
+   * @return the pieces in order of appearance, or an empty array if {@code rawRefs} is null or empty
+   */
+  public static String[] parseReferences(String rawRefs) {
+    if (rawRefs == null || rawRefs.length() == 0) {
+      return EMPTY;
+    }
+    String[] references = rawRefs.split(">|\\s+");
+    for (int pos = 0; pos < references.length; pos++) {
+      references[pos] = references[pos].replaceAll("<|>", "");
+    }
+    return references;
+  }
+}
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/FromEmailToDictionaryMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/FromEmailToDictionaryMapper.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/FromEmailToDictionaryMapper.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/FromEmailToDictionaryMapper.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,41 @@
+package org.apache.mahout.cf.taste.example.email;
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+
+import java.io.IOException;
+
+/**
+ * Assumes the input is in the format created by {@link org.apache.mahout.text.SequenceFilesFromMailArchives}
+ * <p>
+ * For each record, the text of the value before the first separator is taken as the sender,
+ * cleaned with {@link EmailUtility#cleanUpEmailAddress(String)}, and emitted with a count of 1.
+ * Records whose value does not contain the separator are skipped.
+ **/
+public class FromEmailToDictionaryMapper extends
+    Mapper<Text, Text, Text, VarIntWritable> {
+  /** Field separator; overridden in {@link #setup} when {@link EmailUtility#SEPARATOR} is configured. */
+  private String separator = "\n";
+
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    // Fall back to the default when the key is not configured. An unconditional get() would store
+    // null here and make every map() call fail with a NullPointerException in indexOf().
+    separator = context.getConfiguration().get(EmailUtility.SEPARATOR, separator);
+  }
+
+  @Override
+  protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    //From is in the value
+    String valStr = value.toString();
+    int idx = valStr.indexOf(separator);
+    if (idx != -1) {
+      String full = valStr.substring(0, idx);
+      //do some cleanup to normalize some things, like: Key: karthik ananth <ka...@gmail.com>: Value: 178
+      //Key: karthik ananth [mailto:karthik.jcecs@gmail.com]=20: Value: 179
+      //TODO: is there more to clean up here?
+      full = EmailUtility.cleanUpEmailAddress(full);
+
+      context.write(new Text(full), new VarIntWritable(1));
+    }
+
+  }
+}
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToDictionaryReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToDictionaryReducer.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToDictionaryReducer.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToDictionaryReducer.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,29 @@
+package org.apache.mahout.cf.taste.example.email;
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+
+import java.io.IOException;
+
+/**
+ * Sums the counts emitted for each string id.
+ * <ul>
+ * <li>Key: the string id</li>
+ * <li>Value: the count</li>
+ * <li>Out Key: the string id</li>
+ * <li>Out Value: the sum of the counts</li>
+ * </ul>
+ **/
+public class MailToDictionaryReducer extends
+    Reducer<Text, VarIntWritable, Text, VarIntWritable> {
+
+  @Override
+  protected void reduce(Text key, Iterable<VarIntWritable> values, Context context)
+    throws IOException, InterruptedException {
+    int total = 0;
+    for (VarIntWritable count : values) {
+      total += count.get();
+    }
+    // emit a copy of the key together with the accumulated count
+    Text outKey = new Text(key);
+    VarIntWritable outValue = new VarIntWritable(total);
+    context.write(outKey, outValue);
+  }
+}
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,245 @@
+package org.apache.mahout.cf.taste.example.email;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.VarIntWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Convert the Mail archives (see {@link org.apache.mahout.text.SequenceFilesFromMailArchives}) to a preference
+ * file that can be consumed by the {@link org.apache.mahout.cf.taste.hadoop.pseudo.RecommenderJob}.
+ * <p/>
+ * This assumes the input is a Sequence File whose key is: filename/message id and whose value is a list of fields
+ * (separated by a separator of the user's choosing) that starts with the sender; see {@link MailToRecMapper} for
+ * the fields that are actually read.
+ * <p/>
+ * The job runs in three phases:
+ * <ol>
+ * <li>build a dictionary from message ids to ints ({@link MsgIdToDictionaryMapper}),</li>
+ * <li>build a dictionary from sender addresses to ints ({@link FromEmailToDictionaryMapper}),</li>
+ * <li>for every (from chunk, message id chunk) pair run {@link MailToRecMapper}, which writes uncompressed text
+ * lines, and collect the part files as <code>recInput/chunk-i-j-k</code> under the output directory.</li>
+ * </ol>
+ * The dictionaries themselves are written, in chunks, at the top level of the output directory as
+ * <code>msgIds-dictionary-N</code> and <code>fromIds-dictionary-N</code>. This class currently does not account
+ * for thread hijacking.
+ */
+public class MailToPrefsDriver extends AbstractJob {
+  private static final Logger log = LoggerFactory.getLogger(MailToPrefsDriver.class);
+
+  private static final String OUTPUT_FILES_PATTERN = "part-*";
+  private static final int DICTIONARY_BYTE_OVERHEAD = 4;
+  private static final String MSG_IDS_DICTIONARY_PREFIX = "msgIds-dictionary-";
+  private static final String FROM_IDS_DICTIONARY_PREFIX = "fromIds-dictionary-";
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new MailToPrefsDriver(), args);
+  }
+
+  /**
+   * Parses the command line and runs the phases described on the class.
+   *
+   * @return 0 on success, -1 when the arguments could not be parsed or one of the Hadoop jobs failed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption("chunkSize", "cs", "The size of chunks to write. Default is 100 mb", "100");
+    addOption("separator", "sep", "The separator used in the input file to separate to, from, subject. Default is \\n", "\n");
+    Map<String, String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      // same guard as PrepEmailVectorsDriver: without parsed arguments there is nothing to run
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    int chunkSize = Integer.parseInt(parsedArgs.get("--chunkSize"));
+    String separator = parsedArgs.get("--separator");
+    Configuration conf = getConf();
+    if (conf == null) {
+      setConf(new Configuration());
+      conf = getConf();
+    }
+
+    AtomicInteger currentPhase = new AtomicInteger();
+    int[] msgDim = new int[1];
+    int[] fromDim = new int[1];
+    //TODO: mod this to not do so many passes over the data. Dictionary creation could probably be a chain mapper
+    List<Path> msgIdChunks = null;
+    List<Path> fromChunks = null;
+    boolean overwrite = hasOption(DefaultOptionCreator.OVERWRITE_OPTION);
+    // create the dictionary between message ids and longs
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      //TODO: there seems to be a pattern emerging for dictionary creation -- sparse vectors from seq files also has this.
+      Path msgIdsPath = new Path(output, "msgIds");
+      if (overwrite) {
+        HadoopUtil.delete(conf, msgIdsPath);
+      }
+      log.info("Creating Msg Id Dictionary");
+      Job createMsgIdDictionary = prepareJob(input,
+          msgIdsPath,
+          SequenceFileInputFormat.class,
+          MsgIdToDictionaryMapper.class,
+          Text.class,
+          VarIntWritable.class,
+          MailToDictionaryReducer.class,
+          Text.class,
+          VarIntWritable.class,
+          SequenceFileOutputFormat.class);
+      if (!createMsgIdDictionary.waitForCompletion(true)) {
+        return -1;
+      }
+      //write out the dictionary at the top level
+      msgIdChunks = createDictionaryChunks(msgIdsPath, output, MSG_IDS_DICTIONARY_PREFIX,
+          createMsgIdDictionary.getConfiguration(), chunkSize, msgDim);
+    }
+    //create the dictionary between from email addresses and longs
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Path fromIdsPath = new Path(output, "fromIds");
+      if (overwrite) {
+        HadoopUtil.delete(conf, fromIdsPath);
+      }
+      log.info("Creating From Id Dictionary");
+      Job createFromIdDictionary = prepareJob(input,
+          fromIdsPath,
+          SequenceFileInputFormat.class,
+          FromEmailToDictionaryMapper.class,
+          Text.class,
+          VarIntWritable.class,
+          MailToDictionaryReducer.class,
+          Text.class,
+          VarIntWritable.class,
+          SequenceFileOutputFormat.class);
+      createFromIdDictionary.getConfiguration().set(EmailUtility.SEPARATOR, separator);
+      if (!createFromIdDictionary.waitForCompletion(true)) {
+        return -1;
+      }
+      //write out the dictionary at the top level
+      fromChunks = createDictionaryChunks(fromIdsPath, output, FROM_IDS_DICTIONARY_PREFIX,
+          createFromIdDictionary.getConfiguration(), chunkSize, fromDim);
+    }
+    //OK, we have our dictionaries, let's output the real thing we need: <from_id -> <msgId, msgId, msgId, ...>>
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      if (fromChunks == null || msgIdChunks == null) {
+        // the chunk lists only exist when both dictionary phases ran in this invocation
+        log.warn("Skipping recommendation matrix creation: the dictionary phases were not run in this invocation");
+      } else if (!createRecommendationMatrix(input, output, conf, overwrite, separator, msgDim[0],
+          fromChunks, msgIdChunks)) {
+        return -1;
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Runs one map-only {@link MailToRecMapper} job per (from chunk, message id chunk) pair and moves the resulting
+   * part files to <code>recInput/chunk-i-j-k</code> under <code>output</code>, where i and j are the indexes of
+   * the from chunk and the message id chunk and k is the index of the part file.
+   *
+   * @return false as soon as one of the jobs fails, true otherwise
+   */
+  private boolean createRecommendationMatrix(Path input,
+                                             Path output,
+                                             Configuration conf,
+                                             boolean overwrite,
+                                             String separator,
+                                             int msgIdDimension,
+                                             List<Path> fromChunks,
+                                             List<Path> msgIdChunks)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    //may be a way to do this so that we can load the from ids in memory, if they are small enough so that we don't need the double loop
+    log.info("Creating recommendation matrix");
+    Path vecPath = new Path(output, "recInput");
+    if (overwrite) {
+      HadoopUtil.delete(conf, vecPath);
+    }
+    conf.set(EmailUtility.MSG_ID_DIMENSION, String.valueOf(msgIdDimension));
+    conf.set(EmailUtility.FROM_PREFIX, FROM_IDS_DICTIONARY_PREFIX);
+    conf.set(EmailUtility.MSG_IDS_PREFIX, MSG_IDS_DICTIONARY_PREFIX);
+    conf.set(EmailUtility.SEPARATOR, separator);
+    int i = 0;
+    for (Path fromChunk : fromChunks) {
+      // restart the message id chunk index for every from chunk so that file names carry the real chunk indexes
+      int j = 0;
+      for (Path idChunk : msgIdChunks) {
+        Path out = new Path(vecPath, "tmp-" + i + "-" + j);
+        // must be set before prepareJob so that the job is created with this pair of dictionary chunks
+        DistributedCache.setCacheFiles(new URI[]{fromChunk.toUri(), idChunk.toUri()}, conf);
+        Job createRecMatrix = prepareJob(input, out, SequenceFileInputFormat.class,
+            MailToRecMapper.class, NullWritable.class, Text.class,
+            TextOutputFormat.class);
+        createRecMatrix.getConfiguration().set("mapred.output.compress", "false");
+        if (!createRecMatrix.waitForCompletion(true)) {
+          return false;
+        }
+        //copy the results up a level
+        FileStatus[] parts = HadoopUtil.getFileStatus(new Path(out, "*"), PathType.GLOB, PathFilters.partFilter(), null, conf);
+        for (int k = 0; k < parts.length; k++) {
+          Path partPath = parts[k].getPath();
+          Path outPath = new Path(vecPath, "chunk-" + i + "-" + j + "-" + k);
+          FileUtil.copy(partPath.getFileSystem(conf), partPath, outPath.getFileSystem(conf), outPath, true, overwrite, conf);
+        }
+        HadoopUtil.delete(conf, out);
+        j++;
+      }
+      i++;
+    }
+    //TODO: optionally concat the chunk files together into a single file
+    return true;
+  }
+
+  /**
+   * Reads every key found in the <code>part-*</code> sequence files under <code>inputPath</code> and writes it,
+   * together with a sequential int id starting at 0, to sequence files named <code>name + chunkIndex</code> under
+   * <code>dictionaryPathBase</code>. A new chunk is started once the estimated size of the current one has
+   * exceeded the limit; ids keep counting across chunks.
+   *
+   * @param chunkSizeInMegabytes approximate maximum size of a chunk
+   * @param maxTermDimension     single element array that receives the number of entries written
+   * @return the paths of the chunks that were written, in order
+   * @throws IOException if reading the input or writing (including closing) a chunk fails
+   */
+  private static List<Path> createDictionaryChunks(Path inputPath,
+                                                   Path dictionaryPathBase,
+                                                   String name,
+                                                   Configuration baseConf,
+                                                   int chunkSizeInMegabytes, int[] maxTermDimension) throws IOException {
+    List<Path> chunkPaths = Lists.newArrayList();
+
+    Configuration conf = new Configuration(baseConf);
+
+    FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+
+    long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
+    int chunkIndex = 0;
+    Path chunkPath = new Path(dictionaryPathBase, name + chunkIndex);
+    chunkPaths.add(chunkPath);
+
+    SequenceFile.Writer dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
+
+    // stays true until everything was written, so that a failing close() is only swallowed when it would
+    // otherwise hide the exception that is already propagating
+    boolean threw = true;
+    try {
+      long currentChunkSize = 0;
+      Path filesPattern = new Path(inputPath, OUTPUT_FILES_PATTERN);
+      int i = 0;
+      for (Pair<Writable, Writable> record
+          : new SequenceFileDirIterable<Writable, Writable>(filesPattern, PathType.GLOB, null, null, true, conf)) {
+        if (currentChunkSize > chunkSizeLimit) {
+          // do not close quietly: a failure here means the finished chunk may be incomplete
+          dictWriter.close();
+          chunkIndex++;
+
+          chunkPath = new Path(dictionaryPathBase, name + chunkIndex);
+          chunkPaths.add(chunkPath);
+
+          dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
+          currentChunkSize = 0;
+        }
+
+        Writable key = record.getFirst();
+        // rough estimate: fixed overhead + two bytes per character of the key + the int id
+        int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
+        currentChunkSize += fieldSize;
+        dictWriter.append(key, new IntWritable(i++));
+      }
+      maxTermDimension[0] = i;
+      threw = false;
+    } finally {
+      Closeables.close(dictWriter, threw);
+    }
+
+    return chunkPaths;
+  }
+
+}
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,99 @@
+package org.apache.mahout.cf.taste.example.email;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ *
+ *
+ **/
+public class MailToRecMapper extends
+ Mapper<Text, Text, NullWritable, Text> {
+ private transient static Logger log = LoggerFactory.getLogger(MailToRecMapper.class);
+ private OpenObjectIntHashMap<String> fromDictionary = new OpenObjectIntHashMap<String>();
+ private OpenObjectIntHashMap<String> msgIdDictionary = new OpenObjectIntHashMap<String>();
+ private String separator = "\n";
+
+ public enum Counters {
+ REFERENCE, ORIGINAL
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ String fromPrefix = conf.get(EmailUtility.FROM_PREFIX);
+ String msgPrefix = conf.get(EmailUtility.MSG_IDS_PREFIX);
+ EmailUtility.loadDictionaries(conf, fromPrefix, fromDictionary, msgPrefix, msgIdDictionary);
+ log.info("From Dictionary size: {} Msg Id Dictionary size: {}", fromDictionary.size(), msgIdDictionary.size());
+ separator = context.getConfiguration().get(EmailUtility.SEPARATOR);
+ }
+
+ @Override
+ protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+
+ String msgId = null;
+ int msgIdKey = Integer.MIN_VALUE;
+
+
+ int fromKey = Integer.MIN_VALUE;
+ String valStr = value.toString();
+ String[] splits = StringUtils.splitByWholeSeparatorPreserveAllTokens(valStr, separator);
+ //format is: from, to, refs, subject, body
+
+ if (splits != null && splits.length > 0) {
+ String from = EmailUtility.cleanUpEmailAddress(splits[0]);
+ fromKey = fromDictionary.get(from);
+ //get the references
+ if (splits.length > 2) {
+ String[] theRefs = EmailUtility.parseReferences(splits[2]);
+ if (theRefs != null && theRefs.length > 0) {
+ //we have a reference, the first one is the original message id, so map to that one if it exists
+ msgIdKey = msgIdDictionary.get(theRefs[0]);
+ context.getCounter(Counters.REFERENCE).increment(1);
+ }
+ }
+ }
+ if (msgIdKey == Integer.MIN_VALUE) {//we don't have any references, so use the msg id
+ //get the msg id and the from and output the associated ids
+ String keyStr = key.toString();
+ int idx = keyStr.lastIndexOf("/");
+ if (idx != -1) {
+ msgId = keyStr.substring(idx + 1);
+ msgIdKey = msgIdDictionary.get(msgId);
+ context.getCounter(Counters.ORIGINAL).increment(1);
+ }
+ }
+
+ if (msgIdKey != Integer.MIN_VALUE && fromKey != Integer.MIN_VALUE) {
+ context.write(null, new Text(fromKey + "," + msgIdKey + ",1"));
+ }
+ }
+
+
+}
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,32 @@
+package org.apache.mahout.cf.taste.example.email;
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+
+import java.io.IOException;
+
+/**
+ * Assumes the input is in the format created by {@link org.apache.mahout.text.SequenceFilesFromMailArchives}
+ */
+public class MsgIdToDictionaryMapper extends
+ Mapper<Text, Text, Text, VarIntWritable> {
+ public enum Counters {
+ NO_MESSAGE_ID
+ }
+
+ @Override
+ protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+ //message id is in the key: /201008/AANLkTikvVnhNH+Y5AGEwqd2=u0CFv2mCm0ce6E6oBnj1@mail.gmail.com
+ String keyStr = key.toString();
+ int idx = keyStr.lastIndexOf("/");
+ String msgId = null;
+ if (idx != -1) {
+ msgId = keyStr.substring(idx + 1);
+ context.write(new Text(msgId), new VarIntWritable(1));
+ } else {
+ context.getCounter(Counters.NO_MESSAGE_ID).increment(1);
+ }
+ }
+}
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailMapper.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailMapper.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailMapper.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,52 @@
+package org.apache.mahout.classifier.email;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Convert the labels created by the {@link org.apache.mahout.utils.email.MailProcessor} to one consumable by the classifiers
+ * <p/>
+ * For a key such as <code>/cocoon.apache.org/dev/200307.gz/001401c3414f$8394e160$1e01a8c0@WRPO</code> the label is
+ * <code>cocoon_apache_org</code>, or <code>cocoon_apache_org_dev</code> when the list name is included. Keys with
+ * fewer than three '/'-separated parts are dropped.
+ */
+public class PrepEmailMapper extends Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> {
+  //if true, use the project name and the list name in label creation
+  private boolean useListName = false;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    useListName = Boolean.parseBoolean(context.getConfiguration().get(PrepEmailVectorsDriver.USE_LIST_NAME));
+  }
+
+  @Override
+  protected void map(WritableComparable<?> key, VectorWritable value, Context context) throws IOException, InterruptedException {
+    String label = toLabel(key.toString(), useListName);
+    if (label != null) {
+      context.write(new Text(label), value);
+    }
+  }
+
+  /**
+   * Builds the label from the second and, optionally, the third '/'-separated part of the path.
+   *
+   * @return the label, or null when the path has fewer than three parts
+   */
+  private static String toLabel(String path, boolean withListName) {
+    ///Example: /cocoon.apache.org/dev/200307.gz/001401c3414f$8394e160$1e01a8c0@WRPO
+    String[] splits = path.split("/");
+    //we need the first two splits after the empty one in front of the leading slash
+    if (splits.length < 3) {
+      return null;
+    }
+    StringBuilder bldr = new StringBuilder(normalize(splits[1]));
+    if (withListName) {
+      bldr.append('_').append(normalize(splits[2]));
+    }
+    return bldr.toString();
+  }
+
+  /**
+   * Replaces '-' and '.' by '_' and lower-cases with a fixed locale, so that labels do not depend on the
+   * default locale of the machine running the task.
+   */
+  private static String normalize(String part) {
+    return part.replace('-', '_').replace('.', '_').toLowerCase(java.util.Locale.ENGLISH);
+  }
+}
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailReducer.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailReducer.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailReducer.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,48 @@
+package org.apache.mahout.classifier.email;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Passes through at most a configured number of vectors per label and drops the rest.
+ * <p/>
+ * The limit is read from the {@link PrepEmailVectorsDriver#ITEMS_PER_CLASS} configuration key.
+ **/
+public class PrepEmailReducer extends Reducer<Text, VectorWritable, Text, VectorWritable> {
+  /** Used when the configuration does not carry a limit. */
+  private static final long DEFAULT_MAX_ITEMS_PER_LABEL = 10000;
+
+  private long maxItemsPerLabel = DEFAULT_MAX_ITEMS_PER_LABEL;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    // getLong keeps the default when the key is missing, where Long.parseLong(null) would throw
+    maxItemsPerLabel = context.getConfiguration().getLong(PrepEmailVectorsDriver.ITEMS_PER_CLASS,
+        DEFAULT_MAX_ITEMS_PER_LABEL);
+  }
+
+  /**
+   * Writes the first <code>maxItemsPerLabel</code> values of the key in the order they are received.
+   */
+  @Override
+  protected void reduce(Text key, Iterable<VectorWritable> values, Context context) throws IOException, InterruptedException {
+    //TODO: support randomization? Likely not needed due to the SplitInput utility which does random selection
+    long written = 0;
+    Iterator<VectorWritable> iterator = values.iterator();
+    while (written < maxItemsPerLabel && iterator.hasNext()) {
+      context.write(key, iterator.next());
+      written++;
+    }
+  }
+}
Added: mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailVectorsDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailVectorsDriver.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailVectorsDriver.java (added)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/email/PrepEmailVectorsDriver.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,73 @@
+package org.apache.mahout.classifier.email;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.math.VectorWritable;
+
+import java.util.Map;
+
+/**
+ * Convert the labels generated by {@link org.apache.mahout.text.SequenceFilesFromMailArchives} and
+ * {@link org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles} to ones consumable by the classifiers. We do this
+ * here b/c if it is done in the creation of sparse vectors, the Reducer collapses all the vectors.
+ * <p/>
+ * Runs a single job with {@link PrepEmailMapper} and {@link PrepEmailReducer} over the input sequence files.
+ */
+public class PrepEmailVectorsDriver extends AbstractJob {
+
+  /** Configuration key under which the maximum number of items per label is handed to the reducer. */
+  public static final String ITEMS_PER_CLASS = "itemsPerClass";
+  /** Configuration key telling the mapper whether the list name becomes part of the label. */
+  public static final String USE_LIST_NAME = "USE_LIST_NAME";
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new PrepEmailVectorsDriver(), args);
+  }
+
+  /**
+   * Parses the command line, optionally deletes the output directory and runs the conversion job.
+   *
+   * @return 0 on success, -1 when the arguments could not be parsed or the job failed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption("maxItemsPerLabel", "mipl", "The maximum number of items per label. Can be useful for making the training sets the same size", String.valueOf(100000));
+    addOption(buildOption("useListName", "ul", "Use the name of the list as part of the label. If not set, then just use the project name", false, false, "false"));
+    Map<String,String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    Job convertJob = prepareJob(input, output, SequenceFileInputFormat.class, PrepEmailMapper.class,
+        Text.class, VectorWritable.class, PrepEmailReducer.class, Text.class, VectorWritable.class, SequenceFileOutputFormat.class);
+    convertJob.getConfiguration().set(ITEMS_PER_CLASS, parsedArgs.get("--maxItemsPerLabel"));
+    // the option is a flag without a value, so its mere presence switches the list name on
+    convertJob.getConfiguration().set(USE_LIST_NAME, String.valueOf(parsedArgs.containsKey("--useListName")));
+    // report a failed job to the caller instead of always claiming success
+    boolean succeeded = convertJob.waitForCompletion(true);
+    return succeeded ? 0 : -1;
+  }
+}
Added: mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/MailToPrefsTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/MailToPrefsTest.java?rev=1180043&view=auto
==============================================================================
--- mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/MailToPrefsTest.java (added)
+++ mahout/trunk/examples/src/test/java/org/apache/mahout/cf/taste/example/email/MailToPrefsTest.java Fri Oct 7 14:02:20 2011
@@ -0,0 +1,16 @@
+package org.apache.mahout.cf.taste.example.email;
+
+
+import org.apache.mahout.examples.MahoutTestCase;
+
+/**
+ * Placeholder test case in the mail-to-preferences example package: its only method, test(), has an empty
+ * body, so nothing is verified yet.
+ * NOTE(review): presumably meant to exercise MailToPrefsDriver -- confirm and add real assertions.
+ **/
+public class MailToPrefsTest extends MahoutTestCase{
+
+ public void test() throws Exception {
+
+ }
+
+}