You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/01/13 02:11:59 UTC

svn commit: r898594 - in /lucene/mahout/trunk: examples/src/main/java/org/apache/mahout/text/ utils/src/main/java/org/apache/mahout/utils/vectors/text/ utils/src/test/java/org/apache/mahout/utils/vectors/text/

Author: srowen
Date: Wed Jan 13 01:11:59 2010
New Revision: 898594

URL: http://svn.apache.org/viewvc?rev=898594&view=rev
Log:
MAHOUT-237

Added:
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SparseVectorsFromSequenceFiles.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaMapper.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorGenerator.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorMerger.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountMapper.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountReducer.java
    lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/
    lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java

Added: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=898594&view=auto
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (added)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Wed Jan 13 01:11:59 2010
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.mahout.common.FileLineIterable;
+
+/**
+ * Converts a directory of text documents into SequenceFiles of Specified
+ * chunkSize. This class takes in a parent directory containing sub folders of
+ * text documents and recursively reads the files and creates the
+ * {@link SequenceFile}s of docid => content. The docid is set as the relative
+ * path of the document from the parent directory prepended with a specified
+ * prefix. You can also specify the input encoding of the text files. The
+ * content of the output SequenceFiles are encoded as UTF-8 text.
+ * 
+ * 
+ */
+public final class SequenceFilesFromDirectory {
+  
+  private ChunkedWriter createNewChunkedWriter(int chunkSizeInMB,
+                                               String outputDir) throws IOException {
+    return new ChunkedWriter(chunkSizeInMB, outputDir);
+  }
+  
+  public void createSequenceFiles(File parentDir,
+                                  String outputDir,
+                                  String prefix,
+                                  int chunkSizeInMB,
+                                  Charset charset) throws IOException {
+    ChunkedWriter writer = createNewChunkedWriter(chunkSizeInMB, outputDir);
+    parentDir.listFiles(new PrefixAdditionFilter(prefix, writer, charset));
+    writer.close();
+  }
+  
+  public class ChunkedWriter implements Closeable {
+    int maxChunkSizeInBytes;
+    String outputDir;
+    SequenceFile.Writer writer;
+    int currentChunkID;
+    int currentChunkSize;
+    Configuration conf = new Configuration();
+    FileSystem fs;
+    
+    public ChunkedWriter(int chunkSizeInMB, String outputDir) throws IOException {
+      if (chunkSizeInMB < 64) {
+        chunkSizeInMB = 64;
+      } else if (chunkSizeInMB > 1984) {
+        chunkSizeInMB = 1984;
+      }
+      maxChunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
+      this.outputDir = outputDir;
+      fs = FileSystem.get(conf);
+      writer =
+          new SequenceFile.Writer(fs, conf, getPath(currentChunkID),
+              Text.class, Text.class);
+    }
+    
+    private Path getPath(int chunkID) {
+      return new Path(outputDir + "/chunk-" + chunkID);
+    }
+    
+    public void write(String key, String value) throws IOException {
+      if (currentChunkSize > maxChunkSizeInBytes) {
+        writer.close();
+        writer =
+            new SequenceFile.Writer(fs, conf, getPath(currentChunkID++),
+                Text.class, Text.class);
+        currentChunkSize = 0;
+        
+      }
+      
+      Text keyT = new Text(key);
+      Text valueT = new Text(value);
+      currentChunkSize += keyT.getBytes().length + valueT.getBytes().length; // Overhead
+      writer.append(keyT, valueT);
+    }
+    
+    @Override
+    public void close() throws IOException {
+      writer.close();
+    }
+  }
+  
+  public class PrefixAdditionFilter implements FileFilter {
+    String prefix;
+    ChunkedWriter writer;
+    Charset charset;
+    
+    public PrefixAdditionFilter(String prefix,
+                                ChunkedWriter writer,
+                                Charset charset) {
+      this.prefix = prefix;
+      this.writer = writer;
+      this.charset = charset;
+    }
+    
+    @Override
+    public boolean accept(File current) {
+      if (current.isDirectory()) {
+        current.listFiles(new PrefixAdditionFilter(prefix
+            + File.separator
+            + current.getName(), writer, charset));
+      } else {
+        try {
+          FileLineIterable fit = new FileLineIterable(current, charset, false);
+          StringBuilder file = new StringBuilder();
+          Iterator<String> it = fit.iterator();
+          while (it.hasNext()) {
+            file.append(it.next()).append("\n");
+          }
+          writer.write(prefix + File.separator + current.getName(), file
+              .toString());
+          
+        } catch (FileNotFoundException e) {
+          // Skip file.
+        } catch (IOException e) {
+          // TODO: report exceptions and continue;
+          throw new IllegalStateException(e);
+        }
+      }
+      return false;
+    }
+    
+  }
+  
+  public static void main(String[] args) throws Exception {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+    
+    Option parentOpt =
+        obuilder.withLongName("parent").withRequired(true).withArgument(
+            abuilder.withName("parent").withMinimum(1).withMaximum(1).create())
+            .withDescription("Parent dir containing the documents")
+            .withShortName("p").create();
+    
+    Option outputDirOpt =
+        obuilder.withLongName("outputDir").withRequired(true).withArgument(
+            abuilder.withName("outputDir").withMinimum(1).withMaximum(1)
+                .create()).withDescription("The output directory")
+            .withShortName("o").create();
+    
+    Option chunkSizeOpt =
+        obuilder.withLongName("chunkSize").withArgument(
+            abuilder.withName("chunkSize").withMinimum(1).withMaximum(1)
+                .create()).withDescription(
+            "The chunkSize in MegaBytes. Defaults to 64")
+            .withShortName("chunk").create();
+    
+    Option keyPrefixOpt =
+        obuilder.withLongName("keyPrefix").withArgument(
+            abuilder.withName("keyPrefix").withMinimum(1).withMaximum(1)
+                .create()).withDescription(
+            "The prefix to be prepended to the key").withShortName("prefix")
+            .create();
+    
+    Option charsetOpt =
+        obuilder.withLongName("charset").withRequired(true)
+            .withArgument(
+                abuilder.withName("charset").withMinimum(1).withMaximum(1)
+                    .create()).withDescription(
+                "The name of the character encoding of the input files")
+            .withShortName("c").create();
+    
+    Group group =
+        gbuilder.withName("Options").withOption(keyPrefixOpt).withOption(
+            chunkSizeOpt).withOption(charsetOpt).withOption(outputDirOpt)
+            .withOption(parentOpt).create();
+    
+    Parser parser = new Parser();
+    parser.setGroup(group);
+    CommandLine cmdLine = parser.parse(args);
+    
+    File parentDir = new File((String) cmdLine.getValue(parentOpt));
+    String outputDir = (String) cmdLine.getValue(outputDirOpt);
+    
+    int chunkSize = 64;
+    if (cmdLine.hasOption(chunkSizeOpt)) {
+      chunkSize =
+          Integer.valueOf((String) cmdLine.getValue(chunkSizeOpt)).intValue();
+    }
+    
+    String prefix = "";
+    if (cmdLine.hasOption(keyPrefixOpt)) {
+      prefix = (String) cmdLine.getValue(keyPrefixOpt);
+    }
+    Charset charset = Charset.forName((String) cmdLine.getValue(charsetOpt));
+    SequenceFilesFromDirectory dir = new SequenceFilesFromDirectory();
+    
+    dir.createSequenceFiles(parentDir, outputDir, prefix, chunkSize, charset);
+  }
+}

Added: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SparseVectorsFromSequenceFiles.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SparseVectorsFromSequenceFiles.java?rev=898594&view=auto
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SparseVectorsFromSequenceFiles.java (added)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/SparseVectorsFromSequenceFiles.java Wed Jan 13 01:11:59 2010
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.utils.vectors.text.DictionaryVectorizer;
+
+/**
+ * Command line driver that turns a set of sequence files into SparseVectors by
+ * delegating to {@link DictionaryVectorizer}.
+ */
+public final class SparseVectorsFromSequenceFiles {
+
+  /**
+   * Builds an option that takes exactly one argument. The argument is given
+   * the same name as the option's long name.
+   */
+  private static Option singleValueOption(DefaultOptionBuilder optionBuilder,
+                                          ArgumentBuilder argumentBuilder,
+                                          String longName,
+                                          String shortName,
+                                          String description,
+                                          boolean required) {
+    return optionBuilder
+        .withLongName(longName)
+        .withRequired(required)
+        .withArgument(
+            argumentBuilder.withName(longName).withMinimum(1).withMaximum(1)
+                .create())
+        .withDescription(description)
+        .withShortName(shortName)
+        .create();
+  }
+
+  /**
+   * Parses the command line and runs the vectorizer. {@code --input},
+   * {@code --outputDir} and {@code --analyzerName} are required;
+   * {@code --minSupport} falls back to 2 and {@code --chunkSize} to 100 when
+   * they are not given.
+   */
+  public static void main(String[] args) throws Exception {
+    DefaultOptionBuilder optionBuilder = new DefaultOptionBuilder();
+    ArgumentBuilder argumentBuilder = new ArgumentBuilder();
+    GroupBuilder groupBuilder = new GroupBuilder();
+
+    Option inputDirOpt = singleValueOption(optionBuilder, argumentBuilder,
+        "input", "i",
+        "input dir containing the documents in sequence file format", true);
+    Option outputDirOpt = singleValueOption(optionBuilder, argumentBuilder,
+        "outputDir", "o", "The output directory", true);
+    Option minSupportOpt = singleValueOption(optionBuilder, argumentBuilder,
+        "minSupport", "s", "(Optional) Minimum Support. Default Value: 2",
+        false);
+    Option analyzerNameOpt = singleValueOption(optionBuilder, argumentBuilder,
+        "analyzerName", "a", "The class name of the analyzer", true);
+    Option chunkSizeOpt = singleValueOption(optionBuilder, argumentBuilder,
+        "chunkSize", "chunk", "The chunkSize in MegaBytes. 100-10000 MB",
+        false);
+
+    Group options = groupBuilder.withName("Options")
+        .withOption(minSupportOpt)
+        .withOption(analyzerNameOpt)
+        .withOption(chunkSizeOpt)
+        .withOption(outputDirOpt)
+        .withOption(inputDirOpt)
+        .create();
+
+    Parser cliParser = new Parser();
+    cliParser.setGroup(options);
+    CommandLine cmdLine = cliParser.parse(args);
+
+    String inputDir = (String) cmdLine.getValue(inputDirOpt);
+    String outputDir = (String) cmdLine.getValue(outputDirOpt);
+
+    int chunkSize = cmdLine.hasOption(chunkSizeOpt)
+        ? Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt))
+        : 100;
+    int minSupport = cmdLine.hasOption(minSupportOpt)
+        ? Integer.parseInt((String) cmdLine.getValue(minSupportOpt))
+        : 2;
+
+    // The analyzer is instantiated reflectively from the class name supplied
+    // on the command line, using its no-argument constructor.
+    String analyzerClassName = (String) cmdLine.getValue(analyzerNameOpt);
+    Analyzer analyzer =
+        (Analyzer) Class.forName(analyzerClassName).newInstance();
+
+    DictionaryVectorizer.createTermFrequencyVectors(inputDir, outputDir,
+        analyzer, minSupport, chunkSize);
+  }
+}

Added: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaMapper.java?rev=898594&view=auto
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaMapper.java (added)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaMapper.java Wed Jan 13 01:11:59 2010
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.lucene.analysis.Analyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps over Wikipedia xml format and outputs every document having a category
+ * listed in the input category file, or every document when the
+ * "all.files" job setting is true. The output key is the page title with
+ * whitespace and non-word characters replaced by '_'; the output value is the
+ * HTML-unescaped page text.
+ */
+public class WikipediaMapper extends MapReduceBase implements
+    Mapper<LongWritable,Text,Text,Text> {
+
+  private static final Logger log =
+      LoggerFactory.getLogger(WikipediaMapper.class);
+  private static final Pattern SPACE_NON_ALPHA_PATTERN =
+      Pattern.compile("[\\s\\W]");
+
+  // DOTALL is required: without it '.' stops at the first line terminator and
+  // any article whose text spans more than one line would not match at all.
+  private static final Pattern DOCUMENT =
+      Pattern.compile("<text xml:space=\"preserve\">(.*)</text>",
+          Pattern.DOTALL);
+  private static final Pattern TITLE = Pattern.compile("<title>(.*)</title>");
+
+  /** Value returned by {@link #findMatchingCategory} when nothing matches. */
+  private static final String UNKNOWN_CATEGORY = "Unknown";
+
+  private Set<String> inputCategories;
+  private boolean exactMatchOnly;
+  private boolean all;
+
+  /**
+   * Emits (title, text) for one page. When not in "all" mode, pages without a
+   * matching category are dropped.
+   *
+   * @param key
+   *          input key; not used
+   * @param value
+   *          the xml of one page
+   * @param output
+   *          collector receiving the normalized title and the page text
+   * @param reporter
+   *          not used
+   */
+  @Override
+  public void map(LongWritable key,
+                  Text value,
+                  OutputCollector<Text,Text> output,
+                  Reporter reporter) throws IOException {
+
+    String content = value.toString();
+    String document = getDocument(content);
+    String title = getTitle(content);
+
+    if (!all) {
+      String catMatch = findMatchingCategory(document);
+      if (UNKNOWN_CATEGORY.equals(catMatch)) {
+        return;
+      }
+    }
+    document = StringEscapeUtils.unescapeHtml(document);
+
+    output.collect(new Text(SPACE_NON_ALPHA_PATTERN.matcher(title).replaceAll(
+        "_")), new Text(document));
+  }
+
+  /** Returns the content of the text element, or "" when there is none. */
+  private String getDocument(String xml) {
+    Matcher m = DOCUMENT.matcher(xml);
+    return m.find() ? m.group(1) : "";
+  }
+
+  /** Returns the content of the title element, or "" when there is none. */
+  private String getTitle(String xml) {
+    Matcher m = TITLE.matcher(xml);
+    return m.find() ? m.group(1) : "";
+  }
+
+  /**
+   * Scans the document for "[[Category:...]]" links and returns the first one
+   * that matches the configured categories. In exact mode the lower-cased,
+   * trimmed category must be contained in the input set; otherwise it is
+   * enough that the category contains one of the input categories as a
+   * substring, and that input category is returned.
+   *
+   * @return the matching category, or "Unknown" when none matches
+   */
+  private String findMatchingCategory(String document) {
+    int startIndex = 0;
+    int categoryIndex;
+    while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) {
+      categoryIndex += 11; // length of "[[Category:"
+      int endIndex = document.indexOf("]]", categoryIndex);
+      if (endIndex >= document.length() || endIndex < 0) {
+        // Unterminated category link: nothing more can be parsed.
+        break;
+      }
+      String category =
+          document.substring(categoryIndex, endIndex).toLowerCase().trim();
+      if (exactMatchOnly) {
+        if (inputCategories.contains(category)) {
+          return category;
+        }
+      } else {
+        for (String inputCategory : inputCategories) {
+          if (category.contains(inputCategory)) { // we have an inexact match
+            return inputCategory;
+          }
+        }
+      }
+      startIndex = endIndex;
+    }
+    return UNKNOWN_CATEGORY;
+  }
+
+  /**
+   * Reads the category set ("wikipedia.categories", defaulting to an empty
+   * set), the "exact.match.only" flag (default false) and the "all.files"
+   * flag (default true) from the job configuration.
+   *
+   * @throws IllegalStateException
+   *           if the category set cannot be (de)serialized
+   */
+  @Override
+  public void configure(JobConf job) {
+    try {
+      if (inputCategories == null) {
+        Set<String> newCategories = new HashSet<String>();
+
+        DefaultStringifier<Set<String>> setStringifier =
+            new DefaultStringifier<Set<String>>(job, GenericsUtil
+                .getClass(newCategories));
+
+        // The serialized empty set is the fallback when the job carries no
+        // "wikipedia.categories" entry.
+        String categoriesStr = setStringifier.toString(newCategories);
+        categoriesStr = job.get("wikipedia.categories", categoriesStr);
+        inputCategories = setStringifier.fromString(categoriesStr);
+      }
+      exactMatchOnly = job.getBoolean("exact.match.only", false);
+      all = job.getBoolean("all.files", true);
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);
+    }
+    // No analyzer is logged here: this mapper never had one assigned, and
+    // dereferencing it made configure() fail with a NullPointerException.
+    log.info("Configure: Input Categories size: "
+        + inputCategories.size()
+        + " All: "
+        + all
+        + " Exact Match: "
+        + exactMatchOnly);
+  }
+}

Added: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java?rev=898594&view=auto
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java (added)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java Wed Jan 13 01:11:59 2010
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.mahout.classifier.bayes.XmlInputFormat;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.FileLineIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create and run the Wikipedia Dataset Creator.
+ */
+public final class WikipediaToSequenceFile {
+  private static final Logger log =
+      LoggerFactory.getLogger(WikipediaToSequenceFile.class);
+  
+  private WikipediaToSequenceFile() {}
+  
+  /**
+   * Takes in two arguments:
+   * <ol>
+   * <li>The input {@link org.apache.hadoop.fs.Path} where the input documents
+   * live</li>
+   * <li>The output {@link org.apache.hadoop.fs.Path} where to write the
+   * classifier as a {@link org.apache.hadoop.io.SequenceFile}</li>
+   * </ol>
+   * 
+   * @param args
+   *          The args
+   */
+  public static void main(String[] args) throws IOException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+    
+    Option dirInputPathOpt =
+        obuilder.withLongName("input").withRequired(true).withArgument(
+            abuilder.withName("input").withMinimum(1).withMaximum(1).create())
+            .withDescription("The input directory path").withShortName("i")
+            .create();
+    
+    Option dirOutputPathOpt =
+        obuilder.withLongName("output").withRequired(true).withArgument(
+            abuilder.withName("output").withMinimum(1).withMaximum(1).create())
+            .withDescription("The output directory Path").withShortName("o")
+            .create();
+    
+    Option categoriesOpt =
+        obuilder
+            .withLongName("categories")
+            .withArgument(
+                abuilder.withName("categories").withMinimum(1).withMaximum(1)
+                    .create())
+            .withDescription(
+                "Location of the categories file.  One entry per line. "
+                    + "Will be used to make a string match in Wikipedia Category field")
+            .withShortName("c").create();
+    
+    Option exactMatchOpt =
+        obuilder.withLongName("exactMatch").withDescription(
+            "If set, then the category name must exactly match the "
+                + "entry in the categories file. Default is false")
+            .withShortName("e").create();
+    
+    Option allOpt =
+        obuilder.withLongName("all").withDescription(
+            "If set, Select all files. Default is true").withShortName("all")
+            .create();
+    
+    Option helpOpt =
+        obuilder.withLongName("help").withDescription("Print out help")
+            .withShortName("h").create();
+    
+    Group group =
+        gbuilder.withName("Options").withOption(categoriesOpt).withOption(
+            dirInputPathOpt).withOption(dirOutputPathOpt).withOption(
+            exactMatchOpt).withOption(allOpt).withOption(helpOpt).create();
+    
+    Parser parser = new Parser();
+    parser.setGroup(group);
+    try {
+      CommandLine cmdLine = parser.parse(args);
+      if (cmdLine.hasOption(helpOpt)) {
+        CommandLineUtil.printHelp(group);
+        return;
+      }
+      
+      String inputPath = (String) cmdLine.getValue(dirInputPathOpt);
+      String outputPath = (String) cmdLine.getValue(dirOutputPathOpt);
+      
+      String catFile = "";
+      if (cmdLine.hasOption(categoriesOpt)) {
+        catFile = (String) cmdLine.getValue(categoriesOpt);
+      }
+      
+      boolean all = true;
+      if (cmdLine.hasOption(allOpt)) {
+        cmdLine.getValue(allOpt);
+      }
+      runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt),
+          all);
+    } catch (OptionException e) {
+      log.error("Exception", e);
+      CommandLineUtil.printHelp(group);
+    }
+  }
+  
+  /**
+   * Run the job
+   * 
+   * @param input
+   *          the input pathname String
+   * @param output
+   *          the output pathname String
+   * @param catFile
+   *          the file containing the Wikipedia categories
+   * @param exactMatchOnly
+   *          if true, then the Wikipedia category must match exactly instead of
+   *          simply containing the category string
+   * @param all
+   *          if true select all categories
+   */
+  public static void runJob(String input,
+                            String output,
+                            String catFile,
+                            boolean exactMatchOnly,
+                            boolean all) throws IOException {
+    JobClient client = new JobClient();
+    JobConf conf = new JobConf(WikipediaToSequenceFile.class);
+    if (log.isInfoEnabled()) {
+      log.info("Input: "
+          + input
+          + " Out: "
+          + output
+          + " Categories: "
+          + catFile
+          + " All Files: "
+          + all);
+    }
+    conf.set("xmlinput.start", "<page");
+    conf.set("xmlinput.end", "</page>");
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setBoolean("exact.match.only", exactMatchOnly);
+    conf.setBoolean("all.files", all);
+    FileInputFormat.setInputPaths(conf, new Path(input));
+    Path outPath = new Path(output);
+    FileOutputFormat.setOutputPath(conf, outPath);
+    conf.setMapperClass(WikipediaMapper.class);
+    conf.setInputFormat(XmlInputFormat.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.JavaSerialization,"
+            + "org.apache.hadoop.io.serializer.WritableSerialization");
+    
+    FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+    if (dfs.exists(outPath)) {
+      dfs.delete(outPath, true);
+    }
+    
+    Set<String> categories = new HashSet<String>();
+    if (catFile.equals("") == false) {
+      for (String line : new FileLineIterable(new File(catFile))) {
+        categories.add(line.trim().toLowerCase());
+      }
+    }
+    
+    DefaultStringifier<Set<String>> setStringifier =
+        new DefaultStringifier<Set<String>>(conf, GenericsUtil
+            .getClass(categories));
+    
+    String categoriesStr = setStringifier.toString(categories);
+    
+    conf.set("wikipedia.categories", categoriesStr);
+    
+    client.setConf(conf);
+    JobClient.runJob(conf);
+  }
+}

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java?rev=898594&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizer.java Wed Jan 13 01:11:59 2010
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.mahout.math.SparseVector;
+
+/**
+ * This class converts a set of input documents in the sequence file format to
+ * vectors. The Sequence file input should have a {@link Text} key containing
+ * the unique document identifier and a {@link Text} value containing the whole
+ * document. The document should be stored in UTF-8 encoding which is
+ * recognizable by hadoop. It uses the given {@link Analyzer} to process the
+ * document into {@link org.apache.lucene.analysis.Token}s. This is a dictionary
+ * based Vectorizer.
+ * 
+ */
+public final class DictionaryVectorizer {
+  
+  public static final String ANALYZER_CLASS = "AnalyzerClass"; // JobConf key holding the Analyzer class name
+  
+  public static final Charset CHARSET = Charset.forName("UTF-8"); // used to measure the byte length of a term
+  
+  private static final String DICTIONARY_FILE = "/dictionary.file-"; // term => id chunk; chunk index is appended
+  
+  private static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "/vectors"; // merged vectors, below the output dir
+  
+  private static final String FREQUENCY_FILE = "/frequency.file-"; // term => count chunk; chunk index is appended
+  
+  private static final int MAX_CHUNKSIZE = 10000; // upper clamp for the requested chunk size, in MB
+  
+  private static final int MIN_CHUNKSIZE = 100; // lower clamp for the requested chunk size, in MB
+  
+  private static final String OUTPUT_FILES_PATTERN = "/part-*"; // glob selecting the word count job's output files
+  
+  private static final int SEQUENCEFILE_BYTE_OVERHEAD = 4; // added per entry when estimating the size of a chunk
+  
+  private static final String VECTOR_OUTPUT_FOLDER = "/partial-vectors-"; // per-chunk job output; index is appended
+  
+  private static final String WORDCOUNT_OUTPUT_FOLDER = "/wordcount"; // word count job output, below the output dir
+  
+  /**
+   * Not instantiable: every entry point of this class is a static method.
+   */
+  private DictionaryVectorizer() {
+
+  }
+  
+  /**
+   * Builds Term Frequency (Tf) vectors for a set of documents stored in
+   * {@link SequenceFile} format. The dictionary is split into chunks of bounded
+   * size so that only one chunk has to be held in memory at a time; the work is
+   * therefore spread over several Map/Reduce jobs:
+   * <ol>
+   * <li>a word count job writing to <code>output/wordcount</code>,</li>
+   * <li>a local pass writing <code>output/dictionary.file-N</code> and
+   * <code>output/frequency.file-N</code>,</li>
+   * <li>one job per dictionary chunk writing
+   * <code>output/partial-vectors-N</code>,</li>
+   * <li>a merge job writing the final vectors to <code>output/vectors</code>.</li>
+   * </ol>
+   * 
+   * @param input
+   *          input directory of the documents in {@link SequenceFile} format
+   * @param output
+   *          output directory below which all intermediate and final results
+   *          are written
+   * @param analyzer
+   *          the Lucene {@link Analyzer} handed to the word count and partial
+   *          vector stages
+   * @param minSupport
+   *          the minimum frequency a feature must have in the entire corpus to
+   *          be included in the {@link SparseVector}s
+   * @param chunkSizeInMegabytes
+   *          the size in MB of the feature => id chunk kept in memory at each
+   *          node during the Map/Reduce stage; values outside the range
+   *          100..10000 are clamped to it. It is recommended that you
+   *          calculate this from the number of cores and the free memory
+   *          available per node. For example, with 2 cores and about 1GB of
+   *          spare memory, a chunk size of around 400-500MB lets two
+   *          simultaneous reducers create partial vectors without thrashing
+   *          the system through increased swapping
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   * @throws URISyntaxException
+   */
+  public static void createTermFrequencyVectors(String input,
+                                                String output,
+                                                Analyzer analyzer,
+                                                int minSupport,
+                                                int chunkSizeInMegabytes) throws IOException,
+                                                                         InterruptedException,
+                                                                         ClassNotFoundException,
+                                                                         URISyntaxException {
+    // Clamp the requested chunk size into [MIN_CHUNKSIZE, MAX_CHUNKSIZE].
+    int effectiveChunkSize =
+        Math.min(Math.max(chunkSizeInMegabytes, MIN_CHUNKSIZE), MAX_CHUNKSIZE);
+
+    // Stage 1: count the terms of the corpus.
+    Path documents = new Path(input);
+    Path wordCounts = new Path(output + WORDCOUNT_OUTPUT_FOLDER);
+    startWordCounting(documents, wordCounts, analyzer);
+
+    // Stage 2: assign ids to the surviving terms, split into chunks.
+    List<Path> chunks =
+        createDictionaryChunks(minSupport, wordCounts, output, effectiveChunkSize);
+
+    // Stage 3: one partial vector job per dictionary chunk.
+    List<Path> partialOutputs = new ArrayList<Path>();
+    for (int chunkNo = 0; chunkNo < chunks.size(); chunkNo++) {
+      Path partialOutput = getPath(output + VECTOR_OUTPUT_FOLDER, chunkNo);
+      partialOutputs.add(partialOutput);
+      makePartialVectors(input, chunks.get(chunkNo), partialOutput, analyzer);
+    }
+
+    // Stage 4: merge the partial vectors of every document.
+    String vectorsDir = output + DOCUMENT_VECTOR_OUTPUT_FOLDER;
+    createVectorFromPartialVectors(partialOutputs, vectorsDir);
+  }
+  
+  /**
+   * Reads the term counts produced by the word count job, drops every term
+   * whose count is below <code>minSupport</code> and assigns consecutive ids
+   * (starting at 0 and continuing across chunks) to the remaining terms. The
+   * term => id entries are written to dictionary chunk files and, in
+   * parallel, the term => count entries to frequency chunk files. Only one
+   * entry is held in memory at a time.
+   * <p>
+   * A new chunk is started once the estimated size of the current one exceeds
+   * the limit. The estimate per entry is the UTF-8 byte length of the term
+   * plus 8 bytes for the value plus a fixed overhead; because the check is
+   * made before an entry is appended, a chunk may overshoot the limit by one
+   * entry. The first chunk is always created, even when no term qualifies.
+   * 
+   * @param minSupport
+   *          minimum corpus-wide count a term needs in order to be kept
+   * @param wordCountPath
+   *          output directory of the word count job; its <code>part-*</code>
+   *          files are read
+   * @param dictionaryPathBase
+   *          directory below which the dictionary and frequency chunk files
+   *          are created
+   * @param chunkSizeInMegabytes
+   *          size limit of a single chunk, in MB
+   * @return the paths of the dictionary chunk files, in creation order
+   * @throws IOException
+   *           if reading the counts or writing a chunk fails
+   */
+  private static List<Path> createDictionaryChunks(int minSupport,
+                                                   Path wordCountPath,
+                                                   String dictionaryPathBase,
+                                                   int chunkSizeInMegabytes) throws IOException {
+    List<Path> chunkPaths = new ArrayList<Path>();
+
+    Writable key = new Text();
+    LongWritable value = new LongWritable();
+    Configuration conf = new Configuration();
+
+    FileSystem fs = FileSystem.get(wordCountPath.toUri(), conf);
+    FileStatus[] outputFiles =
+        fs.globStatus(new Path(wordCountPath.toString() + OUTPUT_FILES_PATTERN));
+
+    long i = 0;
+    // Multiply as long: in int arithmetic the product overflows for chunk
+    // sizes of 2048 MB and above, which the allowed maximum permits.
+    long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
+    int chunkIndex = 0;
+    Path chunkPath = getPath(dictionaryPathBase + DICTIONARY_FILE, chunkIndex);
+    chunkPaths.add(chunkPath);
+
+    SequenceFile.Writer writer = null;
+    SequenceFile.Writer freqWriter = null;
+    try {
+      writer =
+          new SequenceFile.Writer(fs, conf, chunkPath, Text.class,
+              LongWritable.class);
+      freqWriter =
+          new SequenceFile.Writer(fs, conf, getPath(dictionaryPathBase
+              + FREQUENCY_FILE, chunkIndex), Text.class, LongWritable.class);
+
+      long currentChunkSize = 0;
+
+      for (FileStatus fileStatus : outputFiles) {
+        Path path = fileStatus.getPath();
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+        try {
+          // key is feature value is count
+          while (reader.next(key, value)) {
+            if (value.get() < minSupport) {
+              continue;
+            }
+
+            if (currentChunkSize > chunkSizeLimit) {
+              // Roll over to the next chunk. The references are cleared right
+              // after closing so that the finally block below never closes a
+              // writer twice if opening the next one fails.
+              writer.close();
+              writer = null;
+              freqWriter.close();
+              freqWriter = null;
+              chunkIndex++;
+
+              chunkPath =
+                  getPath(dictionaryPathBase + DICTIONARY_FILE, chunkIndex);
+              chunkPaths.add(chunkPath);
+
+              writer =
+                  new SequenceFile.Writer(fs, conf, chunkPath, Text.class,
+                      LongWritable.class);
+              freqWriter =
+                  new SequenceFile.Writer(fs, conf, getPath(dictionaryPathBase
+                      + FREQUENCY_FILE, chunkIndex), Text.class,
+                      LongWritable.class);
+              currentChunkSize = 0;
+            }
+
+            int fieldSize =
+                SEQUENCEFILE_BYTE_OVERHEAD
+                    + key.toString().getBytes(CHARSET).length
+                    + Long.SIZE / 8;
+            currentChunkSize += fieldSize;
+            writer.append(key, new LongWritable(i++));
+            freqWriter.append(key, value);
+          }
+        } finally {
+          reader.close();
+        }
+      }
+    } finally {
+      // Close both writers even if the first close throws.
+      try {
+        if (writer != null) {
+          writer.close();
+        }
+      } finally {
+        if (freqWriter != null) {
+          freqWriter.close();
+        }
+      }
+    }
+
+    return chunkPaths;
+  }
+  
+  /**
+   * Runs the final Map/Reduce job: the partial vectors are passed through an
+   * identity map, grouped by their {@link Text} key and handed to
+   * {@link PartialVectorMerger}, which produces one {@link SparseVector} per
+   * key. Anything already present at the output location is deleted before
+   * the job is started.
+   * 
+   * @param partialVectorPaths
+   *          directories holding the partial vectors, one per dictionary chunk
+   * @param output
+   *          output directory where the merged document vectors are created
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   * @throws URISyntaxException
+   */
+  private static void createVectorFromPartialVectors(List<Path> partialVectorPaths,
+                                                     String output) throws IOException,
+                                                                   InterruptedException,
+                                                                   ClassNotFoundException,
+                                                                   URISyntaxException {
+
+    Configurable jobClient = new JobClient();
+    JobConf jobConf = new JobConf(DictionaryVectorizer.class);
+    jobConf.setJobName("DictionaryVectorizer Vector generator to group Partial Vectors");
+
+    // this conf parameter needs to be set enable serialisation of conf values
+    String serializations =
+        "org.apache.hadoop.io.serializer.JavaSerialization,"
+            + "org.apache.hadoop.io.serializer.WritableSerialization";
+    jobConf.set("io.serializations", serializations);
+    jobConf.set(ANALYZER_CLASS, StandardAnalyzer.class.getName());
+
+    // Map side: pass the (key, partial vector) records through unchanged.
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    jobConf.setMapperClass(IdentityMapper.class);
+    jobConf.setMapOutputKeyClass(Text.class);
+    jobConf.setMapOutputValueClass(SparseVector.class);
+
+    // Reduce side: merge all partial vectors that share a key.
+    jobConf.setReducerClass(PartialVectorMerger.class);
+    jobConf.setOutputKeyClass(Text.class);
+    jobConf.setOutputValueClass(SparseVector.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    String inputDirs = getCommaSeparatedPaths(partialVectorPaths);
+    FileInputFormat.setInputPaths(jobConf, inputDirs);
+
+    Path mergedVectors = new Path(output);
+    FileOutputFormat.setOutputPath(jobConf, mergedVectors);
+
+    // Remove the results of an earlier run, if any.
+    FileSystem outputFs = FileSystem.get(mergedVectors.toUri(), jobConf);
+    if (outputFs.exists(mergedVectors)) {
+      outputFs.delete(mergedVectors, true);
+    }
+
+    jobClient.setConf(jobConf);
+    JobClient.runJob(jobConf);
+  }
+  
+  /**
+   * Joins the string forms of the given paths with commas, without a leading
+   * or trailing separator. An empty list yields an empty string.
+   * 
+   * @param paths
+   *          the paths to join
+   * @return the comma separated paths
+   */
+  private static String getCommaSeparatedPaths(List<Path> paths) {
+    StringBuilder joined = new StringBuilder();
+    boolean first = true;
+    for (Path current : paths) {
+      if (!first) {
+        joined.append(",");
+      }
+      joined.append(current.toString());
+      first = false;
+    }
+    return joined.toString();
+  }
+  
+  public static Path getPath(String basePath, int index) {
+    return new Path(basePath + index);
+  }
+  
+  /**
+   * Runs one Map/Reduce job that creates partial vectors from the input
+   * documents, using a single chunk of the dictionary. The chunk is handed to
+   * the reducers ({@link PartialVectorGenerator}) through the
+   * {@link DistributedCache}. The input documents have to be in the
+   * {@link SequenceFile} format. Anything already present at the output
+   * location is deleted before the job is started.
+   * 
+   * @param input
+   *          input directory of the documents in {@link SequenceFile} format
+   * @param dictionaryFilePath
+   *          location of the chunk of features and their ids
+   * @param output
+   *          output directory where the partial vectors are created
+   * @param analyzer
+   *          the Lucene {@link Analyzer} for tokenizing the text. Only its
+   *          class name is passed to the job, and
+   *          {@link PartialVectorGenerator} instantiates that class through
+   *          its no-argument constructor; <code>null</code> selects
+   *          {@link StandardAnalyzer}
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   * @throws URISyntaxException
+   */
+  private static void makePartialVectors(String input,
+                                         Path dictionaryFilePath,
+                                         Path output,
+                                         Analyzer analyzer) throws IOException,
+                                                           InterruptedException,
+                                                           ClassNotFoundException,
+                                                           URISyntaxException {
+
+    Configurable client = new JobClient();
+    JobConf conf = new JobConf(DictionaryVectorizer.class);
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.JavaSerialization,"
+            + "org.apache.hadoop.io.serializer.WritableSerialization");
+    // this conf parameter needs to be set enable serialisation of conf values
+
+    // Honour the analyzer chosen by the caller; it used to be ignored in
+    // favour of a hard-coded StandardAnalyzer.
+    String analyzerClassName =
+        analyzer == null ? StandardAnalyzer.class.getName()
+            : analyzer.getClass().getName();
+    conf.set(ANALYZER_CLASS, analyzerClassName);
+    conf.setJobName("DictionaryVectorizer Partial Vector running over input: "
+        + input
+        + " using dictionary file "
+        + dictionaryFilePath.toString());
+
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setMapOutputValueClass(Text.class);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(SparseVector.class);
+    DistributedCache
+        .setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);
+    FileInputFormat.setInputPaths(conf, new Path(input));
+
+    FileOutputFormat.setOutputPath(conf, output);
+
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setReducerClass(PartialVectorGenerator.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    // Remove the results of an earlier run, if any.
+    FileSystem dfs = FileSystem.get(output.toUri(), conf);
+    if (dfs.exists(output)) {
+      dfs.delete(output, true);
+    }
+
+    client.setConf(conf);
+    JobClient.runJob(conf);
+  }
+  
+  /**
+   * Counts the frequencies of words in parallel using Map/Reduce
+   * ({@link TermCountMapper} with {@link TermCountReducer} as both combiner
+   * and reducer). The input documents have to be in {@link SequenceFile}
+   * format. Anything already present at the output location is deleted before
+   * the job is started.
+   * 
+   * @param input
+   *          input directory of the documents in {@link SequenceFile} format
+   * @param output
+   *          output directory where the term => count pairs are written
+   * @param analyzer
+   *          the Lucene {@link Analyzer} for tokenizing the text. Only its
+   *          class name is passed to the job, and {@link TermCountMapper}
+   *          instantiates that class through its no-argument constructor;
+   *          <code>null</code> selects {@link StandardAnalyzer}
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  private static void startWordCounting(Path input,
+                                        Path output,
+                                        Analyzer analyzer) throws IOException,
+                                                          InterruptedException,
+                                                          ClassNotFoundException {
+
+    Configurable client = new JobClient();
+    JobConf conf = new JobConf(DictionaryVectorizer.class);
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.JavaSerialization,"
+            + "org.apache.hadoop.io.serializer.WritableSerialization");
+    // this conf parameter needs to be set enable serialisation of conf values
+
+    // Honour the analyzer chosen by the caller; it used to be ignored in
+    // favour of a hard-coded StandardAnalyzer.
+    String analyzerClassName =
+        analyzer == null ? StandardAnalyzer.class.getName()
+            : analyzer.getClass().getName();
+    conf.set(ANALYZER_CLASS, analyzerClassName);
+    conf.setJobName("DictionaryVectorizer Word Count running over input: "
+        + input.toString());
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(LongWritable.class);
+
+    FileInputFormat.setInputPaths(conf, input);
+    FileOutputFormat.setOutputPath(conf, output);
+
+    conf.setMapperClass(TermCountMapper.class);
+
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setCombinerClass(TermCountReducer.class);
+    conf.setReducerClass(TermCountReducer.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    // Remove the results of an earlier run, if any.
+    FileSystem dfs = FileSystem.get(output.toUri(), conf);
+    if (dfs.exists(output)) {
+      dfs.delete(output, true);
+    }
+
+    client.setConf(conf);
+    JobClient.runJob(conf);
+  }
+}

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorGenerator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorGenerator.java?rev=898594&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorGenerator.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorGenerator.java Wed Jan 13 01:11:59 2010
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.Token;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.mahout.math.SparseVector;
+
+/**
+ * Converts a document in to a SparseVector
+ */
+public class PartialVectorGenerator extends MapReduceBase implements
+    Reducer<Text,Text,Text,SparseVector> {
+  private Analyzer analyzer;
+  private Map<String,Integer> dictionary = new HashMap<String,Integer>();
+  private FileSystem fs; // local filesystem
+  private URI[] localFiles; // local filenames from the distributed cache
+  
+  @Override
+  public void reduce(Text key,
+                     Iterator<Text> values,
+                     OutputCollector<Text,SparseVector> output,
+                     Reporter reporter) throws IOException {
+    
+    if (values.hasNext()) {
+      Text value = values.next();
+      TokenStream ts =
+          analyzer.tokenStream(key.toString(), new StringReader(value
+              .toString()));
+      
+      Map<String,MutableInt> termFrequency = new HashMap<String,MutableInt>();
+      
+      Token token = new Token();
+      int count = 0;
+      while ((token = ts.next(token)) != null) {
+        String tk = new String(token.termBuffer(), 0, token.termLength());
+        if (termFrequency.containsKey(tk) == false) {
+          count += tk.length() + 1;
+          termFrequency.put(tk, new MutableInt(0));
+        }
+        termFrequency.get(tk).increment();
+      }
+      
+      SparseVector vector =
+          new SparseVector(key.toString(), Integer.MAX_VALUE, termFrequency
+              .size());
+      
+      for (Entry<String,MutableInt> pair : termFrequency.entrySet()) {
+        String tk = pair.getKey();
+        if (dictionary.containsKey(tk) == false) continue;
+        vector.setQuick(dictionary.get(tk).intValue(), pair.getValue()
+            .doubleValue());
+      }
+      assert (vector.getNumNondefaultElements() == termFrequency.size());
+      
+      output.collect(key, vector);
+    }
+  }
+  
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    try {
+      ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+      Class<?> cl =
+          ccl.loadClass(job.get(DictionaryVectorizer.ANALYZER_CLASS,
+              StandardAnalyzer.class.getName()));
+      analyzer = (Analyzer) cl.newInstance();
+      
+      localFiles = DistributedCache.getCacheFiles(job);
+      if (localFiles == null || localFiles.length < 1) {
+        throw new IllegalArgumentException(
+            "missing paths from the DistributedCache");
+      }
+      Path dictionaryFile = new Path(localFiles[0].getPath());
+      fs = dictionaryFile.getFileSystem(job);
+      SequenceFile.Reader reader =
+          new SequenceFile.Reader(fs, dictionaryFile, job);
+      Text key = new Text();
+      LongWritable value = new LongWritable();
+      
+      // key is word value is id
+      while (reader.next(key, value)) {
+        dictionary.put(key.toString(), Long.valueOf(value.get()).intValue());
+        // System.out.println(key.toString() + "=>" + value.get());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException(e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException(e);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+  
+}

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorMerger.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorMerger.java?rev=898594&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorMerger.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/PartialVectorMerger.java Wed Jan 13 01:11:59 2010
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.SparseVector;
+
+/**
+ * Reducer that combines all the partial {@link SparseVector}s sharing a key
+ * into a single {@link SparseVector} named after that key.
+ */
+public class PartialVectorMerger extends MapReduceBase implements
+    Reducer<Text,SparseVector,Text,SparseVector> {
+
+  /**
+   * Creates an empty vector named after the key, applies <code>addTo</code>
+   * of every incoming vector to it and emits the result. A result is emitted
+   * even when there are no incoming vectors.
+   */
+  @Override
+  public void reduce(Text key,
+                     Iterator<SparseVector> values,
+                     OutputCollector<Text,SparseVector> output,
+                     Reporter reporter) throws IOException {
+
+    String vectorName = key.toString();
+    SparseVector merged = new SparseVector(vectorName, Integer.MAX_VALUE, 10);
+    for (Iterator<SparseVector> partials = values; partials.hasNext();) {
+      partials.next().addTo(merged);
+    }
+    output.collect(key, merged);
+
+  }
+
+}

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountMapper.java?rev=898594&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountMapper.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountMapper.java Wed Jan 13 01:11:59 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.Token;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * Mapper of the DictionaryVectorizer word count job. The text of each input
+ * record is tokenized with the configured {@link Analyzer}, and one (term,
+ * number of occurrences in this record) pair is emitted per distinct term.
+ */
+public class TermCountMapper extends MapReduceBase implements
+    Mapper<Text,Text,Text,LongWritable> {
+
+  private Analyzer analyzer;
+
+  /**
+   * Tokenizes <code>value</code>, using the string form of <code>key</code>
+   * as the field name, and emits the per-record count of every distinct term.
+   */
+  @Override
+  public void map(Text key,
+                  Text value,
+                  OutputCollector<Text,LongWritable> output,
+                  Reporter reporter) throws IOException {
+    String fieldName = key.toString();
+    StringReader text = new StringReader(value.toString());
+    TokenStream tokens = analyzer.tokenStream(fieldName, text);
+
+    // Tally the occurrences of each term within this record.
+    Map<String,MutableLong> tallies = new HashMap<String,MutableLong>();
+    for (Token current = tokens.next(new Token()); current != null;
+        current = tokens.next(current)) {
+      String term = new String(current.termBuffer(), 0, current.termLength());
+      MutableLong tally = tallies.get(term);
+      if (tally == null) {
+        tally = new MutableLong(0);
+        tallies.put(term, tally);
+      }
+      tally.increment();
+    }
+
+    for (Entry<String,MutableLong> tally : tallies.entrySet()) {
+      Text term = new Text(tally.getKey());
+      LongWritable occurrences = new LongWritable(tally.getValue().longValue());
+      output.collect(term, occurrences);
+    }
+  }
+
+  /**
+   * Instantiates the analyzer named by
+   * {@link DictionaryVectorizer#ANALYZER_CLASS} (default:
+   * {@link StandardAnalyzer}) through its no-argument constructor.
+   * 
+   * @throws IllegalStateException
+   *           if the analyzer class cannot be loaded or instantiated
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    String analyzerClassName =
+        job.get(DictionaryVectorizer.ANALYZER_CLASS, StandardAnalyzer.class.getName());
+    try {
+      analyzer = (Analyzer) loader.loadClass(analyzerClassName).newInstance();
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException(e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+}

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountReducer.java?rev=898594&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountReducer.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/TermCountReducer.java Wed Jan 13 01:11:59 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Adds up all the counts received for a term and emits the term with the
+ * total. Input and output types are identical, so the class can also be used
+ * as a local Combiner.
+ */
+public class TermCountReducer extends MapReduceBase implements
+    Reducer<Text,LongWritable,Text,LongWritable> {
+
+  /**
+   * Emits <code>key</code> together with the sum of <code>values</code>; the
+   * sum is 0 when there are no values.
+   */
+  @Override
+  public void reduce(Text key,
+                     Iterator<LongWritable> values,
+                     OutputCollector<Text,LongWritable> output,
+                     Reporter reporter) throws IOException {
+    long total = 0;
+    for (Iterator<LongWritable> counts = values; counts.hasNext();) {
+      LongWritable count = counts.next();
+      total += count.get();
+    }
+    LongWritable result = new LongWritable(total);
+    output.collect(key, result);
+
+  }
+
+}

Added: lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java?rev=898594&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java (added)
+++ lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/utils/vectors/text/DictionaryVectorizerTest.java Wed Jan 13 01:11:59 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.text;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * Test the dictionary Vector
+ * 
+ */
+public class DictionaryVectorizerTest extends TestCase {
+  
+  public static final int AVG_DOCUMENT_LENGTH = 20;
+  
+  public static final int AVG_SENTENCE_LENGTH = 8;
+  
+  public static final int AVG_WORD_LENGTH = 6;
+  
+  public static final int NUM_DOCS = 100;
+  
+  public static final String CHARSET = "abcdef";
+  
+  public static final String DELIM = " .,?;:!\t\n\r";
+  
+  public static final String ERRORSET = "`1234567890"
+                                        + "-=~@#$%^&*()_+[]{}'\"/<>|\\";
+  
+  private static Random random = new Random();
+  
+  private FileSystem fs;
+  
+  private static char getRandomDelimiter() {
+    return DELIM.charAt(random.nextInt(DELIM.length()));
+  }
+  
+  private static String getRandomDocument() {
+    int length = (AVG_DOCUMENT_LENGTH >> 1)
+                 + random.nextInt(AVG_DOCUMENT_LENGTH);
+    StringBuilder sb = new StringBuilder(length * AVG_SENTENCE_LENGTH
+                                         * AVG_WORD_LENGTH);
+    for (int i = 0; i < length; i++) {
+      sb.append(getRandomSentence());
+    }
+    return sb.toString();
+  }
+  
+  private static String getRandomSentence() {
+    int length = (AVG_SENTENCE_LENGTH >> 1)
+                 + random.nextInt(AVG_SENTENCE_LENGTH);
+    StringBuilder sb = new StringBuilder(length * AVG_WORD_LENGTH);
+    for (int i = 0; i < length; i++) {
+      sb.append(getRandomString()).append(' ');
+    }
+    sb.append(getRandomDelimiter());
+    return sb.toString();
+  }
+  
+  private static String getRandomString() {
+    int length = (AVG_WORD_LENGTH >> 1) + random.nextInt(AVG_WORD_LENGTH);
+    StringBuilder sb = new StringBuilder(length);
+    for (int i = 0; i < length; i++) {
+      sb.append(CHARSET.charAt(random.nextInt(CHARSET.length())));
+    }
+    if (random.nextInt(10) == 0) sb.append(ERRORSET.charAt(random
+        .nextInt(ERRORSET.length())));
+    return sb.toString();
+  }
+  
+  private static void rmr(String path) throws Exception {
+    File f = new File(path);
+    if (f.exists()) {
+      if (f.isDirectory()) {
+        String[] contents = f.list();
+        for (String content : contents) {
+          rmr(f.toString() + File.separator + content);
+        }
+      }
+      f.delete();
+    }
+  }
+  
+  public void setUp() throws Exception {
+    rmr("output");
+    rmr("testdata");
+    Configuration conf = new Configuration();
+    fs = FileSystem.get(conf);
+  }
+  
+  public void testCreateTermFrequencyVectors() throws IOException,
+                                              InterruptedException,
+                                              ClassNotFoundException,
+                                              URISyntaxException {
+    Configuration conf = new Configuration();
+    String pathString = "testdata/documents/docs.file";
+    Path path = new Path(pathString);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+        Text.class, Text.class);
+    
+    for (int i = 0; i < NUM_DOCS; i++) {
+      writer.append(new Text("Document::ID::" + i), new Text(
+          getRandomDocument()));
+    }
+    writer.close();
+    DictionaryVectorizer.createTermFrequencyVectors(pathString,
+      "output/wordcount", new StandardAnalyzer(), 2, 100);
+    
+    
+  } 
+}