You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ro...@apache.org on 2012/05/13 23:55:27 UTC

svn commit: r1337989 - in /mahout/trunk/integration/src/main/java/org/apache/mahout/text: ./ wikipedia/

Author: robinanil
Date: Sun May 13 21:55:26 2012
New Revision: 1337989

URL: http://svn.apache.org/viewvc?rev=1337989&view=rev
Log:
MAHOUT-1010 bringing back deleted wikipedia dataset creator and moving it to integration

Added:
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java Sun May 13 21:55:26 2012
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.text.wikipedia.WikipediaMapper;
+import org.apache.mahout.text.wikipedia.XmlInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command-line driver that converts a Wikipedia XML dump into a Hadoop
+ * {@link org.apache.hadoop.io.SequenceFile}, optionally keeping only the pages that belong to one of the
+ * categories listed in a categories file.
+ */
+public final class WikipediaToSequenceFile {
+
+  private static final Logger log = LoggerFactory.getLogger(WikipediaToSequenceFile.class);
+
+  private WikipediaToSequenceFile() { }
+
+  /**
+   * Parses the command line and launches the conversion job. Recognized options:
+   * <ul>
+   * <li>input: the {@link org.apache.hadoop.fs.Path} where the input documents live</li>
+   * <li>output: the {@link org.apache.hadoop.fs.Path} where the
+   * {@link org.apache.hadoop.io.SequenceFile} is written</li>
+   * <li>categories (-c): optional local file with one category per line</li>
+   * <li>exactMatch (-e): require a page category to equal an entry of the categories file</li>
+   * <li>all: select every page regardless of its categories</li>
+   * </ul>
+   */
+  public static void main(String[] args) throws IOException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+
+    Option dirInputPathOpt = DefaultOptionCreator.inputOption().create();
+    Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create();
+
+    Option categoriesOpt = obuilder.withLongName("categories").withArgument(
+      abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription(
+      "Location of the categories file.  One entry per line. "
+          + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create();
+
+    Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription(
+      "If set, then the category name must exactly match the "
+          + "entry in the categories file. Default is false").withShortName("e").create();
+
+    Option allOpt = obuilder.withLongName("all")
+        .withDescription("If set, Select all files. Default is false").withShortName("all").create();
+
+    Option helpOpt = DefaultOptionCreator.helpOption();
+
+    Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt)
+        .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(allOpt).withOption(helpOpt)
+        .create();
+
+    Parser parser = new Parser();
+    parser.setGroup(group);
+    parser.setHelpOption(helpOpt);
+    try {
+      CommandLine cmdLine = parser.parse(args);
+      if (cmdLine.hasOption(helpOpt)) {
+        CommandLineUtil.printHelp(group);
+        return;
+      }
+
+      String inputPath = (String) cmdLine.getValue(dirInputPathOpt);
+      String outputPath = (String) cmdLine.getValue(dirOutputPathOpt);
+
+      // An empty string tells runJob that no categories file was supplied.
+      String catFile = "";
+      if (cmdLine.hasOption(categoriesOpt)) {
+        catFile = (String) cmdLine.getValue(categoriesOpt);
+      }
+      runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt), cmdLine.hasOption(allOpt));
+    } catch (OptionException e) {
+      log.error("Exception", e);
+      CommandLineUtil.printHelp(group);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      log.error("Exception", e);
+      CommandLineUtil.printHelp(group);
+    } catch (ClassNotFoundException e) {
+      log.error("Exception", e);
+      CommandLineUtil.printHelp(group);
+    }
+  }
+
+  /**
+   * Configures and runs the map/reduce job, blocking until it finishes; the output path is deleted first.
+   *
+   * @param input
+   *          the input pathname String
+   * @param output
+   *          the output pathname String
+   * @param catFile
+   *          local file containing the Wikipedia categories, one per line; null or empty for none
+   * @param exactMatchOnly
+   *          if true, then the Wikipedia category must match exactly instead of simply containing the
+   *          category string
+   * @param all
+   *          if true select all categories
+   * @throws IllegalStateException
+   *           if the job does not complete successfully
+   */
+  public static void runJob(String input,
+                            String output,
+                            String catFile,
+                            boolean exactMatchOnly,
+                            boolean all) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
+    conf.set("xmlinput.start", "<page>");
+    conf.set("xmlinput.end", "</page>");
+    conf.setBoolean("exact.match.only", exactMatchOnly);
+    conf.setBoolean("all.files", all);
+    conf.set("io.serializations",
+             "org.apache.hadoop.io.serializer.JavaSerialization,"
+             + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+    Set<String> categories = new HashSet<String>();
+    if (catFile != null && !catFile.isEmpty()) {
+      for (String line : new FileLineIterable(new File(catFile))) {
+        String category = line.trim().toLowerCase(Locale.ENGLISH);
+        // Skip blank lines: an empty category is contained in every category name and would match all pages.
+        if (!category.isEmpty()) {
+          categories.add(category);
+        }
+      }
+    }
+
+    DefaultStringifier<Set<String>> setStringifier =
+        new DefaultStringifier<Set<String>>(conf, GenericsUtil.getClass(categories));
+
+    // Store the categories in conf BEFORE creating the Job: the Job works on its own copy of the
+    // Configuration, so anything set on conf afterwards would never reach the mappers.
+    conf.set("wikipedia.categories", setStringifier.toString(categories));
+
+    Job job = new Job(conf);
+    log.info("Input: {} Out: {} Categories: {} All Files: {}", new Object[] {input, output, catFile, all});
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    FileInputFormat.setInputPaths(job, new Path(input));
+    Path outPath = new Path(output);
+    FileOutputFormat.setOutputPath(job, outPath);
+    job.setMapperClass(WikipediaMapper.class);
+    job.setInputFormatClass(XmlInputFormat.class);
+    job.setReducerClass(Reducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setJarByClass(WikipediaToSequenceFile.class);
+    HadoopUtil.delete(conf, outPath);
+
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      throw new IllegalStateException("Job failed!");
+    }
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java Sun May 13 21:55:26 2012
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.analysis.WikipediaAnalyzer;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.common.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create and run the Wikipedia Dataset Creator.
+ */
+public final class WikipediaDatasetCreatorDriver {
+  private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorDriver.class);
+  
+  private WikipediaDatasetCreatorDriver() { }
+  
+  /**
+   * Takes in two arguments:
+   * <ol>
+   * <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li>
+   * <li>The output {@link org.apache.hadoop.fs.Path} where to write the classifier as a
+   * {@link org.apache.hadoop.io.SequenceFile}</li>
+   * </ol>
+   */
+  public static void main(String[] args) throws IOException, InterruptedException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+    
+    Option dirInputPathOpt = DefaultOptionCreator.inputOption().create();
+    
+    Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create();
+    
+    Option categoriesOpt = obuilder.withLongName("categories").withRequired(true).withArgument(
+      abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription(
+      "Location of the categories file.  One entry per line. "
+          + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create();
+    
+    Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription(
+      "If set, then the category name must exactly match the "
+          + "entry in the categories file. Default is false").withShortName("e").create();
+    Option analyzerOpt = obuilder.withLongName("analyzer").withRequired(false).withArgument(
+      abuilder.withName("analyzer").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The analyzer to use, must have a no argument constructor").withShortName("a").create();
+    Option helpOpt = DefaultOptionCreator.helpOption();
+    
+    Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt)
+        .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(analyzerOpt).withOption(helpOpt)
+        .create();
+    
+    Parser parser = new Parser();
+    parser.setGroup(group);
+    try {
+      CommandLine cmdLine = parser.parse(args);
+      if (cmdLine.hasOption(helpOpt)) {
+        CommandLineUtil.printHelp(group);
+        return;
+      }
+      
+      String inputPath = (String) cmdLine.getValue(dirInputPathOpt);
+      String outputPath = (String) cmdLine.getValue(dirOutputPathOpt);
+      String catFile = (String) cmdLine.getValue(categoriesOpt);
+      Class<? extends Analyzer> analyzerClass = WikipediaAnalyzer.class;
+      if (cmdLine.hasOption(analyzerOpt)) {
+        String className = cmdLine.getValue(analyzerOpt).toString();
+        analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
+        // try instantiating it, b/c there isn't any point in setting it if
+        // you can't instantiate it
+        ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
+      }
+      runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt),
+        analyzerClass);
+    } catch (OptionException e) {
+      log.error("Exception", e);
+      CommandLineUtil.printHelp(group);
+    } catch (ClassNotFoundException e) {
+      log.error("Exception", e);
+      CommandLineUtil.printHelp(group);
+    }
+  }
+  
+  /**
+   * Run the job
+   * 
+   * @param input
+   *          the input pathname String
+   * @param output
+   *          the output pathname String
+   * @param catFile
+   *          the file containing the Wikipedia categories
+   * @param exactMatchOnly
+   *          if true, then the Wikipedia category must match exactly instead of simply containing the
+   *          category string
+   */
+  public static void runJob(String input,
+                            String output,
+                            String catFile,
+                            boolean exactMatchOnly,
+                            Class<? extends Analyzer> analyzerClass)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
+    conf.set("key.value.separator.in.input.line", " ");
+    conf.set("xmlinput.start", "<page>");
+    conf.set("xmlinput.end", "</page>");
+    conf.setBoolean("exact.match.only", exactMatchOnly);
+    conf.set("analyzer.class", analyzerClass.getName());
+    conf.set("io.serializations",
+             "org.apache.hadoop.io.serializer.JavaSerialization,"
+             + "org.apache.hadoop.io.serializer.WritableSerialization");
+    // Dont ever forget this. People should keep track of how hadoop conf
+    // parameters can make or break a piece of code
+    
+    Set<String> categories = new HashSet<String>();
+    for (String line : new FileLineIterable(new File(catFile))) {
+      categories.add(line.trim().toLowerCase(Locale.ENGLISH));
+    }
+    
+    DefaultStringifier<Set<String>> setStringifier =
+        new DefaultStringifier<Set<String>>(conf, GenericsUtil.getClass(categories));
+    
+    String categoriesStr = setStringifier.toString(categories);
+    
+    conf.set("wikipedia.categories", categoriesStr);
+    
+    Job job = new Job(conf);
+    if (log.isInfoEnabled()) {
+      log.info("Input: {} Out: {} Categories: {}", new Object[] {input, output, catFile});
+    }
+    job.setJarByClass(WikipediaDatasetCreatorDriver.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setMapperClass(WikipediaDatasetCreatorMapper.class);
+    //TODO: job.setNumMapTasks(100);
+    job.setInputFormatClass(XmlInputFormat.class);
+    job.setReducerClass(WikipediaDatasetCreatorReducer.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    
+    FileInputFormat.setInputPaths(job, new Path(input));
+    Path outPath = new Path(output);
+    FileOutputFormat.setOutputPath(job, outPath);
+    HadoopUtil.delete(conf, outPath);
+    
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      throw new IllegalStateException("Job failed!");
+    }
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java Sun May 13 21:55:26 2012
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.mahout.analysis.WikipediaAnalyzer;
+import org.apache.mahout.common.ClassUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps over pages of a Wikipedia XML dump. For every page whose text carries a category matching one of the
+ * configured input categories, emits the matched category as key and the analyzed page text as value.
+ */
+public class WikipediaDatasetCreatorMapper extends Mapper<LongWritable, Text, Text, Text> {
+
+  private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorMapper.class);
+
+  private static final Pattern SPACE_NON_ALPHA_PATTERN = Pattern.compile("[\\s\\W]");
+  private static final Pattern OPEN_TEXT_TAG_PATTERN = Pattern.compile("<text xml:space=\"preserve\">");
+  private static final Pattern CLOSE_TEXT_TAG_PATTERN = Pattern.compile("</text>");
+
+  private List<String> inputCategories;
+  private List<Pattern> inputCategoryPatterns;
+  private boolean exactMatchOnly;
+  private Analyzer analyzer;
+
+  /** Strips the text tags, unescapes HTML entities and writes the tokenized page under its category. */
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+    String document = value.toString();
+    document = StringEscapeUtils.unescapeHtml(CLOSE_TEXT_TAG_PATTERN.matcher(
+        OPEN_TEXT_TAG_PATTERN.matcher(document).replaceFirst("")).replaceAll(""));
+    String catMatch = findMatchingCategory(document);
+    if (!"Unknown".equals(catMatch)) {
+      StringBuilder contents = new StringBuilder(1000);
+      TokenStream stream = analyzer.reusableTokenStream(catMatch, new StringReader(document));
+      try {
+        CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
+        stream.reset();
+        while (stream.incrementToken()) {
+          contents.append(termAtt.buffer(), 0, termAtt.length()).append(' ');
+        }
+        stream.end();
+      } finally {
+        // Always finish the TokenStream life cycle so the stream and its reader are released per page.
+        stream.close();
+      }
+      context.write(new Text(SPACE_NON_ALPHA_PATTERN.matcher(catMatch).replaceAll("_")),
+          new Text(contents.toString()));
+    }
+  }
+
+  /** Loads the input categories, the match mode and the analyzer from the job configuration. */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    exactMatchOnly = conf.getBoolean("exact.match.only", false);
+
+    if (inputCategories == null) {
+      Set<String> newCategories = new HashSet<String>();
+      DefaultStringifier<Set<String>> setStringifier =
+          new DefaultStringifier<Set<String>>(conf, GenericsUtil.getClass(newCategories));
+      String categoriesStr = conf.get("wikipedia.categories", setStringifier.toString(newCategories));
+      inputCategories = Lists.newArrayList(setStringifier.fromString(categoriesStr));
+      inputCategoryPatterns = Lists.newArrayListWithCapacity(inputCategories.size());
+      for (String inputCategory : inputCategories) {
+        // Quote the category so regex metacharacters in it (e.g. "u.s. history") are matched literally.
+        inputCategoryPatterns.add(Pattern.compile(".*\\b" + Pattern.quote(inputCategory) + "\\b.*"));
+      }
+    }
+
+    if (analyzer == null) {
+      String analyzerStr = conf.get("analyzer.class", WikipediaAnalyzer.class.getName());
+      analyzer = ClassUtils.instantiateAs(analyzerStr, Analyzer.class);
+    }
+    log.info("Configure: Input Categories size: {} Exact Match: {} Analyzer: {}",
+             new Object[] {inputCategories.size(), exactMatchOnly, analyzer.getClass().getName()});
+  }
+
+  /** Returns the first input category matched by a [[Category:...]] link of the page, or "Unknown". */
+  private String findMatchingCategory(String document) {
+    int startIndex = 0;
+    int categoryIndex;
+    while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) {
+      categoryIndex += 11;
+      int endIndex = document.indexOf("]]", categoryIndex);
+      if (endIndex >= document.length() || endIndex < 0) {
+        break;
+      }
+      String category = document.substring(categoryIndex, endIndex).toLowerCase(Locale.ENGLISH).trim();
+      if (exactMatchOnly && inputCategories.contains(category)) {
+        return category;
+      }
+      if (!exactMatchOnly) {
+        for (int i = 0; i < inputCategories.size(); i++) {
+          if (inputCategoryPatterns.get(i).matcher(category).matches()) { // inexact match with word boundary.
+            return inputCategories.get(i);
+          }
+        }
+      }
+      startIndex = endIndex;
+    }
+    return "Unknown";
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java Sun May 13 21:55:26 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Pass-through reducer that writes every value out unchanged under its key; usable as a local combiner too.
+ */
+public class WikipediaDatasetCreatorReducer extends Reducer<Text, Text, Text, Text> {
+
+  @Override
+  protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    // Nothing is merged, counted or filtered: each incoming value is re-emitted as is.
+    java.util.Iterator<Text> remaining = values.iterator();
+    while (remaining.hasNext()) {
+      context.write(key, remaining.next());
+    }
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java Sun May 13 21:55:26 2012
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.GenericsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps over Wikipedia xml format and output all document having the category listed in the input category
+ * file
+ * 
+ */
+public class WikipediaMapper extends Mapper<LongWritable, Text, Text, Text> {
+
+  private static final Logger log = LoggerFactory.getLogger(WikipediaMapper.class);
+
+  private static final Pattern SPACE_NON_ALPHA_PATTERN = Pattern.compile("[\\s]");
+
+  private static final String START_DOC = "<text xml:space=\"preserve\">";
+
+  private static final String END_DOC = "</text>";
+
+  private static final Pattern TITLE = Pattern.compile("<title>(.*)<\\/title>");
+
+  private static final String REDIRECT = "<redirect />";
+
+  private Set<String> inputCategories;
+
+  private boolean exactMatchOnly;
+
+  private boolean all;
+
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+
+    String content = value.toString();
+    if (content.contains(REDIRECT)) {
+      return;
+    }
+    String document;
+    String title;
+    try {
+      document = getDocument(content);
+      title = getTitle(content);
+    } catch (RuntimeException e) {
+      // TODO: reporter.getCounter("Wikipedia", "Parse errors").increment(1);
+      return;
+    }
+
+    if (!all) {
+      String catMatch = findMatchingCategory(document);
+      if ("Unknown".equals(catMatch)) {
+        return;
+      }
+    }
+    document = StringEscapeUtils.unescapeHtml(document);
+    context.write(new Text(SPACE_NON_ALPHA_PATTERN.matcher(title).replaceAll("_")), new Text(document));
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    if (inputCategories == null) {
+      Set<String> newCategories = new HashSet<String>();
+
+      DefaultStringifier<Set<String>> setStringifier =
+          new DefaultStringifier<Set<String>>(conf, GenericsUtil.getClass(newCategories));
+
+      String categoriesStr = conf.get("wikipedia.categories", setStringifier.toString(newCategories));
+      inputCategories = setStringifier.fromString(categoriesStr);
+    }
+    exactMatchOnly = conf.getBoolean("exact.match.only", false);
+    all = conf.getBoolean("all.files", true);
+    log.info("Configure: Input Categories size: {} All: {} Exact Match: {}",
+             new Object[] {inputCategories.size(), all, exactMatchOnly});
+  }
+
+  private static String getDocument(String xml) {
+    int start = xml.indexOf(START_DOC) + START_DOC.length();
+    int end = xml.indexOf(END_DOC, start);
+    return xml.substring(start, end);
+  }
+
+  private static String getTitle(CharSequence xml) {
+    Matcher m = TITLE.matcher(xml);
+    return m.find() ? m.group(1) : "";
+  }
+
+  private String findMatchingCategory(String document) {
+    int startIndex = 0;
+    int categoryIndex;
+    while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) {
+      categoryIndex += 11;
+      int endIndex = document.indexOf("]]", categoryIndex);
+      if (endIndex >= document.length() || endIndex < 0) {
+        break;
+      }
+      String category = document.substring(categoryIndex, endIndex).toLowerCase(Locale.ENGLISH).trim();
+      if (exactMatchOnly && inputCategories.contains(category)) {
+        return category;
+      }
+      if (!exactMatchOnly) {
+        for (String inputCategory : inputCategories) {
+          if (category.contains(inputCategory)) { // we have an inexact match
+            return inputCategory;
+          }
+        }
+      }
+      startIndex = endIndex;
+    }
+    return "Unknown";
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java Sun May 13 21:55:26 2012
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.iterator.FileLineIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>The Bayes example package provides some helper classes for training the Naive Bayes classifier
+ * on the Twenty Newsgroups data. See {@link org.apache.mahout.examples.wikipedia.PrepareTwentyNewsgroups}
+ * for details on running the trainer and
+ * formatting the Twenty Newsgroups data properly for the training.</p>
+ *
+ * <p>The easiest way to prepare the data is to use the ant task in core/build.xml:</p>
+ *
+ * <p>{@code ant extract-20news-18828}</p>
+ *
+ * <p>This runs the arg line:</p>
+ *
+ * <p>{@code -p $\{working.dir\}/20news-18828/ -o $\{working.dir\}/20news-18828-collapse -a $\{analyzer\} -c UTF-8}</p>
+ *
+ * <p>To Run the Wikipedia examples (assumes you've built the Mahout Job jar):</p>
+ *
+ * <ol>
+ *  <li>Download the Wikipedia Dataset. Use the Ant target: {@code ant enwiki-files}</li>
+ *  <li>Chunk the data using the WikipediaXmlSplitter (from the Hadoop home):
+ *   {@code bin/hadoop jar $MAHOUT_HOME/target/mahout-examples-0.x
+ *   org.apache.mahout.classifier.bayes.WikipediaXmlSplitter
+ *   -d $MAHOUT_HOME/examples/temp/enwiki-latest-pages-articles.xml
+ *   -o $MAHOUT_HOME/examples/work/wikipedia/chunks/ -c 64}</li>
+ * </ol>
+ */
+public final class WikipediaXmlSplitter {
+  
+  private static final Logger log = LoggerFactory.getLogger(WikipediaXmlSplitter.class);
+  
+  private WikipediaXmlSplitter() { }
+  
+  public static void main(String[] args) throws IOException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+    
+    Option dumpFileOpt = obuilder.withLongName("dumpFile").withRequired(true).withArgument(
+      abuilder.withName("dumpFile").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The path to the wikipedia dump file (.bz2 or uncompressed)").withShortName("d").create();
+    
+    Option outputDirOpt = obuilder.withLongName("outputDir").withRequired(true).withArgument(
+      abuilder.withName("outputDir").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The output directory to place the splits in:\n"
+          + "local files:\n\t/var/data/wikipedia-xml-chunks or\n\tfile:///var/data/wikipedia-xml-chunks\n"
+          + "Hadoop DFS:\n\thdfs://wikipedia-xml-chunks\n"
+          + "AWS S3 (blocks):\n\ts3://bucket-name/wikipedia-xml-chunks\n"
+          + "AWS S3 (native files):\n\ts3n://bucket-name/wikipedia-xml-chunks\n")
+
+    .withShortName("o").create();
+    
+    Option s3IdOpt = obuilder.withLongName("s3ID").withRequired(false).withArgument(
+      abuilder.withName("s3Id").withMinimum(1).withMaximum(1).create()).withDescription("Amazon S3 ID key")
+        .withShortName("i").create();
+    Option s3SecretOpt = obuilder.withLongName("s3Secret").withRequired(false).withArgument(
+      abuilder.withName("s3Secret").withMinimum(1).withMaximum(1).create()).withDescription(
+      "Amazon S3 secret key").withShortName("s").create();
+    
+    Option chunkSizeOpt = obuilder.withLongName("chunkSize").withRequired(true).withArgument(
+      abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The Size of the chunk, in megabytes").withShortName("c").create();
+    Option numChunksOpt = obuilder
+        .withLongName("numChunks")
+        .withRequired(false)
+        .withArgument(abuilder.withName("numChunks").withMinimum(1).withMaximum(1).create())
+        .withDescription(
+          "The maximum number of chunks to create.  If specified, program will only create a subset of the chunks")
+        .withShortName("n").create();
+    Group group = gbuilder.withName("Options").withOption(dumpFileOpt).withOption(outputDirOpt).withOption(
+      chunkSizeOpt).withOption(numChunksOpt).withOption(s3IdOpt).withOption(s3SecretOpt).create();
+    
+    Parser parser = new Parser();
+    parser.setGroup(group);
+    CommandLine cmdLine;
+    try {
+      cmdLine = parser.parse(args);
+    } catch (OptionException e) {
+      log.error("Error while parsing options", e);
+      CommandLineUtil.printHelp(group);
+      return;
+    }
+    
+    Configuration conf = new Configuration();
+    String dumpFilePath = (String) cmdLine.getValue(dumpFileOpt);
+    String outputDirPath = (String) cmdLine.getValue(outputDirOpt);
+    
+    if (cmdLine.hasOption(s3IdOpt)) {
+      String id = (String) cmdLine.getValue(s3IdOpt);
+      conf.set("fs.s3n.awsAccessKeyId", id);
+      conf.set("fs.s3.awsAccessKeyId", id);
+    }
+    if (cmdLine.hasOption(s3SecretOpt)) {
+      String secret = (String) cmdLine.getValue(s3SecretOpt);
+      conf.set("fs.s3n.awsSecretAccessKey", secret);
+      conf.set("fs.s3.awsSecretAccessKey", secret);
+    }
+    // do not compute crc file when using local FS
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
+    FileSystem fs = FileSystem.get(URI.create(outputDirPath), conf);
+    
+    int chunkSize = 1024 * 1024 * Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
+    
+    int numChunks = Integer.MAX_VALUE;
+    if (cmdLine.hasOption(numChunksOpt)) {
+      numChunks = Integer.parseInt((String) cmdLine.getValue(numChunksOpt));
+    }
+    
+    String header = "<mediawiki xmlns=\"http://www.mediawiki.org/xml/export-0.3/\" "
+                    + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" "
+                    + "xsi:schemaLocation=\"http://www.mediawiki.org/xml/export-0.3/ "
+                    + "http://www.mediawiki.org/xml/export-0.3.xsd\" " + "version=\"0.3\" "
+                    + "xml:lang=\"en\">\n" + "  <siteinfo>\n" + "<sitename>Wikipedia</sitename>\n"
+                    + "    <base>http://en.wikipedia.org/wiki/Main_Page</base>\n"
+                    + "    <generator>MediaWiki 1.13alpha</generator>\n" + "    <case>first-letter</case>\n"
+                    + "    <namespaces>\n" + "      <namespace key=\"-2\">Media</namespace>\n"
+                    + "      <namespace key=\"-1\">Special</namespace>\n" + "      <namespace key=\"0\" />\n"
+                    + "      <namespace key=\"1\">Talk</namespace>\n"
+                    + "      <namespace key=\"2\">User</namespace>\n"
+                    + "      <namespace key=\"3\">User talk</namespace>\n"
+                    + "      <namespace key=\"4\">Wikipedia</namespace>\n"
+                    + "      <namespace key=\"5\">Wikipedia talk</namespace>\n"
+                    + "      <namespace key=\"6\">Image</namespace>\n"
+                    + "      <namespace key=\"7\">Image talk</namespace>\n"
+                    + "      <namespace key=\"8\">MediaWiki</namespace>\n"
+                    + "      <namespace key=\"9\">MediaWiki talk</namespace>\n"
+                    + "      <namespace key=\"10\">Template</namespace>\n"
+                    + "      <namespace key=\"11\">Template talk</namespace>\n"
+                    + "      <namespace key=\"12\">Help</namespace>\n"
+                    + "      <namespace key=\"13\">Help talk</namespace>\n"
+                    + "      <namespace key=\"14\">Category</namespace>\n"
+                    + "      <namespace key=\"15\">Category talk</namespace>\n"
+                    + "      <namespace key=\"100\">Portal</namespace>\n"
+                    + "      <namespace key=\"101\">Portal talk</namespace>\n" + "    </namespaces>\n"
+                    + "  </siteinfo>\n";
+    
+    StringBuilder content = new StringBuilder();
+    content.append(header);
+    NumberFormat decimalFormatter = new DecimalFormat("0000");
+    File dumpFile = new File(dumpFilePath);
+    FileLineIterator it;
+    if (dumpFilePath.endsWith(".bz2")) {
+      // default compression format from http://download.wikimedia.org
+      CompressionCodec codec = new BZip2Codec();
+      it = new FileLineIterator(codec.createInputStream(new FileInputStream(dumpFile)));
+    } else {
+      // assume the user has previously de-compressed the dump file
+      it = new FileLineIterator(dumpFile);
+    }
+    int filenumber = 0;
+    while (it.hasNext()) {
+      String thisLine = it.next();
+      if (thisLine.trim().startsWith("<page>")) {
+        boolean end = false;
+        while (!thisLine.trim().startsWith("</page>")) {
+          content.append(thisLine).append('\n');
+          if (it.hasNext()) {
+            thisLine = it.next();
+          } else {
+            end = true;
+            break;
+          }
+        }
+        content.append(thisLine).append('\n');
+        
+        if (content.length() > chunkSize || end) {
+          content.append("</mediawiki>");
+          filenumber++;
+          String filename = outputDirPath + "/chunk-" + decimalFormatter.format(filenumber) + ".xml";
+          BufferedWriter chunkWriter =
+              new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filename)), "UTF-8"));
+          try {
+            chunkWriter.write(content.toString(), 0, content.length());
+          } finally {
+            Closeables.closeQuietly(chunkWriter);
+          }
+          if (filenumber >= numChunks) {
+            break;
+          }
+          content = new StringBuilder();
+          content.append(header);
+        }
+      }
+    }
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java Sun May 13 21:55:26 2012
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads records that are delimited by a specific begin/end tag.
+ */
+public class XmlInputFormat extends TextInputFormat {
+
+  private static final Logger log = LoggerFactory.getLogger(XmlInputFormat.class);
+
+  public static final String START_TAG_KEY = "xmlinput.start";
+  public static final String END_TAG_KEY = "xmlinput.end";
+
+  @Override
+  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+    try {
+      return new XmlRecordReader((FileSplit) split, context.getConfiguration());
+    } catch (IOException ioe) {
+      log.warn("Error while creating XmlRecordReader", ioe);
+      return null;
+    }
+  }
+
+  /**
+   * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified
+   * by the start tag and end tag
+   * 
+   */
+  public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
+
+    private final byte[] startTag;
+    private final byte[] endTag;
+    private final long start;
+    private final long end;
+    private final FSDataInputStream fsin;
+    private final DataOutputBuffer buffer = new DataOutputBuffer();
+    private LongWritable currentKey;
+    private Text currentValue;
+
+    public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
+      startTag = conf.get(START_TAG_KEY).getBytes(Charsets.UTF_8);
+      endTag = conf.get(END_TAG_KEY).getBytes(Charsets.UTF_8);
+
+      // open the file and seek to the start of the split
+      start = split.getStart();
+      end = start + split.getLength();
+      Path file = split.getPath();
+      FileSystem fs = file.getFileSystem(conf);
+      fsin = fs.open(split.getPath());
+      fsin.seek(start);
+    }
+
+    private boolean next(LongWritable key, Text value) throws IOException {
+      if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
+        try {
+          buffer.write(startTag);
+          if (readUntilMatch(endTag, true)) {
+            key.set(fsin.getPos());
+            value.set(buffer.getData(), 0, buffer.getLength());
+            return true;
+          }
+        } finally {
+          buffer.reset();
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      Closeables.closeQuietly(fsin);
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return (fsin.getPos() - start) / (float) (end - start);
+    }
+
+    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
+      int i = 0;
+      while (true) {
+        int b = fsin.read();
+        // end of file:
+        if (b == -1) {
+          return false;
+        }
+        // save to buffer:
+        if (withinBlock) {
+          buffer.write(b);
+        }
+
+        // check if we're matching:
+        if (b == match[i]) {
+          i++;
+          if (i >= match.length) {
+            return true;
+          }
+        } else {
+          i = 0;
+        }
+        // see if we've passed the stop point:
+        if (!withinBlock && i == 0 && fsin.getPos() >= end) {
+          return false;
+        }
+      }
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+      return currentKey;
+    }
+
+    @Override
+    public Text getCurrentValue() throws IOException, InterruptedException {
+      return currentValue;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      currentKey = new LongWritable();
+      currentValue = new Text();
+      return next(currentKey, currentValue);
+    }
+  }
+}