You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 14:52:13 UTC

[45/51] [partial] mahout git commit: MAHOUT-2042 and MAHOUT-2045 Delete directories which were moved/no longer in use

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
new file mode 100644
index 0000000..203e8fb
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.utils.email.MailOptions;
+import org.apache.mahout.utils.email.MailProcessor;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_SEPARATOR_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHARSET_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHUNK_SIZE_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.FROM_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.KEY_PREFIX_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.QUOTED_REGEX_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.REFERENCES_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.SEPARATOR_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.STRIP_QUOTED_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.SUBJECT_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.TO_OPTION;
+
+/**
+ * Map Class for the SequenceFilesFromMailArchives job
+ */
+public class SequenceFilesFromMailArchivesMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
+
+  private Text outKey = new Text();
+  private Text outValue = new Text();
+
+  private static final Pattern MESSAGE_START = Pattern.compile(
+    "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE);
+  private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile(
+    "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE);
+
+  private MailOptions options;
+
+  /**
+   * Builds the {@link MailOptions} used by this mapper from the job configuration.
+   *
+   * <p>Reads the key prefix, chunk size, charset (UTF-8 when none is configured), the header
+   * patterns to extract (from, to, references, subject, in that order), and the
+   * body / separator / quoted-text settings. Every selected header is recorded in the
+   * pattern-order map with the zero-based index it has in the patterns array.
+   *
+   * @param context task context that supplies the job {@link Configuration}
+   * @throws IOException declared by the overridden method
+   * @throws InterruptedException declared by the overridden method
+   */
+  @Override
+  public void setup(Context context) throws IOException, InterruptedException {
+
+    Configuration configuration = context.getConfiguration();
+
+    // absorb all of the options into the MailOptions object
+    this.options = new MailOptions();
+
+    options.setPrefix(configuration.get(KEY_PREFIX_OPTION[1], ""));
+
+    // NOTE(review): chunk size and charset are looked up through index [0] of their option
+    // arrays while every other option uses index [1]; confirm against the driver that
+    // writes these configuration keys.
+    if (!configuration.get(CHUNK_SIZE_OPTION[0], "").equals("")) {
+      options.setChunkSize(configuration.getInt(CHUNK_SIZE_OPTION[0], 64));
+    }
+
+    if (!configuration.get(CHARSET_OPTION[0], "").equals("")) {
+      Charset charset = Charset.forName(configuration.get(CHARSET_OPTION[0], "UTF-8"));
+      options.setCharset(charset);
+    } else {
+      Charset charset = Charset.forName("UTF-8");
+      options.setCharset(charset);
+    }
+
+    List<Pattern> patterns = Lists.newArrayListWithCapacity(5);
+    // patternOrder is used downstream so that we can know what order the
+    // text is in instead of encoding it in the string, which would require
+    // more processing later to remove it pre feature selection.
+    Map<String, Integer> patternOrder = Maps.newHashMap();
+    // Index of the next pattern to be added; must always equal patterns.size().
+    int order = 0;
+    if (!configuration.get(FROM_OPTION[1], "").equals("")) {
+      patterns.add(MailProcessor.FROM_PREFIX);
+      patternOrder.put(MailOptions.FROM, order++);
+    }
+
+    if (!configuration.get(TO_OPTION[1], "").equals("")) {
+      patterns.add(MailProcessor.TO_PREFIX);
+      patternOrder.put(MailOptions.TO, order++);
+    }
+
+    if (!configuration.get(REFERENCES_OPTION[1], "").equals("")) {
+      patterns.add(MailProcessor.REFS_PREFIX);
+      patternOrder.put(MailOptions.REFS, order++);
+    }
+
+    if (!configuration.get(SUBJECT_OPTION[1], "").equals("")) {
+      patterns.add(MailProcessor.SUBJECT_PREFIX);
+      // Post-increment like the branches above: the stored value has to be the index the
+      // subject pattern occupies in the patterns array, not the index after it.
+      patternOrder.put(MailOptions.SUBJECT, order++);
+    }
+
+    options.setStripQuotedText(configuration.getBoolean(STRIP_QUOTED_OPTION[1], false));
+
+    options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
+    options.setPatternOrder(patternOrder);
+
+    options.setIncludeBody(configuration.getBoolean(BODY_OPTION[1], false));
+
+    // "\n" is the header separator unless the configuration supplies another one.
+    options.setSeparator("\n");
+    if (!configuration.get(SEPARATOR_OPTION[1], "").equals("")) {
+      options.setSeparator(configuration.get(SEPARATOR_OPTION[1], ""));
+    }
+    if (!configuration.get(BODY_SEPARATOR_OPTION[1], "").equals("")) {
+      options.setBodySeparator(configuration.get(BODY_SEPARATOR_OPTION[1], ""));
+    }
+    if (!configuration.get(QUOTED_REGEX_OPTION[1], "").equals("")) {
+      options.setQuotedTextPattern(Pattern.compile(configuration.get(QUOTED_REGEX_OPTION[1], "")));
+    }
+
+  }
+
+  /**
+   * Reads a mailbox stream line by line and writes one key/value pair per message to the
+   * context.
+   *
+   * <p>A message is tracked from the line that matches {@code MESSAGE_ID_PREFIX} until the next
+   * line that matches {@code MESSAGE_START}, or the end of the stream. The key comes from
+   * {@link #generateKey}; the value is the captured header values followed by the collected body
+   * text, assembled by {@code writeContent}.
+   *
+   * <p>A {@link FileNotFoundException} raised while reading is ignored; the count reached so far
+   * is returned.
+   *
+   * @param filename name used in the generated keys and passed to the line iterator
+   * @param mailBoxInputStream stream holding the mailbox content
+   * @param context task context that receives the output pairs
+   * @return the number of message-id lines that started a message
+   * @throws IOException if reading the stream or writing to the context fails
+   * @throws InterruptedException if writing to the context is interrupted
+   */
+  public long parseMailboxLineByLine(String filename, InputStream mailBoxInputStream, Context context)
+    throws IOException, InterruptedException {
+    long messageCount = 0;
+    try {
+      StringBuilder contents = new StringBuilder();
+      StringBuilder body = new StringBuilder();
+      // Matchers are created once and re-targeted with reset(line) for every line.
+      Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher("");
+      Matcher messageBoundaryMatcher = MESSAGE_START.matcher("");
+      // patternResults[i] holds the most recent capture of header pattern i.
+      String[] patternResults = new String[options.getPatternsToMatch().length];
+      Matcher[] matches = new Matcher[options.getPatternsToMatch().length];
+      for (int i = 0; i < matches.length; i++) {
+        matches[i] = options.getPatternsToMatch()[i].matcher("");
+      }
+
+      String messageId = null;
+      boolean inBody = false;
+      Pattern quotedTextPattern = options.getQuotedTextPattern();
+
+      for (String nextLine : new FileLineIterable(mailBoxInputStream, options.getCharset(), false, filename)) {
+        // When quoted-text stripping is on, lines containing the quoted-text pattern are skipped entirely.
+        if (!options.isStripQuotedText() || !quotedTextPattern.matcher(nextLine).find()) {
+          // Every surviving line is tested against all header patterns, whether or not a
+          // message id has been seen yet.
+          for (int i = 0; i < matches.length; i++) {
+            Matcher matcher = matches[i];
+            matcher.reset(nextLine);
+            if (matcher.matches()) {
+              patternResults[i] = matcher.group(1);
+            }
+          }
+
+          // only start appending body content after we've seen a message ID
+          if (messageId != null) {
+            // first, see if we hit the end of the message
+            messageBoundaryMatcher.reset(nextLine);
+            if (messageBoundaryMatcher.matches()) {
+              // done parsing this message ... write it out
+              String key = generateKey(filename, options.getPrefix(), messageId);
+              // if this ordering changes, then also change
+              // FromEmailToDictionaryMapper
+              writeContent(options.getSeparator(), contents, body, patternResults);
+
+              this.outKey.set(key);
+              this.outValue.set(contents.toString());
+              context.write(this.outKey, this.outValue);
+              contents.setLength(0); // reset the buffer
+              body.setLength(0);
+              // patternResults is not cleared here, so a captured header value stays in
+              // place until a later line matching the same pattern overwrites it.
+              messageId = null;
+              inBody = false;
+            } else {
+              if (inBody && options.isIncludeBody()) {
+                // empty lines inside the body are dropped; every other line is followed
+                // by the body separator
+                if (!nextLine.isEmpty()) {
+                  body.append(nextLine).append(options.getBodySeparator());
+                }
+              } else {
+                // first empty line we see after reading the message Id
+                // indicates that we are in the body ...
+                inBody = nextLine.isEmpty();
+              }
+            }
+          } else {
+            // cheap pre-filter: "message-id: <>" is 14 characters, so a shorter or equal
+            // line cannot carry a non-empty id
+            if (nextLine.length() > 14) {
+              messageIdMatcher.reset(nextLine);
+              if (messageIdMatcher.matches()) {
+                messageId = messageIdMatcher.group(1);
+                ++messageCount;
+              }
+            }
+          }
+        }
+      }
+      // write the last message in the file if available
+      if (messageId != null) {
+        String key = generateKey(filename, options.getPrefix(), messageId);
+        writeContent(options.getSeparator(), contents, body, patternResults);
+        this.outKey.set(key);
+        this.outValue.set(contents.toString());
+        context.write(this.outKey, this.outValue);
+        contents.setLength(0); // reset the buffer
+      }
+    } catch (FileNotFoundException ignored) {
+      // deliberately ignored: the method falls through and returns the count reached so far
+    }
+    return messageCount;
+  }
+
+  /**
+   * Builds the output key for a message by joining the prefix, the mailbox file name and the
+   * message id, in that order, with {@link Path#SEPARATOR}.
+   *
+   * @param mboxFilename name of the mailbox file the message came from
+   * @param prefix key prefix; placed first in the key
+   * @param messageId id of the message; placed last in the key
+   * @return the joined key
+   */
+  protected static String generateKey(String mboxFilename, String prefix, String messageId) {
+    Joiner pathJoiner = Joiner.on(Path.SEPARATOR);
+    return pathJoiner.join(prefix, mboxFilename, messageId);
+  }
+
+  /**
+   * Appends the header values (null entries rendered as empty strings) joined by
+   * {@code separator}, then one more {@code separator}, then {@code body} to {@code contents}.
+   */
+  private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) {
+    Joiner headerJoiner = Joiner.on(separator).useForNull("");
+    String joinedHeaders = headerJoiner.join(matches);
+    contents.append(joinedHeaders);
+    contents.append(separator);
+    contents.append(body);
+  }
+
+  /**
+   * Parses one mailbox file delivered as a single record and writes its messages to the context.
+   *
+   * @param key index of the file inside the current {@link CombineFileSplit}
+   * @param value raw bytes of the mailbox file
+   * @param context task context that supplies the configuration and input split and receives
+   *        the output
+   * @throws IOException if parsing or writing fails
+   * @throws InterruptedException if writing to the context is interrupted
+   */
+  @Override
+  public void map(IntWritable key, BytesWritable value, Context context)
+    throws IOException, InterruptedException {
+    Configuration configuration = context.getConfiguration();
+    Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
+    String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
+    // getBytes() returns the backing array, which may be longer than the valid data; limit the
+    // stream to getLength() bytes so padding is never parsed as mailbox content.
+    ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
+    parseMailboxLineByLine(relativeFilePath, is, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
new file mode 100644
index 0000000..cacfd22
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+
+import java.io.IOException;
+
+/**
+ * Map-only job that cuts each input {@link Text} value at occurrences of "\n\n" and writes the
+ * resulting chunks under the original key.
+ */
+public class TextParagraphSplittingJob extends AbstractJob {
+
+  /**
+   * Configures and runs the job using the "mapred.input.dir" and "mapred.output.dir" settings
+   * of the current configuration.
+   *
+   * @param strings command line arguments; not read by this method
+   * @return 0 when the job completes successfully, -1 otherwise
+   */
+  @Override
+  public int run(String[] strings) throws Exception {
+    Configuration conf = getConf();
+    Path inputDir = new Path(conf.get("mapred.input.dir"));
+    Path outputDir = new Path(conf.get("mapred.output.dir"));
+
+    Job job = prepareJob(inputDir, outputDir,
+                         SequenceFileInputFormat.class,
+                         SplitMap.class, Text.class, Text.class,
+                         Reducer.class, Text.class, Text.class,
+                         SequenceFileOutputFormat.class);
+    // map-only: no reduce tasks are run
+    job.setNumReduceTasks(0);
+
+    if (job.waitForCompletion(true)) {
+      return 0;
+    }
+    return -1;
+  }
+
+  /**
+   * Emits one record per chunk of the input value that ends at a "\n\n" separator.
+   *
+   * <p>Each chunk starts where the previous search hit was found, so every chunk after the
+   * first begins with the separator itself. Text that follows the last separator is not written,
+   * and a value that contains no separator produces no output.
+   */
+  public static class SplitMap extends Mapper<Text,Text,Text,Text> {
+
+    @Override
+    protected void map(Text key, Text text, Context context) throws IOException, InterruptedException {
+      Text chunk = new Text();
+      for (int start = 0; start >= 0 && start < text.getLength(); ) {
+        // search from start + 1 so a separator sitting at the chunk start is not found again
+        int boundary = text.find("\n\n", start + 1);
+        if (boundary > 0) {
+          chunk.set(text.getBytes(), start, boundary - start);
+          context.write(key, chunk);
+        }
+        // a negative boundary (no further separator) terminates the loop
+        start = boundary;
+      }
+    }
+  }
+
+  /** Runs the job through {@link ToolRunner}; the returned exit code is not inspected. */
+  public static void main(String[] args) throws Exception {
+    TextParagraphSplittingJob splittingJob = new TextParagraphSplittingJob();
+    ToolRunner.run(splittingJob, args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
new file mode 100644
index 0000000..b8441b7
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import static org.apache.mahout.text.SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION;
+
+/**
+ * RecordReader used with the MultipleTextFileInputFormat class. It reads one file of a
+ * {@link CombineFileSplit} in full and exposes it as a single key/value pair: the key is the
+ * index of the file within the combined split, the value holds the bytes read from the file.
+ */
+public class WholeFileRecordReader extends RecordReader<IntWritable, BytesWritable> {
+
+  private FileSplit fileSplit;
+  private boolean processed;
+  private Configuration configuration;
+  private BytesWritable value = new BytesWritable();
+  private IntWritable index;
+  private String fileFilterClassName;
+  private PathFilter pathFilter;
+
+  /**
+   * Creates a reader for the file at position {@code idx} of the combined split.
+   *
+   * @param fileSplit combined split that contains the file
+   * @param taskAttemptContext context that supplies the configuration
+   * @param idx position of the file within {@code fileSplit}
+   * @throws IOException if the split locations cannot be obtained
+   */
+  public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx)
+      throws IOException {
+    Path path = fileSplit.getPath(idx);
+    long offset = fileSplit.getOffset(idx);
+    long length = fileSplit.getLength(idx);
+    String[] hosts = fileSplit.getLocations();
+    this.fileSplit = new FileSplit(path, offset, length, hosts);
+    this.configuration = taskAttemptContext.getConfiguration();
+    this.index = new IntWritable(idx);
+    this.fileFilterClassName = this.configuration.get(FILE_FILTER_CLASS_OPTION[0]);
+  }
+
+  /** Returns the index of the file within the combined split. */
+  @Override
+  public IntWritable getCurrentKey() {
+    return index;
+  }
+
+  /** Returns the holder for the file content; it is filled by {@link #nextKeyValue()}. */
+  @Override
+  public BytesWritable getCurrentValue() {
+    return value;
+  }
+
+  /** Returns 1.0 once the file has been read and 0.0 before that. */
+  @Override
+  public float getProgress() throws IOException {
+    if (processed) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  /**
+   * Instantiates the configured {@link PathFilter}, unless no filter class name is configured
+   * or the configured name is that of {@link PrefixAdditionFilter}.
+   *
+   * @throws IllegalStateException if the filter class cannot be loaded or instantiated
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    if (StringUtils.isBlank(fileFilterClassName)
+        || PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+      return;
+    }
+    try {
+      pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Reads the file into the value on the first successful call.
+   *
+   * @return {@code true} when the file was read by this call; {@code false} when it had already
+   *         been read, when the path is not a regular file, or when the status listing
+   *         (filtered by the path filter, if one is set) does not contain exactly one entry
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    if (processed) {
+      return false;
+    }
+
+    byte[] contents = new byte[(int) fileSplit.getLength()];
+    Path file = fileSplit.getPath();
+    FileSystem fs = file.getFileSystem(this.configuration);
+    if (!fs.isFile(file)) {
+      return false;
+    }
+
+    FileStatus[] fileStatuses = pathFilter != null
+        ? fs.listStatus(file, pathFilter)
+        : fs.listStatus(file);
+    if (fileStatuses.length != 1) {
+      return false;
+    }
+
+    try (FSDataInputStream in = fs.open(fileStatuses[0].getPath())) {
+      IOUtils.readFully(in, contents, 0, contents.length);
+      value.setCapacity(contents.length);
+      value.set(contents, 0, contents.length);
+    }
+    processed = true;
+    return true;
+  }
+
+  /** Nothing to release: the input stream is closed inside {@link #nextKeyValue()}. */
+  @Override
+  public void close() throws IOException {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
new file mode 100644
index 0000000..bed4640
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.text.wikipedia.WikipediaMapper;
+import org.apache.mahout.text.wikipedia.XmlInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create and run the Wikipedia Dataset Creator.
+ */
+public final class WikipediaToSequenceFile {
+
+  private static final Logger log = LoggerFactory.getLogger(WikipediaToSequenceFile.class);
+  
+  private WikipediaToSequenceFile() { }
+  
+  /**
+   * Takes in two arguments:
+   * <ol>
+   * <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li>
+   * <li>The output {@link org.apache.hadoop.fs.Path} where to write the classifier as a
+   * {@link org.apache.hadoop.io.SequenceFile}</li>
+   * </ol>
+   */
+  public static void main(String[] args)  throws IOException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+    
+    Option dirInputPathOpt = DefaultOptionCreator.inputOption().create();
+    
+    Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create();
+    
+    Option categoriesOpt = obuilder.withLongName("categories").withArgument(
+      abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription(
+      "Location of the categories file.  One entry per line. "
+          + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create();
+    
+    Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription(
+      "If set, then the category name must exactly match the "
+          + "entry in the categories file. Default is false").withShortName("e").create();
+    
+    Option allOpt = obuilder.withLongName("all")
+        .withDescription("If set, Select all files. Default is false").withShortName("all").create();
+
+    Option removeLabelOpt = obuilder.withLongName("removeLabels")
+        .withDescription("If set, remove [[Category:labels]] from document text after extracting label."
+          + "Default is false").withShortName("rl").create();
+
+    Option helpOpt = DefaultOptionCreator.helpOption();
+    
+    Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt)
+        .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(allOpt).withOption(helpOpt)
+        .withOption(removeLabelOpt).create();
+    
+    Parser parser = new Parser();
+    parser.setGroup(group);
+    parser.setHelpOption(helpOpt);
+    try {
+      CommandLine cmdLine = parser.parse(args);
+      if (cmdLine.hasOption(helpOpt)) {
+        CommandLineUtil.printHelp(group);
+        return;
+      }
+      
+      String inputPath = (String) cmdLine.getValue(dirInputPathOpt);
+      String outputPath = (String) cmdLine.getValue(dirOutputPathOpt);
+      
+      String catFile = "";
+      if (cmdLine.hasOption(categoriesOpt)) {
+        catFile = (String) cmdLine.getValue(categoriesOpt);
+      }
+      
+      boolean all = false;
+      if (cmdLine.hasOption(allOpt)) {
+        all = true;
+      }
+
+      boolean removeLabels = false;
+      if (cmdLine.hasOption(removeLabelOpt)) {
+          removeLabels = true;
+      }
+
+      runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt), all, removeLabels);
+    } catch (OptionException | InterruptedException | ClassNotFoundException e) {
+      log.error("Exception", e);
+      CommandLineUtil.printHelp(group);
+    }
+  }
+  
+  /**
+   * Configures and runs the job that converts the Wikipedia XML input into a sequence file.
+   * Any existing output path is deleted before the job is started.
+   *
+   * @param input
+   *          the input pathname String
+   * @param output
+   *          the output pathname String
+   * @param catFile
+   *          the file containing the Wikipedia categories, one per line; an empty string means
+   *          that no category file is read
+   * @param exactMatchOnly
+   *          if true, then the Wikipedia category must match exactly instead of simply containing the
+   *          category string
+   * @param all
+   *          if true select all categories
+   * @param removeLabels
+   *          if true remove Category labels from document text after extracting.
+   * @throws IllegalStateException
+   *           if the job does not complete successfully
+   */
+  public static void runJob(String input,
+                            String output,
+                            String catFile,
+                            boolean exactMatchOnly,
+                            boolean all,
+                            boolean removeLabels) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
+    conf.set("xmlinput.start", "<page>");
+    conf.set("xmlinput.end", "</page>");
+    conf.setBoolean("exact.match.only", exactMatchOnly);
+    conf.setBoolean("all.files", all);
+    conf.setBoolean("remove.labels", removeLabels);
+    conf.set("io.serializations",
+             "org.apache.hadoop.io.serializer.JavaSerialization,"
+             + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+    // categories are trimmed and lower-cased before they are handed to the job
+    Set<String> categories = new HashSet<>();
+    if (!catFile.isEmpty()) {
+      for (String categoryLine : new FileLineIterable(new File(catFile))) {
+        categories.add(categoryLine.trim().toLowerCase(Locale.ENGLISH));
+      }
+    }
+
+    // the category set is passed along as a string in the configuration
+    Stringifier<Set<String>> setStringifier =
+        new DefaultStringifier<>(conf, GenericsUtil.getClass(categories));
+    conf.set("wikipedia.categories", setStringifier.toString(categories));
+
+    Job job = new Job(conf);
+    log.info("Input: {} Out: {} Categories: {} All Files: {}", input, output, catFile, all);
+
+    Path outPath = new Path(output);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    FileInputFormat.setInputPaths(job, new Path(input));
+    FileOutputFormat.setOutputPath(job, outPath);
+    job.setMapperClass(WikipediaMapper.class);
+    job.setInputFormatClass(XmlInputFormat.class);
+    job.setReducerClass(Reducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setJarByClass(WikipediaToSequenceFile.class);
+
+    /*
+     * conf.set("mapred.compress.map.output", "true"); conf.set("mapred.map.output.compression.type",
+     * "BLOCK"); conf.set("mapred.output.compress", "true"); conf.set("mapred.output.compression.type",
+     * "BLOCK"); conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+     */
+    HadoopUtil.delete(conf, outPath);
+
+    if (!job.waitForCompletion(true)) {
+      throw new IllegalStateException("Job failed!");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java
new file mode 100644
index 0000000..d50323d
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.Tokenizer;
+import org.apache.lucene.analysis.core.LowerCaseFilter;
+import org.apache.lucene.analysis.core.StopAnalyzer;
+import org.apache.lucene.analysis.core.StopFilter;
+import org.apache.lucene.analysis.standard.StandardFilter;
+import org.apache.lucene.analysis.util.CharArraySet;
+import org.apache.lucene.analysis.util.StopwordAnalyzerBase;
+import org.apache.lucene.analysis.wikipedia.WikipediaTokenizer;
+
+
+/**
+ * Analyzer that tokenizes with {@link WikipediaTokenizer} and then applies, in this order,
+ * {@link StandardFilter}, {@link LowerCaseFilter} and a {@link StopFilter} over the analyzer's
+ * stop-word set.
+ */
+public class WikipediaAnalyzer extends StopwordAnalyzerBase {
+
+  /** Creates an analyzer that uses {@link StopAnalyzer#ENGLISH_STOP_WORDS_SET}. */
+  public WikipediaAnalyzer() {
+    super(StopAnalyzer.ENGLISH_STOP_WORDS_SET);
+  }
+
+  /**
+   * Creates an analyzer that uses the given stop words.
+   *
+   * @param stopSet the stop words passed on to the base class
+   */
+  public WikipediaAnalyzer(CharArraySet stopSet) {
+    super(stopSet);
+  }
+
+  @Override
+  protected TokenStreamComponents createComponents(String fieldName) {
+    Tokenizer source = new WikipediaTokenizer();
+    TokenStream filtered =
+        new StopFilter(new LowerCaseFilter(new StandardFilter(source)), getStopwordSet());
+    return new TokenStreamComponents(source, filtered);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java
new file mode 100644
index 0000000..8214407
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create and run the Wikipedia Dataset Creator.
+ */
+public final class WikipediaDatasetCreatorDriver {
+  private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorDriver.class);
+  
+  private WikipediaDatasetCreatorDriver() { }
+  
+  /**
+   * Takes in two arguments:
+   * <ol>
+   * <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li>
+   * <li>The output {@link org.apache.hadoop.fs.Path} where to write the classifier as a
+   * {@link org.apache.hadoop.io.SequenceFile}</li>
+   * </ol>
+   */
+  public static void main(String[] args) throws IOException, InterruptedException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+    
+    Option dirInputPathOpt = DefaultOptionCreator.inputOption().create();
+    
+    Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create();
+    
+    Option categoriesOpt = obuilder.withLongName("categories").withRequired(true).withArgument(
+      abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription(
+      "Location of the categories file.  One entry per line. "
+          + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create();
+    
+    Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription(
+      "If set, then the category name must exactly match the "
+          + "entry in the categories file. Default is false").withShortName("e").create();
+    Option analyzerOpt = obuilder.withLongName("analyzer").withRequired(false).withArgument(
+      abuilder.withName("analyzer").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The analyzer to use, must have a no argument constructor").withShortName("a").create();
+    Option helpOpt = DefaultOptionCreator.helpOption();
+    
+    Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt)
+        .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(analyzerOpt).withOption(helpOpt)
+        .create();
+    
+    Parser parser = new Parser();
+    parser.setGroup(group);
+    try {
+      CommandLine cmdLine = parser.parse(args);
+      if (cmdLine.hasOption(helpOpt)) {
+        CommandLineUtil.printHelp(group);
+        return;
+      }
+      
+      String inputPath = (String) cmdLine.getValue(dirInputPathOpt);
+      String outputPath = (String) cmdLine.getValue(dirOutputPathOpt);
+      String catFile = (String) cmdLine.getValue(categoriesOpt);
+      Class<? extends Analyzer> analyzerClass = WikipediaAnalyzer.class;
+      if (cmdLine.hasOption(analyzerOpt)) {
+        String className = cmdLine.getValue(analyzerOpt).toString();
+        analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
+        // try instantiating it, b/c there isn't any point in setting it if
+        // you can't instantiate it
+        ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
+      }
+      runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt),
+        analyzerClass);
+    } catch (OptionException e) {
+      log.error("Exception", e);
+      CommandLineUtil.printHelp(group);
+    } catch (ClassNotFoundException e) {
+      log.error("Exception", e);
+      CommandLineUtil.printHelp(group);
+    }
+  }
+  
+  /**
+   * Run the job
+   * 
+   * @param input
+   *          the input pathname String
+   * @param output
+   *          the output pathname String
+   * @param catFile
+   *          the file containing the Wikipedia categories
+   * @param exactMatchOnly
+   *          if true, then the Wikipedia category must match exactly instead of simply containing the
+   *          category string
+   */
+  public static void runJob(String input,
+                            String output,
+                            String catFile,
+                            boolean exactMatchOnly,
+                            Class<? extends Analyzer> analyzerClass)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
+    conf.set("key.value.separator.in.input.line", " ");
+    conf.set("xmlinput.start", "<page>");
+    conf.set("xmlinput.end", "</page>");
+    conf.setBoolean("exact.match.only", exactMatchOnly);
+    conf.set("analyzer.class", analyzerClass.getName());
+    conf.set("io.serializations",
+             "org.apache.hadoop.io.serializer.JavaSerialization,"
+             + "org.apache.hadoop.io.serializer.WritableSerialization");
+    // Dont ever forget this. People should keep track of how hadoop conf
+    // parameters can make or break a piece of code
+    
+    Set<String> categories = new HashSet<>();
+    for (String line : new FileLineIterable(new File(catFile))) {
+      categories.add(line.trim().toLowerCase(Locale.ENGLISH));
+    }
+    
+    Stringifier<Set<String>> setStringifier =
+        new DefaultStringifier<>(conf, GenericsUtil.getClass(categories));
+    
+    String categoriesStr = setStringifier.toString(categories);
+    
+    conf.set("wikipedia.categories", categoriesStr);
+    
+    Job job = new Job(conf);
+    log.info("Input: {} Out: {} Categories: {}", input, output, catFile);
+    job.setJarByClass(WikipediaDatasetCreatorDriver.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setMapperClass(WikipediaDatasetCreatorMapper.class);
+    //TODO: job.setNumMapTasks(100);
+    job.setInputFormatClass(XmlInputFormat.class);
+    job.setReducerClass(WikipediaDatasetCreatorReducer.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    
+    FileInputFormat.setInputPaths(job, new Path(input));
+    Path outPath = new Path(output);
+    FileOutputFormat.setOutputPath(job, outPath);
+    HadoopUtil.delete(conf, outPath);
+    
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      throw new IllegalStateException("Job failed!");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java
new file mode 100644
index 0000000..50e5f37
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.mahout.common.ClassUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * Maps over Wikipedia xml format and output all document having the category listed in the input category
+ * file
+ * 
+ */
+public class WikipediaDatasetCreatorMapper extends Mapper<LongWritable, Text, Text, Text> {
+
+  private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorMapper.class);
+
+  private static final Pattern SPACE_NON_ALPHA_PATTERN = Pattern.compile("[\\s\\W]");
+  private static final Pattern OPEN_TEXT_TAG_PATTERN = Pattern.compile("<text xml:space=\"preserve\">");
+  private static final Pattern CLOSE_TEXT_TAG_PATTERN = Pattern.compile("</text>");
+
+  /** Text that opens a category link in the wiki markup. */
+  private static final String CATEGORY_LINK_START = "[[Category:";
+  /** Text that closes a category link in the wiki markup. */
+  private static final String CATEGORY_LINK_END = "]]";
+  /** Value returned by {@link #findMatchingCategory(String)} when no input category matches. */
+  private static final String UNKNOWN_CATEGORY = "Unknown";
+
+  private List<String> inputCategories;
+  // Parallel to inputCategories: element i is the word-boundary pattern for inputCategories.get(i).
+  private List<Pattern> inputCategoryPatterns;
+  private boolean exactMatchOnly;
+  private Analyzer analyzer;
+
+  /**
+   * Emits one record for a page whose category links match an input category: the key is the matched
+   * category with every whitespace or non-word character replaced by {@code _}, the value is the page
+   * text run through the analyzer, tokens separated (and followed) by a single space. Pages without a
+   * matching category produce no output.
+   */
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+    // Drop the first opening <text> tag and every closing one, then resolve HTML entities.
+    String document = StringEscapeUtils.unescapeHtml4(CLOSE_TEXT_TAG_PATTERN.matcher(
+        OPEN_TEXT_TAG_PATTERN.matcher(value.toString()).replaceFirst("")).replaceAll(""));
+    String catMatch = findMatchingCategory(document);
+    if (UNKNOWN_CATEGORY.equals(catMatch)) {
+      return;
+    }
+
+    StringBuilder contents = new StringBuilder(1000);
+    TokenStream stream = analyzer.tokenStream(catMatch, new StringReader(document));
+    try {
+      CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
+      stream.reset();
+      while (stream.incrementToken()) {
+        contents.append(termAtt.buffer(), 0, termAtt.length()).append(' ');
+      }
+      stream.end();
+    } finally {
+      // Always release the stream, also when tokenization fails; an IOException raised by close()
+      // itself is swallowed by Closeables so it cannot mask the original failure.
+      Closeables.close(stream, true);
+    }
+
+    context.write(
+        new Text(SPACE_NON_ALPHA_PATTERN.matcher(catMatch).replaceAll("_")),
+        new Text(contents.toString()));
+  }
+
+  /**
+   * Reads the category set ({@code wikipedia.categories}), the match mode ({@code exact.match.only},
+   * default false) and the analyzer class ({@code analyzer.class}, default {@link WikipediaAnalyzer})
+   * from the job configuration. Categories and analyzer are only initialised if still unset.
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    if (inputCategories == null) {
+      Set<String> newCategories = new HashSet<>();
+      DefaultStringifier<Set<String>> setStringifier =
+          new DefaultStringifier<>(conf, GenericsUtil.getClass(newCategories));
+      // Falls back to a serialized empty set when the driver did not provide any categories.
+      String categoriesStr = conf.get("wikipedia.categories", setStringifier.toString(newCategories));
+      Set<String> inputCategoriesSet = setStringifier.fromString(categoriesStr);
+      inputCategories = new ArrayList<>(inputCategoriesSet);
+      inputCategoryPatterns = new ArrayList<>(inputCategories.size());
+      for (String inputCategory : inputCategories) {
+        // Categories are literal strings, not regular expressions: quote them so that entries such as
+        // "c++" or "art (painting)" neither fail to compile nor change the meaning of the pattern.
+        inputCategoryPatterns.add(Pattern.compile(".*\\b" + Pattern.quote(inputCategory) + "\\b.*"));
+      }
+
+    }
+
+    exactMatchOnly = conf.getBoolean("exact.match.only", false);
+
+    if (analyzer == null) {
+      String analyzerStr = conf.get("analyzer.class", WikipediaAnalyzer.class.getName());
+      analyzer = ClassUtils.instantiateAs(analyzerStr, Analyzer.class);
+    }
+
+    log.info("Configure: Input Categories size: {} Exact Match: {} Analyzer: {}",
+             inputCategories.size(), exactMatchOnly, analyzer.getClass().getName());
+  }
+
+  /**
+   * Scans the {@code [[Category:...]]} links of a page in document order and returns the first input
+   * category that matches one of them. Link text is trimmed and lower-cased before comparison.
+   *
+   * @param document the page text
+   * @return in exact mode the link text that equals an input category; otherwise the input category
+   *         found inside the link text at word boundaries; {@code "Unknown"} if nothing matches
+   */
+  private String findMatchingCategory(String document) {
+    int startIndex = 0;
+    int categoryIndex;
+    while ((categoryIndex = document.indexOf(CATEGORY_LINK_START, startIndex)) != -1) {
+      categoryIndex += CATEGORY_LINK_START.length();
+      int endIndex = document.indexOf(CATEGORY_LINK_END, categoryIndex);
+      if (endIndex < 0) {
+        // Unterminated link: nothing further can be parsed reliably.
+        break;
+      }
+      String category = document.substring(categoryIndex, endIndex).toLowerCase(Locale.ENGLISH).trim();
+      if (exactMatchOnly) {
+        if (inputCategories.contains(category)) {
+          return category;
+        }
+      } else {
+        for (int i = 0; i < inputCategories.size(); i++) {
+          if (inputCategoryPatterns.get(i).matcher(category).matches()) { // inexact match with word boundary.
+            return inputCategories.get(i);
+          }
+        }
+      }
+      startIndex = endIndex;
+    }
+    return UNKNOWN_CATEGORY;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java
new file mode 100644
index 0000000..bf921fc
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Can also be used as a local Combiner
+ */
+public class WikipediaDatasetCreatorReducer extends Reducer<Text, Text, Text, Text> {
+
+  /**
+   * Pass-through reduction: each value grouped under {@code key} is written out again, unchanged and
+   * paired with that same key, in the order the values are iterated.
+   */
+  @Override
+  protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    // No aggregation takes place; the output records equal the input records.
+    for (Text passedThrough : values) {
+      context.write(key, passedThrough);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java
new file mode 100644
index 0000000..abd3a04
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.GenericsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps over Wikipedia xml format and output all document having the category listed in the input category
+ * file
+ * 
+ */
+public class WikipediaMapper extends Mapper<LongWritable, Text, Text, Text> {
+
+  private static final Logger log = LoggerFactory.getLogger(WikipediaMapper.class);
+
+  // Despite the name, this only matches whitespace characters.
+  private static final Pattern SPACE_NON_ALPHA_PATTERN = Pattern.compile("[\\s]");
+
+  private static final String START_DOC = "<text xml:space=\"preserve\">";
+
+  private static final String END_DOC = "</text>";
+
+  private static final Pattern TITLE = Pattern.compile("<title>(.*)<\\/title>");
+
+  private static final String REDIRECT = "<redirect />";
+
+  private Set<String> inputCategories;
+
+  private boolean exactMatchOnly;
+
+  private boolean all;
+
+  private boolean removeLabels;
+
+  /**
+   * Emits one record per accepted page: the key is {@code /<category>/<title>} (category lower-cased,
+   * whitespace in the title replaced by {@code _}), the value is the HTML-unescaped page text.
+   * <p>
+   * Redirect pages and pages that cannot be parsed are skipped; the latter are counted in the
+   * {@code Wikipedia / Parse errors} counter. Unless {@code all.files} is set, pages without a matching
+   * category are skipped as well.
+   */
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+
+    String content = value.toString();
+    if (content.contains(REDIRECT)) {
+      return;
+    }
+    String document;
+    String title;
+    try {
+      document = getDocument(content);
+      title = getTitle(content);
+    } catch (RuntimeException e) {
+      // Malformed page: skip it, but leave a trace in the job counters instead of dropping it silently.
+      context.getCounter("Wikipedia", "Parse errors").increment(1);
+      return;
+    }
+
+    String catMatch = findMatchingCategory(document);
+    if (!all && "Unknown".equals(catMatch)) {
+      return;
+    }
+
+    document = StringEscapeUtils.unescapeHtml4(document);
+    if (removeLabels) {
+      document = removeCategoriesFromText(document);
+      // Reject documents with malformed tags
+      if (document == null) {
+        return;
+      }
+    }
+
+    // write out in Bayes input style: key: /Category/document_name
+    String category = "/" + catMatch.toLowerCase(Locale.ENGLISH) + "/" +
+        SPACE_NON_ALPHA_PATTERN.matcher(title).replaceAll("_");
+
+    context.write(new Text(category), new Text(document));
+  }
+
+  /**
+   * Reads the category set ({@code wikipedia.categories}) and the flags {@code exact.match.only},
+   * {@code all.files} and {@code remove.labels} (all defaulting to false) from the job configuration.
+   * A missing category entry is treated as an empty category set.
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+
+    Set<String> newCategories = new HashSet<>();
+    DefaultStringifier<Set<String>> setStringifier =
+          new DefaultStringifier<>(conf, GenericsUtil.getClass(newCategories));
+
+    String categoriesStr = conf.get("wikipedia.categories");
+    // Without this guard an unset property would surface as a NullPointerException during setup.
+    inputCategories = categoriesStr == null ? newCategories : setStringifier.fromString(categoriesStr);
+    exactMatchOnly = conf.getBoolean("exact.match.only", false);
+    all = conf.getBoolean("all.files", false);
+    removeLabels = conf.getBoolean("remove.labels",false);
+    log.info("Configure: Input Categories size: {} All: {} Exact Match: {} Remove Labels from Text: {}",
+            inputCategories.size(), all, exactMatchOnly, removeLabels);
+  }
+
+  /**
+   * Extracts the text between the first {@code <text xml:space="preserve">} tag and the next
+   * {@code </text>} tag.
+   *
+   * @param xml the page XML
+   * @return the text between the two tags
+   * @throws IllegalStateException if either tag is missing
+   */
+  private static String getDocument(String xml) {
+    int tagStart = xml.indexOf(START_DOC);
+    if (tagStart < 0) {
+      // Without this check, -1 plus the tag length would silently point into unrelated text.
+      throw new IllegalStateException("Page has no opening text tag");
+    }
+    int start = tagStart + START_DOC.length();
+    int end = xml.indexOf(END_DOC, start);
+    if (end < 0) {
+      throw new IllegalStateException("Page has no closing text tag");
+    }
+    return xml.substring(start, end);
+  }
+
+  /**
+   * Returns the content of the first {@code <title>} element found, or the empty string if there is
+   * none.
+   */
+  private static String getTitle(CharSequence xml) {
+    Matcher m = TITLE.matcher(xml);
+    return m.find() ? m.group(1) : "";
+  }
+
+  /**
+   * Scans the {@code [[Category:...]]} links of a page in document order and returns the first input
+   * category that matches one of them. Link text is trimmed and lower-cased before comparison.
+   *
+   * @param document the page text
+   * @return in exact mode the link text that equals an input category; otherwise the lower-cased input
+   *         category contained in the link text; {@code "Unknown"} if nothing matches
+   */
+  private String findMatchingCategory(String document) {
+    int startIndex = 0;
+    int categoryIndex;
+    while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) {
+      categoryIndex += 11; // length of "[[Category:"
+      int endIndex = document.indexOf("]]", categoryIndex);
+      if (endIndex < 0) {
+        // Unterminated link: nothing further can be parsed reliably.
+        break;
+      }
+      // Already lower-cased here, so the exact-match result needs no further normalisation.
+      String category = document.substring(categoryIndex, endIndex).toLowerCase(Locale.ENGLISH).trim();
+      if (exactMatchOnly) {
+        if (inputCategories.contains(category)) {
+          return category;
+        }
+      } else {
+        for (String inputCategory : inputCategories) {
+          if (category.contains(inputCategory)) { // we have an inexact match
+            return inputCategory.toLowerCase(Locale.ENGLISH);
+          }
+        }
+      }
+      startIndex = endIndex;
+    }
+    return "Unknown";
+  }
+
+  /**
+   * Strips {@code [[Category:...]]} links from the text so the label does not leak into the document.
+   * Scanning stops at the first link that has no closing {@code ]]}; that link and everything after it
+   * is left in place.
+   *
+   * @param document the page text
+   * @return the text without the removed links, or {@code null} if a
+   *         {@link StringIndexOutOfBoundsException} is raised while cutting
+   */
+  private String removeCategoriesFromText(String document) {
+    int startIndex = 0;
+    int categoryIndex;
+    try {
+      while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) {
+        int endIndex = document.indexOf("]]", categoryIndex);
+        if (endIndex >= document.length() || endIndex < 0) {
+          break;
+        }
+        // String.replace removes every occurrence of this exact link text, not only the current one.
+        document = document.replace(document.substring(categoryIndex, endIndex + 2), "");
+        // Resume at the position where the removed link started, unless the text now ends there.
+        if (categoryIndex < document.length()) {
+          startIndex = categoryIndex;
+        } else {
+          break;
+        }
+      }
+    } catch(StringIndexOutOfBoundsException e) {
+      return null;
+    }
+    return document;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java
new file mode 100644
index 0000000..fc065fe
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.iterator.FileLineIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>The Bayes example package provides some helper classes for training the Naive Bayes classifier
+ * on the Twenty Newsgroups data. See {@code PrepareTwentyNewsgroups}
+ * for details on running the trainer and
+ * formatting the Twenty Newsgroups data properly for the training.</p>
+ *
+ * <p>The easiest way to prepare the data is to use the ant task in core/build.xml:</p>
+ *
+ * <p>{@code ant extract-20news-18828}</p>
+ *
+ * <p>This runs the arg line:</p>
+ *
+ * <p>{@code -p $\{working.dir\}/20news-18828/ -o $\{working.dir\}/20news-18828-collapse -a $\{analyzer\} -c UTF-8}</p>
+ *
+ * <p>To Run the Wikipedia examples (assumes you've built the Mahout Job jar):</p>
+ *
+ * <ol>
+ *  <li>Download the Wikipedia Dataset. Use the Ant target: {@code ant enwiki-files}</li>
+ *  <li>Chunk the data using the WikipediaXmlSplitter (from the Hadoop home):
+ *   {@code bin/hadoop jar $MAHOUT_HOME/target/mahout-examples-0.x
+ *   org.apache.mahout.text.wikipedia.WikipediaXmlSplitter
+ *   -d $MAHOUT_HOME/examples/temp/enwiki-latest-pages-articles.xml
+ *   -o $MAHOUT_HOME/examples/work/wikipedia/chunks/ -c 64}</li>
+ * </ol>
+ */
+public final class WikipediaXmlSplitter {
+
+  private static final Logger log = LoggerFactory.getLogger(WikipediaXmlSplitter.class);
+
+  private WikipediaXmlSplitter() { }
+
+  public static void main(String[] args) throws IOException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+
+    Option dumpFileOpt = obuilder.withLongName("dumpFile").withRequired(true).withArgument(
+      abuilder.withName("dumpFile").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The path to the wikipedia dump file (.bz2 or uncompressed)").withShortName("d").create();
+
+    Option outputDirOpt = obuilder.withLongName("outputDir").withRequired(true).withArgument(
+      abuilder.withName("outputDir").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The output directory to place the splits in:\n"
+          + "local files:\n\t/var/data/wikipedia-xml-chunks or\n\tfile:///var/data/wikipedia-xml-chunks\n"
+          + "Hadoop DFS:\n\thdfs://wikipedia-xml-chunks\n"
+          + "AWS S3 (blocks):\n\ts3://bucket-name/wikipedia-xml-chunks\n"
+          + "AWS S3 (native files):\n\ts3n://bucket-name/wikipedia-xml-chunks\n")
+
+    .withShortName("o").create();
+
+    Option s3IdOpt = obuilder.withLongName("s3ID").withRequired(false).withArgument(
+      abuilder.withName("s3Id").withMinimum(1).withMaximum(1).create()).withDescription("Amazon S3 ID key")
+        .withShortName("i").create();
+    Option s3SecretOpt = obuilder.withLongName("s3Secret").withRequired(false).withArgument(
+      abuilder.withName("s3Secret").withMinimum(1).withMaximum(1).create()).withDescription(
+      "Amazon S3 secret key").withShortName("s").create();
+
+    Option chunkSizeOpt = obuilder.withLongName("chunkSize").withRequired(true).withArgument(
+      abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The Size of the chunk, in megabytes").withShortName("c").create();
+    Option numChunksOpt = obuilder
+        .withLongName("numChunks")
+        .withRequired(false)
+        .withArgument(abuilder.withName("numChunks").withMinimum(1).withMaximum(1).create())
+        .withDescription(
+          "The maximum number of chunks to create.  If specified, program will only create a subset of the chunks")
+        .withShortName("n").create();
+    Group group = gbuilder.withName("Options").withOption(dumpFileOpt).withOption(outputDirOpt).withOption(
+      chunkSizeOpt).withOption(numChunksOpt).withOption(s3IdOpt).withOption(s3SecretOpt).create();
+
+    Parser parser = new Parser();
+    parser.setGroup(group);
+    CommandLine cmdLine;
+    try {
+      cmdLine = parser.parse(args);
+    } catch (OptionException e) {
+      log.error("Error while parsing options", e);
+      CommandLineUtil.printHelp(group);
+      return;
+    }
+
+    Configuration conf = new Configuration();
+    String dumpFilePath = (String) cmdLine.getValue(dumpFileOpt);
+    String outputDirPath = (String) cmdLine.getValue(outputDirOpt);
+
+    if (cmdLine.hasOption(s3IdOpt)) {
+      String id = (String) cmdLine.getValue(s3IdOpt);
+      conf.set("fs.s3n.awsAccessKeyId", id);
+      conf.set("fs.s3.awsAccessKeyId", id);
+    }
+    if (cmdLine.hasOption(s3SecretOpt)) {
+      String secret = (String) cmdLine.getValue(s3SecretOpt);
+      conf.set("fs.s3n.awsSecretAccessKey", secret);
+      conf.set("fs.s3.awsSecretAccessKey", secret);
+    }
+    // do not compute crc file when using local FS
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
+    FileSystem fs = FileSystem.get(URI.create(outputDirPath), conf);
+
+    int chunkSize = 1024 * 1024 * Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
+
+    int numChunks = Integer.MAX_VALUE;
+    if (cmdLine.hasOption(numChunksOpt)) {
+      numChunks = Integer.parseInt((String) cmdLine.getValue(numChunksOpt));
+    }
+
+    String header = "<mediawiki xmlns=\"http://www.mediawiki.org/xml/export-0.3/\" "
+                    + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" "
+                    + "xsi:schemaLocation=\"http://www.mediawiki.org/xml/export-0.3/ "
+                    + "http://www.mediawiki.org/xml/export-0.3.xsd\" " + "version=\"0.3\" "
+                    + "xml:lang=\"en\">\n" + "  <siteinfo>\n" + "<sitename>Wikipedia</sitename>\n"
+                    + "    <base>http://en.wikipedia.org/wiki/Main_Page</base>\n"
+                    + "    <generator>MediaWiki 1.13alpha</generator>\n" + "    <case>first-letter</case>\n"
+                    + "    <namespaces>\n" + "      <namespace key=\"-2\">Media</namespace>\n"
+                    + "      <namespace key=\"-1\">Special</namespace>\n" + "      <namespace key=\"0\" />\n"
+                    + "      <namespace key=\"1\">Talk</namespace>\n"
+                    + "      <namespace key=\"2\">User</namespace>\n"
+                    + "      <namespace key=\"3\">User talk</namespace>\n"
+                    + "      <namespace key=\"4\">Wikipedia</namespace>\n"
+                    + "      <namespace key=\"5\">Wikipedia talk</namespace>\n"
+                    + "      <namespace key=\"6\">Image</namespace>\n"
+                    + "      <namespace key=\"7\">Image talk</namespace>\n"
+                    + "      <namespace key=\"8\">MediaWiki</namespace>\n"
+                    + "      <namespace key=\"9\">MediaWiki talk</namespace>\n"
+                    + "      <namespace key=\"10\">Template</namespace>\n"
+                    + "      <namespace key=\"11\">Template talk</namespace>\n"
+                    + "      <namespace key=\"12\">Help</namespace>\n"
+                    + "      <namespace key=\"13\">Help talk</namespace>\n"
+                    + "      <namespace key=\"14\">Category</namespace>\n"
+                    + "      <namespace key=\"15\">Category talk</namespace>\n"
+                    + "      <namespace key=\"100\">Portal</namespace>\n"
+                    + "      <namespace key=\"101\">Portal talk</namespace>\n" + "    </namespaces>\n"
+                    + "  </siteinfo>\n";
+
+    StringBuilder content = new StringBuilder();
+    content.append(header);
+    NumberFormat decimalFormatter = new DecimalFormat("0000");
+    File dumpFile = new File(dumpFilePath);
+
+    // If the specified path for the input file is incorrect, return immediately
+    if (!dumpFile.exists()) {
+      log.error("Input file path {} doesn't exist", dumpFilePath);
+      return;
+    }
+
+    FileLineIterator it;
+    if (dumpFilePath.endsWith(".bz2")) {
+      // default compression format from http://download.wikimedia.org
+      CompressionCodec codec = new BZip2Codec();
+      it = new FileLineIterator(codec.createInputStream(new FileInputStream(dumpFile)));
+    } else {
+      // assume the user has previously de-compressed the dump file
+      it = new FileLineIterator(dumpFile);
+    }
+    int fileNumber = 0;
+    while (it.hasNext()) {
+      String thisLine = it.next();
+      if (thisLine.trim().startsWith("<page>")) {
+        boolean end = false;
+        while (!thisLine.trim().startsWith("</page>")) {
+          content.append(thisLine).append('\n');
+          if (it.hasNext()) {
+            thisLine = it.next();
+          } else {
+            end = true;
+            break;
+          }
+        }
+        content.append(thisLine).append('\n');
+
+        if (content.length() > chunkSize || end) {
+          content.append("</mediawiki>");
+          fileNumber++;
+          String filename = outputDirPath + "/chunk-" + decimalFormatter.format(fileNumber) + ".xml";
+          try (BufferedWriter chunkWriter =
+                   new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filename)), "UTF-8"))) {
+            chunkWriter.write(content.toString(), 0, content.length());
+          }
+          if (fileNumber >= numChunks) {
+            break;
+          }
+          content = new StringBuilder();
+          content.append(header);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
new file mode 100644
index 0000000..afd350f
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Reads records that are delimited by a specific begin/end tag.
+ */
+public class XmlInputFormat extends TextInputFormat {
+
+  private static final Logger log = LoggerFactory.getLogger(XmlInputFormat.class);
+
+  public static final String START_TAG_KEY = "xmlinput.start";
+  public static final String END_TAG_KEY = "xmlinput.end";
+
+  @Override
+  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+    try {
+      return new XmlRecordReader((FileSplit) split, context.getConfiguration());
+    } catch (IOException ioe) {
+      log.warn("Error while creating XmlRecordReader", ioe);
+      return null;
+    }
+  }
+
+  /**
+   * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified
+   * by the start tag and end tag
+   * 
+   */
+  public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
+
+    private final byte[] startTag;
+    private final byte[] endTag;
+    private final long start;
+    private final long end;
+    private final FSDataInputStream fsin;
+    private final DataOutputBuffer buffer = new DataOutputBuffer();
+    private LongWritable currentKey;
+    private Text currentValue;
+
+    public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
+      startTag = conf.get(START_TAG_KEY).getBytes(Charsets.UTF_8);
+      endTag = conf.get(END_TAG_KEY).getBytes(Charsets.UTF_8);
+
+      // open the file and seek to the start of the split
+      start = split.getStart();
+      end = start + split.getLength();
+      Path file = split.getPath();
+      FileSystem fs = file.getFileSystem(conf);
+      fsin = fs.open(split.getPath());
+      fsin.seek(start);
+    }
+
+    private boolean next(LongWritable key, Text value) throws IOException {
+      if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
+        try {
+          buffer.write(startTag);
+          if (readUntilMatch(endTag, true)) {
+            key.set(fsin.getPos());
+            value.set(buffer.getData(), 0, buffer.getLength());
+            return true;
+          }
+        } finally {
+          buffer.reset();
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      Closeables.close(fsin, true);
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return (fsin.getPos() - start) / (float) (end - start);
+    }
+
+    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
+      int i = 0;
+      while (true) {
+        int b = fsin.read();
+        // end of file:
+        if (b == -1) {
+          return false;
+        }
+        // save to buffer:
+        if (withinBlock) {
+          buffer.write(b);
+        }
+
+        // check if we're matching:
+        if (b == match[i]) {
+          i++;
+          if (i >= match.length) {
+            return true;
+          }
+        } else {
+          i = 0;
+        }
+        // see if we've passed the stop point:
+        if (!withinBlock && i == 0 && fsin.getPos() >= end) {
+          return false;
+        }
+      }
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+      return currentKey;
+    }
+
+    @Override
+    public Text getCurrentValue() throws IOException, InterruptedException {
+      return currentValue;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      currentKey = new LongWritable();
+      currentValue = new Text();
+      return next(currentKey, currentValue);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java
new file mode 100644
index 0000000..1c55090
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils;
+
+/**
+ * Produces a counter sequence with "nice" step sizes that grow with the counter itself.
+ *
+ * A typical use is throttling progress or error messages that would otherwise be emitted for
+ * <i>every</i> incoming record.  The first ten values advance one at a time; after that each step is
+ * 1, 2 or 5 times a power of ten, chosen from roughly a quarter of the current count.  The number
+ * of values produced therefore grows only with the logarithm of the count, while early records
+ * still each get a message.
+ */
+public class Bump125 {
+  private static final int[] BUMPS = {1, 2, 5};
+
+  /**
+   * Picks the largest of 1, 2, 5 that does not exceed {@code value / base}, or 1 if none does.
+   */
+  static int scale(double value, double base) {
+    double ratio = value / base;
+    int chosen = BUMPS[0];
+    for (int k = 1; k < BUMPS.length; k++) {
+      // stop at the first multiplier that no longer fits under the ratio
+      if (!(BUMPS[k] <= ratio)) {
+        break;
+      }
+      chosen = BUMPS[k];
+    }
+    return chosen;
+  }
+
+  /**
+   * Returns the largest power of ten not exceeding {@code value}, but never less than 1.
+   */
+  static long base(double value) {
+    int exponent = (int) Math.floor(Math.log10(value));
+    long power = (long) Math.pow(10, exponent);
+    return Math.max(1, power);
+  }
+
+  private long counter = 0;
+
+  /**
+   * Advances the counter by the current step size and returns the new value.
+   */
+  public long increment() {
+    long step = 1;
+    if (counter >= 10) {
+      // size the step from a quarter of the current count
+      double quarter = counter / 4.0;
+      long magnitude = base(quarter);
+      int multiplier = scale(quarter, magnitude);
+      step = magnitude * multiplier;
+    }
+    counter += step;
+    return counter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java
new file mode 100644
index 0000000..f63de83
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixWritable;
+
+/**
+ * Export a Matrix in various text formats:
+ *    * CSV file
+ *
+ * Input format: Hadoop SequenceFile with Text key and MatrixWritable value, 1 pair
+ * TODO:
+ *     Needs class for key value- should not hard-code to Text.
+ *     Options for row and column headers- stats software can be picky.
+ * Assumes only one matrix in a file.
+ */
+public final class MatrixDumper extends AbstractJob {
+
+  private MatrixDumper() { }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new MatrixDumper(), args);
+  }
+
+  /**
+   * Parses the command line and writes the input matrix as CSV.
+   *
+   * @param args command-line arguments: the input option plus an optional {@code --output} path;
+   *             without an output path the CSV goes to standard output
+   * @return 0 on success, -1 if the arguments could not be parsed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOption("output", "o", "Output path", null); // AbstractJob output feature requires param
+    Map<String, List<String>> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+    String outputFile = hasOption("output") ? getOption("output") : null;
+    exportCSV(getInputPath(), outputFile, false);
+    return 0;
+  }
+
+  /**
+   * Writes the first matrix found in {@code inputPath} as comma-separated rows.
+   *
+   * @param inputPath  sequence file holding the matrix
+   * @param outputFile local file to write to, or {@code null} for standard output
+   * @param doLabels   when {@code true}, emit a header line and prefix every row with its label
+   * @throws IOException if the input holds no matrix or reading/writing fails
+   */
+  private static void exportCSV(Path inputPath, String outputFile, boolean doLabels) throws IOException {
+    Matrix m = readMatrix(inputPath);
+    int numRows = m.numRows();
+    int numCols = m.numCols();
+    // labels are only needed when they are actually printed
+    String[] columnLabels = doLabels ? getLabels(numCols, m.getColumnLabelBindings(), "col") : null;
+    String[] rowLabels = doLabels ? getLabels(numRows, m.getRowLabelBindings(), "row") : null;
+
+    PrintStream ps = getPrintStream(outputFile);
+    try {
+      if (doLabels) {
+        ps.print("rowid,");
+        for (int c = 0; c < numCols; c++) {
+          if (c > 0) {
+            ps.print(',');
+          }
+          ps.print(columnLabels[c]);
+        }
+        ps.println();
+      }
+      for (int r = 0; r < numRows; r++) {
+        if (doLabels) {
+          // each row carries its own label
+          ps.print(rowLabels[r]);
+          ps.print(',');
+        }
+        for (int c = 0; c < numCols; c++) {
+          if (c > 0) {
+            ps.print(',');
+          }
+          ps.print(Double.toString(m.getQuick(r, c)));
+        }
+        ps.println();
+      }
+    } finally {
+      // never close System.out, but make sure a file stream is released even on failure
+      if (ps == System.out) {
+        ps.flush();
+      } else {
+        ps.close();
+      }
+    }
+  }
+
+  /** Reads the first matrix from the sequence file and always closes the iterator. */
+  private static Matrix readMatrix(Path inputPath) throws IOException {
+    SequenceFileValueIterator<MatrixWritable> it =
+        new SequenceFileValueIterator<>(inputPath, true, new Configuration());
+    try {
+      if (!it.hasNext()) {
+        throw new IOException("No matrix found in " + inputPath);
+      }
+      return it.next().get();
+    } finally {
+      it.close();
+    }
+  }
+
+  /**
+   * Opens the destination stream.
+   *
+   * @param outputPath local file path, or {@code null} for standard output
+   * @return {@code System.out} when no path is given, otherwise a UTF-8 stream on a fresh file
+   * @throws IOException if an existing file cannot be replaced or the file cannot be opened
+   */
+  private static PrintStream getPrintStream(String outputPath) throws IOException {
+    if (outputPath == null) {
+      return System.out;
+    }
+    File outputFile = new File(outputPath);
+    if (outputFile.exists() && !outputFile.delete()) {
+      throw new IOException("Unable to replace existing output file " + outputFile);
+    }
+    // FileOutputStream creates the file, so no separate createNewFile() is needed
+    OutputStream os = new FileOutputStream(outputFile);
+    // PrintStream expects the canonical charset name, not the display name
+    return new PrintStream(os, false, Charsets.UTF_8.name());
+  }
+
+  /**
+   * return the label set, sorted by matrix order
+   * if there are no labels, fabricate them using the starter string
+   *
+   * @param length number of rows or columns to label
+   * @param labels label-to-index bindings, or {@code null} if the matrix has none
+   * @param start  prefix for fabricated labels; they are numbered from 1 ({@code start1}, {@code start2}, ...)
+   * @return an array of exactly {@code length} labels; positions without a binding keep the fabricated label
+   */
+  private static String[] getLabels(int length, Map<String,Integer> labels, String start) {
+    String[] sorted = new String[length];
+    for (int i = 0; i < length; i++) {
+      sorted[i] = start + (i + 1);
+    }
+    if (labels != null) {
+      for (Map.Entry<String,Integer> entry : labels.entrySet()) {
+        Integer index = entry.getValue();
+        // ignore bindings that point outside the matrix instead of failing
+        if (index != null && index >= 0 && index < length) {
+          sorted[index] = entry.getKey();
+        }
+      }
+    }
+    return sorted;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
new file mode 100644
index 0000000..e01868a
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils;
+
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
+import org.apache.mahout.math.list.IntArrayList;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+
+/**
+ * Dumps the key/value pairs of a sequence file, or of every file in a directory, as text.
+ *
+ * <p>Output goes to the file given by the output option, or to standard output when none is given.
+ */
+public final class SequenceFileDumper extends AbstractJob {
+
+  public SequenceFileDumper() {
+    setConf(new Configuration());
+  }
+
+  /**
+   * Parses the command line and dumps every input file.
+   *
+   * @param args command-line arguments; see the options registered at the top of this method
+   * @return 0 on success, -1 if the arguments could not be parsed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption("substring", "b", "The number of chars to print out per value", false);
+    addOption(buildOption("count", "c", "Report the count only", false, false, null));
+    addOption("numItems", "n", "Output at most <n> key value pairs", false);
+    addOption(buildOption("facets", "fa", "Output the counts per key.  Note, if there are a lot of unique keys, "
+        + "this can take up a fair amount of memory", false, false, null));
+    addOption(buildOption("quiet", "q", "Print only file contents.", false, false, null));
+
+    if (parseArguments(args, false, true) == null) {
+      return -1;
+    }
+
+    Path[] pathArr;
+    Configuration conf = new Configuration();
+    Path input = getInputPath();
+    FileSystem fs = input.getFileSystem(conf);
+    if (fs.getFileStatus(input).isDir()) {
+      pathArr = FileUtil.stat2Paths(fs.listStatus(input, PathFilters.logsCRCFilter()));
+    } else {
+      pathArr = new Path[1];
+      pathArr[0] = input;
+    }
+
+    // The options do not change between input files, so read and parse them once.
+    boolean quiet = hasOption("quiet");
+    boolean countOnly = hasOption("count");
+    boolean withFacets = hasOption("facets");
+    int sub = hasOption("substring") ? Integer.parseInt(getOption("substring")) : Integer.MAX_VALUE;
+    // numItems is only meaningful (and only parsed) when records are actually dumped
+    boolean limited = !countOnly && hasOption("numItems");
+    long numItems = limited ? Long.parseLong(getOption("numItems")) : Long.MAX_VALUE;
+
+    Writer writer;
+    boolean shouldClose;
+    if (hasOption("output")) {
+      shouldClose = true;
+      writer = Files.newWriter(new File(getOption("output")), Charsets.UTF_8);
+    } else {
+      // never close System.out; it is only flushed at the end
+      shouldClose = false;
+      writer = new OutputStreamWriter(System.out, Charsets.UTF_8);
+    }
+    try {
+      for (Path path : pathArr) {
+        if (!quiet) {
+          writer.append("Input Path: ").append(String.valueOf(path)).append('\n');
+        }
+
+        // try-with-resources releases the underlying reader even when the dump stops early
+        // (numItems reached) or fails part-way through
+        try (SequenceFileIterator<?, ?> iterator = new SequenceFileIterator<>(path, true, conf)) {
+          if (!quiet) {
+            writer.append("Key class: ").append(iterator.getKeyClass().toString());
+            writer.append(" Value Class: ").append(iterator.getValueClass().toString()).append('\n');
+          }
+          OpenObjectIntHashMap<String> facets = null;
+          if (withFacets) {
+            facets = new OpenObjectIntHashMap<>();
+          }
+          long count = 0;
+          if (countOnly) {
+            while (iterator.hasNext()) {
+              Pair<?, ?> record = iterator.next();
+              String key = record.getFirst().toString();
+              if (facets != null) {
+                facets.adjustOrPutValue(key, 1, 1); //either insert or add 1
+              }
+              count++;
+            }
+            writer.append("Count: ").append(String.valueOf(count)).append('\n');
+          } else {
+            if (limited && !quiet) {
+              writer.append("Max Items to dump: ").append(String.valueOf(numItems)).append("\n");
+            }
+            while (iterator.hasNext() && count < numItems) {
+              Pair<?, ?> record = iterator.next();
+              String key = record.getFirst().toString();
+              writer.append("Key: ").append(key);
+              String str = record.getSecond().toString();
+              // truncate long values to the requested number of characters
+              writer.append(": Value: ").append(str.length() > sub
+                                                ? str.substring(0, sub) : str);
+              writer.write('\n');
+              if (facets != null) {
+                facets.adjustOrPutValue(key, 1, 1); //either insert or add 1
+              }
+              count++;
+            }
+            if (!quiet) {
+              writer.append("Count: ").append(String.valueOf(count)).append('\n');
+            }
+          }
+          if (facets != null) {
+            List<String> keyList = new ArrayList<>(facets.size());
+
+            IntArrayList valueList = new IntArrayList(facets.size());
+            // fills both lists in key order, so the same index addresses a key and its count
+            facets.pairsSortedByKey(keyList, valueList);
+            writer.append("-----Facets---\n");
+            writer.append("Key\t\tCount\n");
+            int i = 0;
+            for (String key : keyList) {
+              writer.append(key).append("\t\t").append(String.valueOf(valueList.get(i++))).append('\n');
+            }
+          }
+        }
+      }
+      writer.flush();
+
+    } finally {
+      if (shouldClose) {
+        Closeables.close(writer, false);
+      }
+    }
+
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    new SequenceFileDumper().run(args);
+  }
+
+}