You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 14:51:59 UTC

[31/51] [partial] mahout git commit: MAHOUT-2042 and MAHOUT-2045 Delete directories which were moved/no longer in use

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java b/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
deleted file mode 100644
index 0ae79ad..0000000
--- a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.clustering.evaluation;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
-import org.apache.mahout.common.ClassUtils;
-import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.distance.DistanceMeasure;
-import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
-import org.apache.mahout.common.iterator.sequencefile.PathFilters;
-import org.apache.mahout.common.iterator.sequencefile.PathType;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
-import org.apache.mahout.math.VectorWritable;
-
-public class RepresentativePointsMapper
-  extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, WeightedVectorWritable> {
-
-  private Map<Integer, List<VectorWritable>> representativePoints;
-  private final Map<Integer, WeightedVectorWritable> mostDistantPoints = new HashMap<>();
-  private DistanceMeasure measure = new EuclideanDistanceMeasure();
-
-  @Override
-  protected void cleanup(Context context) throws IOException, InterruptedException {
-    for (Map.Entry<Integer, WeightedVectorWritable> entry : mostDistantPoints.entrySet()) {
-      context.write(new IntWritable(entry.getKey()), entry.getValue());
-    }
-    super.cleanup(context);
-  }
-
-  @Override
-  protected void map(IntWritable clusterId, WeightedVectorWritable point, Context context)
-    throws IOException, InterruptedException {
-    mapPoint(clusterId, point, measure, representativePoints, mostDistantPoints);
-  }
-
-  public static void mapPoint(IntWritable clusterId,
-                              WeightedVectorWritable point,
-                              DistanceMeasure measure,
-                              Map<Integer, List<VectorWritable>> representativePoints,
-                              Map<Integer, WeightedVectorWritable> mostDistantPoints) {
-    int key = clusterId.get();
-    WeightedVectorWritable currentMDP = mostDistantPoints.get(key);
-
-    List<VectorWritable> repPoints = representativePoints.get(key);
-    double totalDistance = 0.0;
-    if (repPoints != null) {
-      for (VectorWritable refPoint : repPoints) {
-        totalDistance += measure.distance(refPoint.get(), point.getVector());
-      }
-    }
-    if (currentMDP == null || currentMDP.getWeight() < totalDistance) {
-      mostDistantPoints.put(key, new WeightedVectorWritable(totalDistance, point.getVector().clone()));
-    }
-  }
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    super.setup(context);
-    Configuration conf = context.getConfiguration();
-    measure =
-        ClassUtils.instantiateAs(conf.get(RepresentativePointsDriver.DISTANCE_MEASURE_KEY), DistanceMeasure.class);
-    representativePoints = getRepresentativePoints(conf);
-  }
-
-  public void configure(Map<Integer, List<VectorWritable>> referencePoints, DistanceMeasure measure) {
-    this.representativePoints = referencePoints;
-    this.measure = measure;
-  }
-
-  public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf) {
-    String statePath = conf.get(RepresentativePointsDriver.STATE_IN_KEY);
-    return getRepresentativePoints(conf, new Path(statePath));
-  }
-
-  public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf, Path statePath) {
-    Map<Integer, List<VectorWritable>> representativePoints = new HashMap<>();
-    for (Pair<IntWritable,VectorWritable> record
-         : new SequenceFileDirIterable<IntWritable,VectorWritable>(statePath,
-                                                                   PathType.LIST,
-                                                                   PathFilters.logsCRCFilter(),
-                                                                   conf)) {
-      int keyValue = record.getFirst().get();
-      List<VectorWritable> repPoints = representativePoints.get(keyValue);
-      if (repPoints == null) {
-        repPoints = new ArrayList<>();
-        representativePoints.put(keyValue, repPoints);
-      }
-      repPoints.add(record.getSecond());
-    }
-    return representativePoints;
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java b/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
deleted file mode 100644
index 27ca861..0000000
--- a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.clustering.evaluation;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
-import org.apache.mahout.math.VectorWritable;
-
-public class RepresentativePointsReducer
-  extends Reducer<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
-
-  private Map<Integer, List<VectorWritable>> representativePoints;
-
-  @Override
-  protected void cleanup(Context context) throws IOException, InterruptedException {
-    for (Map.Entry<Integer, List<VectorWritable>> entry : representativePoints.entrySet()) {
-      IntWritable iw = new IntWritable(entry.getKey());
-      for (VectorWritable vw : entry.getValue()) {
-        context.write(iw, vw);
-      }
-    }
-    super.cleanup(context);
-  }
-
-  @Override
-  protected void reduce(IntWritable key, Iterable<WeightedVectorWritable> values, Context context)
-    throws IOException, InterruptedException {
-    // find the most distant point
-    WeightedVectorWritable mdp = null;
-    for (WeightedVectorWritable dpw : values) {
-      if (mdp == null || mdp.getWeight() < dpw.getWeight()) {
-        mdp = new WeightedVectorWritable(dpw.getWeight(), dpw.getVector());
-      }
-    }
-    context.write(new IntWritable(key.get()), new VectorWritable(mdp.getVector()));
-  }
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    super.setup(context);
-    Configuration conf = context.getConfiguration();
-    representativePoints = RepresentativePointsMapper.getRepresentativePoints(conf);
-  }
-
-  public void configure(Map<Integer, List<VectorWritable>> representativePoints) {
-    this.representativePoints = representativePoints;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java b/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
deleted file mode 100644
index 392909e..0000000
--- a/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.clustering.lda;
-
-import com.google.common.io.Closeables;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import org.apache.commons.cli2.CommandLine;
-import org.apache.commons.cli2.Group;
-import org.apache.commons.cli2.Option;
-import org.apache.commons.cli2.OptionException;
-import org.apache.commons.cli2.builder.ArgumentBuilder;
-import org.apache.commons.cli2.builder.DefaultOptionBuilder;
-import org.apache.commons.cli2.builder.GroupBuilder;
-import org.apache.commons.cli2.commandline.Parser;
-import org.apache.commons.io.Charsets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.mahout.common.CommandLineUtil;
-import org.apache.mahout.common.IntPairWritable;
-import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.apache.mahout.common.iterator.sequencefile.PathType;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
-import org.apache.mahout.utils.vectors.VectorHelper;
-
-/**
- * Class to print out the top K words for each topic.
- */
-public final class LDAPrintTopics {
-
-  private LDAPrintTopics() { }
-  
-  // Expands the queue list to have a Queue for topic K
-  private static void ensureQueueSize(Collection<Queue<Pair<String,Double>>> queues, int k) {
-    for (int i = queues.size(); i <= k; ++i) {
-      queues.add(new PriorityQueue<Pair<String,Double>>());
-    }
-  }
-  
-  public static void main(String[] args) throws Exception {
-    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
-    ArgumentBuilder abuilder = new ArgumentBuilder();
-    GroupBuilder gbuilder = new GroupBuilder();
-    
-    Option inputOpt = DefaultOptionCreator.inputOption().create();
-    
-    Option dictOpt = obuilder.withLongName("dict").withRequired(true).withArgument(
-      abuilder.withName("dict").withMinimum(1).withMaximum(1).create()).withDescription(
-      "Dictionary to read in, in the same format as one created by "
-          + "org.apache.mahout.utils.vectors.lucene.Driver").withShortName("d").create();
-    
-    Option outOpt = DefaultOptionCreator.outputOption().create();
-    
-    Option wordOpt = obuilder.withLongName("words").withRequired(false).withArgument(
-      abuilder.withName("words").withMinimum(0).withMaximum(1).withDefault("20").create()).withDescription(
-      "Number of words to print").withShortName("w").create();
-    Option dictTypeOpt = obuilder.withLongName("dictionaryType").withRequired(false).withArgument(
-      abuilder.withName("dictionaryType").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The dictionary file type (text|sequencefile)").withShortName("dt").create();
-    Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
-        .create();
-    
-    Group group = gbuilder.withName("Options").withOption(dictOpt).withOption(outOpt).withOption(wordOpt)
-        .withOption(inputOpt).withOption(dictTypeOpt).create();
-    try {
-      Parser parser = new Parser();
-      parser.setGroup(group);
-      CommandLine cmdLine = parser.parse(args);
-      
-      if (cmdLine.hasOption(helpOpt)) {
-        CommandLineUtil.printHelp(group);
-        return;
-      }
-      
-      String input = cmdLine.getValue(inputOpt).toString();
-      String dictFile = cmdLine.getValue(dictOpt).toString();
-      int numWords = 20;
-      if (cmdLine.hasOption(wordOpt)) {
-        numWords = Integer.parseInt(cmdLine.getValue(wordOpt).toString());
-      }
-      Configuration config = new Configuration();
-      
-      String dictionaryType = "text";
-      if (cmdLine.hasOption(dictTypeOpt)) {
-        dictionaryType = cmdLine.getValue(dictTypeOpt).toString();
-      }
-      
-      List<String> wordList;
-      if ("text".equals(dictionaryType)) {
-        wordList = Arrays.asList(VectorHelper.loadTermDictionary(new File(dictFile)));
-      } else if ("sequencefile".equals(dictionaryType)) {
-        wordList = Arrays.asList(VectorHelper.loadTermDictionary(config, dictFile));
-      } else {
-        throw new IllegalArgumentException("Invalid dictionary format");
-      }
-      
-      List<Queue<Pair<String,Double>>> topWords = topWordsForTopics(input, config, wordList, numWords);
-
-      File output = null;
-      if (cmdLine.hasOption(outOpt)) {
-        output = new File(cmdLine.getValue(outOpt).toString());
-        if (!output.exists() && !output.mkdirs()) {
-          throw new IOException("Could not create directory: " + output);
-        }
-      }
-      printTopWords(topWords, output);
-    } catch (OptionException e) {
-      CommandLineUtil.printHelp(group);
-      throw e;
-    }
-  }
-  
-  // Adds the word if the queue is below capacity, or the score is high enough
-  private static void maybeEnqueue(Queue<Pair<String,Double>> q, String word, double score, int numWordsToPrint) {
-    if (q.size() >= numWordsToPrint && score > q.peek().getSecond()) {
-      q.poll();
-    }
-    if (q.size() < numWordsToPrint) {
-      q.add(new Pair<>(word, score));
-    }
-  }
-  
-  private static void printTopWords(List<Queue<Pair<String,Double>>> topWords, File outputDir)
-    throws IOException {
-    for (int i = 0; i < topWords.size(); ++i) {
-      Collection<Pair<String,Double>> topK = topWords.get(i);
-      Writer out = null;
-      boolean printingToSystemOut = false;
-      try {
-        if (outputDir != null) {
-          out = new OutputStreamWriter(new FileOutputStream(new File(outputDir, "topic_" + i)), Charsets.UTF_8);
-        } else {
-          out = new OutputStreamWriter(System.out, Charsets.UTF_8);
-          printingToSystemOut = true;
-          out.write("Topic " + i);
-          out.write('\n');
-          out.write("===========");
-          out.write('\n');
-        }
-        List<Pair<String,Double>> topKasList = new ArrayList<>(topK.size());
-        for (Pair<String,Double> wordWithScore : topK) {
-          topKasList.add(wordWithScore);
-        }
-        Collections.sort(topKasList, new Comparator<Pair<String,Double>>() {
-          @Override
-          public int compare(Pair<String,Double> pair1, Pair<String,Double> pair2) {
-            return pair2.getSecond().compareTo(pair1.getSecond());
-          }
-        });
-        for (Pair<String,Double> wordWithScore : topKasList) {
-          out.write(wordWithScore.getFirst() + " [p(" + wordWithScore.getFirst() + "|topic_" + i + ") = "
-            + wordWithScore.getSecond());
-          out.write('\n');
-        }
-      } finally {
-        if (!printingToSystemOut) {
-          Closeables.close(out, false);
-        } else {
-          out.flush();
-        }
-      }
-    }
-  }
-  
-  private static List<Queue<Pair<String,Double>>> topWordsForTopics(String dir,
-                                                                    Configuration job,
-                                                                    List<String> wordList,
-                                                                    int numWordsToPrint) {
-    List<Queue<Pair<String,Double>>> queues = new ArrayList<>();
-    Map<Integer,Double> expSums = new HashMap<>();
-    for (Pair<IntPairWritable,DoubleWritable> record
-        : new SequenceFileDirIterable<IntPairWritable, DoubleWritable>(
-            new Path(dir, "part-*"), PathType.GLOB, null, null, true, job)) {
-      IntPairWritable key = record.getFirst();
-      int topic = key.getFirst();
-      int word = key.getSecond();
-      ensureQueueSize(queues, topic);
-      if (word >= 0 && topic >= 0) {
-        double score = record.getSecond().get();
-        if (expSums.get(topic) == null) {
-          expSums.put(topic, 0.0);
-        }
-        expSums.put(topic, expSums.get(topic) + Math.exp(score));
-        String realWord = wordList.get(word);
-        maybeEnqueue(queues.get(topic), realWord, score, numWordsToPrint);
-      }
-    }
-    for (int i = 0; i < queues.size(); i++) {
-      Queue<Pair<String,Double>> queue = queues.get(i);
-      Queue<Pair<String,Double>> newQueue = new PriorityQueue<>(queue.size());
-      double norm = expSums.get(i);
-      for (Pair<String,Double> pair : queue) {
-        newQueue.add(new Pair<>(pair.getFirst(), Math.exp(pair.getSecond()) / norm));
-      }
-      queues.set(i, newQueue);
-    }
-    return queues;
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java b/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
deleted file mode 100644
index 12ed471..0000000
--- a/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mahout.text;
-
-import org.apache.lucene.analysis.TokenFilter;
-import org.apache.lucene.analysis.TokenStream;
-import org.apache.lucene.analysis.Tokenizer;
-import org.apache.lucene.analysis.core.LowerCaseFilter;
-import org.apache.lucene.analysis.core.StopFilter;
-import org.apache.lucene.analysis.en.PorterStemFilter;
-import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
-import org.apache.lucene.analysis.standard.StandardFilter;
-import org.apache.lucene.analysis.standard.StandardTokenizer;
-import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
-import org.apache.lucene.analysis.util.CharArraySet;
-import org.apache.lucene.analysis.util.StopwordAnalyzerBase;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Custom Lucene Analyzer designed for aggressive feature reduction
- * for clustering the ASF Mail Archives using an extended set of
- * stop words, excluding non-alpha-numeric tokens, and porter stemming.
- */
-public final class MailArchivesClusteringAnalyzer extends StopwordAnalyzerBase {
-  // extended set of stop words composed of common mail terms like "hi",
-  // HTML tags, and Java keywords asmany of the messages in the archives
-  // are subversion check-in notifications
-    
-  private static final CharArraySet STOP_SET = new CharArraySet(Arrays.asList(
-    "3d","7bit","a0","about","above","abstract","across","additional","after",
-    "afterwards","again","against","align","all","almost","alone","along",
-    "already","also","although","always","am","among","amongst","amoungst",
-    "amount","an","and","another","any","anybody","anyhow","anyone","anything",
-    "anyway","anywhere","are","arial","around","as","ascii","assert","at",
-    "back","background","base64","bcc","be","became","because","become","becomes",
-    "becoming","been","before","beforehand","behind","being","below","beside",
-    "besides","between","beyond","bgcolor","blank","blockquote","body","boolean",
-    "border","both","br","break","but","by","can","cannot","cant","case","catch",
-    "cc","cellpadding","cellspacing","center","char","charset","cheers","class",
-    "co","color","colspan","com","con","const","continue","could","couldnt",
-    "cry","css","de","dear","default","did","didnt","different","div","do",
-    "does","doesnt","done","dont","double","down","due","during","each","eg",
-    "eight","either","else","elsewhere","empty","encoding","enough","enum",
-    "etc","eu","even","ever","every","everyone","everything","everywhere",
-    "except","extends","face","family","few","ffffff","final","finally","float",
-    "font","for","former","formerly","fri","from","further","get","give","go",
-    "good","got","goto","gt","h1","ha","had","has","hasnt","have","he","head",
-    "height","hello","helvetica","hence","her","here","hereafter","hereby",
-    "herein","hereupon","hers","herself","hi","him","himself","his","how",
-    "however","hr","href","html","http","https","id","ie","if","ill","im",
-    "image","img","implements","import","in","inc","instanceof","int","interface",
-    "into","is","isnt","iso-8859-1","it","its","itself","ive","just","keep",
-    "last","latter","latterly","least","left","less","li","like","long","look",
-    "lt","ltd","mail","mailto","many","margin","may","me","meanwhile","message",
-    "meta","might","mill","mine","mon","more","moreover","most","mostly","mshtml",
-    "mso","much","must","my","myself","name","namely","native","nbsp","need",
-    "neither","never","nevertheless","new","next","nine","no","nobody","none",
-    "noone","nor","not","nothing","now","nowhere","null","of","off","often",
-    "ok","on","once","only","onto","or","org","other","others","otherwise",
-    "our","ours","ourselves","out","over","own","package","pad","per","perhaps",
-    "plain","please","pm","printable","private","protected","public","put",
-    "quot","quote","r1","r2","rather","re","really","regards","reply","return",
-    "right","said","same","sans","sat","say","saying","see","seem","seemed",
-    "seeming","seems","serif","serious","several","she","short","should","show",
-    "side","since","sincere","six","sixty","size","so","solid","some","somehow",
-    "someone","something","sometime","sometimes","somewhere","span","src",
-    "static","still","strictfp","string","strong","style","stylesheet","subject",
-    "such","sun","super","sure","switch","synchronized","table","take","target",
-    "td","text","th","than","thanks","that","the","their","them","themselves",
-    "then","thence","there","thereafter","thereby","therefore","therein","thereupon",
-    "these","they","thick","thin","think","third","this","those","though",
-    "three","through","throughout","throw","throws","thru","thu","thus","tm",
-    "to","together","too","top","toward","towards","tr","transfer","transient",
-    "try","tue","type","ul","un","under","unsubscribe","until","up","upon",
-    "us","use","used","uses","using","valign","verdana","very","via","void",
-    "volatile","want","was","we","wed","weight","well","were","what","whatever",
-    "when","whence","whenever","where","whereafter","whereas","whereby","wherein",
-    "whereupon","wherever","whether","which","while","whither","who","whoever",
-    "whole","whom","whose","why","width","will","with","within","without",
-    "wont","would","wrote","www","yes","yet","you","your","yours","yourself",
-    "yourselves"
-  ), false);
-
-  // Regex used to exclude non-alpha-numeric tokens
-  private static final Pattern ALPHA_NUMERIC = Pattern.compile("^[a-z][a-z0-9_]+$");
-  private static final Matcher MATCHER = ALPHA_NUMERIC.matcher("");
-
-  public MailArchivesClusteringAnalyzer() {
-    super(STOP_SET);
-  }
-
-  public MailArchivesClusteringAnalyzer(CharArraySet stopSet) {
-    super(stopSet);
-  }
-  
-  @Override
-  protected TokenStreamComponents createComponents(String fieldName) {
-    Tokenizer tokenizer = new StandardTokenizer();
-    TokenStream result = new StandardFilter(tokenizer);
-    result = new LowerCaseFilter(result);
-    result = new ASCIIFoldingFilter(result);
-    result = new AlphaNumericMaxLengthFilter(result);
-    result = new StopFilter(result, STOP_SET);
-    result = new PorterStemFilter(result);
-    return new TokenStreamComponents(tokenizer, result);
-  }
-
-  /**
-   * Matches alpha-numeric tokens between 2 and 40 chars long.
-   */
-  static class AlphaNumericMaxLengthFilter extends TokenFilter {
-    private final CharTermAttribute termAtt;
-    private final char[] output = new char[28];
-
-    AlphaNumericMaxLengthFilter(TokenStream in) {
-      super(in);
-      termAtt = addAttribute(CharTermAttribute.class);
-    }
-
-    @Override
-    public final boolean incrementToken() throws IOException {
-      // return the first alpha-numeric token between 2 and 40 length
-      while (input.incrementToken()) {
-        int length = termAtt.length();
-        if (length >= 2 && length <= 28) {
-          char[] buf = termAtt.buffer();
-          int at = 0;
-          for (int c = 0; c < length; c++) {
-            char ch = buf[c];
-            if (ch != '\'') {
-              output[at++] = ch;
-            }
-          }
-          String term = new String(output, 0, at);
-          MATCHER.reset(term);
-          if (MATCHER.matches() && !term.startsWith("a0")) {
-            termAtt.setEmpty();
-            termAtt.append(term);
-            return true;
-          }
-        }
-      }
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java b/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
deleted file mode 100644
index 44df006..0000000
--- a/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-/**
- * 
- * Used in combining a large number of text files into one text input reader
- * along with the WholeFileRecordReader class.
- * 
- */
-public class MultipleTextFileInputFormat extends CombineFileInputFormat<IntWritable, BytesWritable> {
-
-  @Override
-  public RecordReader<IntWritable, BytesWritable> createRecordReader(InputSplit inputSplit,
-                                                                      TaskAttemptContext taskAttemptContext)
-      throws IOException {
-    return new CombineFileRecordReader<>((CombineFileSplit) inputSplit,
-        taskAttemptContext, WholeFileRecordReader.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java b/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
deleted file mode 100644
index 37ebc44..0000000
--- a/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.mahout.common.iterator.FileLineIterable;
-import org.apache.mahout.utils.io.ChunkedWriter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-/**
- * Default parser for parsing text into sequence files.
- */
-public final class PrefixAdditionFilter extends SequenceFilesFromDirectoryFilter {
-
-  public PrefixAdditionFilter(Configuration conf,
-                              String keyPrefix,
-                              Map<String, String> options, 
-                              ChunkedWriter writer,
-                              Charset charset,
-                              FileSystem fs) {
-    super(conf, keyPrefix, options, writer, charset, fs);
-  }
-
-  @Override
-  protected void process(FileStatus fst, Path current) throws IOException {
-    FileSystem fs = getFs();
-    ChunkedWriter writer = getWriter();
-    if (fst.isDir()) {
-      String dirPath = getPrefix() + Path.SEPARATOR + current.getName() + Path.SEPARATOR + fst.getPath().getName();
-      fs.listStatus(fst.getPath(),
-                    new PrefixAdditionFilter(getConf(), dirPath, getOptions(), writer, getCharset(), fs));
-    } else {
-      try (InputStream in = fs.open(fst.getPath())){
-        StringBuilder file = new StringBuilder();
-        for (String aFit : new FileLineIterable(in, getCharset(), false)) {
-          file.append(aFit).append('\n');
-        }
-        String name = current.getName().equals(fst.getPath().getName())
-            ? current.getName()
-            : current.getName() + Path.SEPARATOR + fst.getPath().getName();
-        writer.write(getPrefix() + Path.SEPARATOR + name, file.toString());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
deleted file mode 100644
index 311ab8d..0000000
--- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.common.ClassUtils;
-import org.apache.mahout.common.HadoopUtil;
-import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.apache.mahout.utils.io.ChunkedWriter;
-
-/**
- * Converts a directory of text documents into SequenceFiles of Specified chunkSize. This class takes in a
- * parent directory containing sub folders of text documents and recursively reads the files and creates the
- * {@link org.apache.hadoop.io.SequenceFile}s of docid => content. The docid is set as the relative path of the
- * document from the parent directory prepended with a specified prefix. You can also specify the input encoding
- * of the text files. The content of the output SequenceFiles are encoded as UTF-8 text.
- */
-public class SequenceFilesFromDirectory extends AbstractJob {
-
-  private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName();
-
-  private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
-  public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
-  private static final String[] CHARSET_OPTION = {"charset", "c"};
-
-  private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
-
-  public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
-  public static final String BASE_INPUT_PATH = "baseinputpath";
-
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new SequenceFilesFromDirectory(), args);
-  }
-
-  /*
-  * callback main after processing MapReduce parameters
-  */
-  @Override
-  public int run(String[] args) throws Exception {
-    addOptions();
-    addOption(DefaultOptionCreator.methodOption().create());
-    addOption(DefaultOptionCreator.overwriteOption().create());
-
-    if (parseArguments(args) == null) {
-      return -1;
-    }
-
-    Map<String, String> options = parseOptions();
-    Path output = getOutputPath();
-    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
-      HadoopUtil.delete(getConf(), output);
-    }
-
-    if (getOption(DefaultOptionCreator.METHOD_OPTION,
-      DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
-      runSequential(getConf(), getInputPath(), output, options);
-    } else {
-      runMapReduce(getInputPath(), output);
-    }
-
-    return 0;
-  }
-
-  private int runSequential(Configuration conf, Path input, Path output, Map<String, String> options)
-    throws IOException, InterruptedException, NoSuchMethodException {
-    // Running sequentially
-    Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
-    String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
-    FileSystem fs = FileSystem.get(input.toUri(), conf);
-
-    try (ChunkedWriter writer = new ChunkedWriter(conf, Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output)) {
-      SequenceFilesFromDirectoryFilter pathFilter;
-      String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]);
-      if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
-        pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs);
-      } else {
-        pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class,
-          new Class[] {Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class},
-          new Object[] {conf, keyPrefix, options, writer, charset, fs});
-      }
-      fs.listStatus(input, pathFilter);
-    }
-    return 0;
-  }
-
-  private int runMapReduce(Path input, Path output) throws IOException, ClassNotFoundException, InterruptedException {
-
-    int chunkSizeInMB = 64;
-    if (hasOption(CHUNK_SIZE_OPTION[0])) {
-      chunkSizeInMB = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
-    }
-
-    String keyPrefix = null;
-    if (hasOption(KEY_PREFIX_OPTION[0])) {
-      keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
-    }
-
-    String fileFilterClassName = null;
-    if (hasOption(FILE_FILTER_CLASS_OPTION[0])) {
-      fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]);
-    }
-
-    PathFilter pathFilter = null;
-    // Prefix Addition is presently handled in the Mapper and unlike runsequential()
-    // need not be done via a pathFilter
-    if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
-      try {
-        pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-
-    // Prepare Job for submission.
-    Job job = prepareJob(input, output, MultipleTextFileInputFormat.class,
-      SequenceFilesFromDirectoryMapper.class, Text.class, Text.class,
-      SequenceFileOutputFormat.class, "SequenceFilesFromDirectory");
-
-    Configuration jobConfig = job.getConfiguration();
-    jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix);
-    jobConfig.set(FILE_FILTER_CLASS_OPTION[0], fileFilterClassName);
-
-    FileSystem fs = FileSystem.get(jobConfig);
-    FileStatus fsFileStatus = fs.getFileStatus(input);
-
-    String inputDirList;
-    if (pathFilter != null) {
-      inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus, pathFilter);
-    } else {
-      inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
-    }
-
-    jobConfig.set(BASE_INPUT_PATH, input.toString());
-
-    long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
-
-    // set the max split locations, otherwise we get nasty debug stuff
-    jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS));
-
-    FileInputFormat.setInputPaths(job, inputDirList);
-    // need to set this to a multiple of the block size, or no split happens
-    FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
-    FileOutputFormat.setCompressOutput(job, true);
-
-    boolean succeeded = job.waitForCompletion(true);
-    if (!succeeded) {
-      return -1;
-    }
-    return 0;
-  }
-
-  /**
-   * Override this method in order to add additional options to the command line of the SequenceFileFromDirectory job.
-   * Do not forget to call super() otherwise all standard options (input/output dirs etc) will not be available.
-   */
-  protected void addOptions() {
-    addInputOption();
-    addOutputOption();
-    addOption(DefaultOptionCreator.overwriteOption().create());
-    addOption(DefaultOptionCreator.methodOption().create());
-    addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
-    addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
-      "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
-    addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
-    addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
-      "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
-  }
-
-  /**
-   * Override this method in order to parse your additional options from the command line. Do not forget to call
-   * super() otherwise standard options (input/output dirs etc) will not be available.
-   *
-   * @return Map of options
-   */
-  protected Map<String, String> parseOptions() {
-    Map<String, String> options = new HashMap<>();
-    options.put(CHUNK_SIZE_OPTION[0], getOption(CHUNK_SIZE_OPTION[0]));
-    options.put(FILE_FILTER_CLASS_OPTION[0], getOption(FILE_FILTER_CLASS_OPTION[0]));
-    options.put(CHARSET_OPTION[0], getOption(CHARSET_OPTION[0]));
-    return options;
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
deleted file mode 100644
index 6e4bd64..0000000
--- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.mahout.utils.io.ChunkedWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-/**
- * Implement this interface if you wish to extend SequenceFilesFromDirectory with your own parsing logic.
- */
-public abstract class SequenceFilesFromDirectoryFilter implements PathFilter {
-  private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectoryFilter.class);
-
-  private final String prefix;
-  private final ChunkedWriter writer;
-  private final Charset charset;
-  private final FileSystem fs;
-  private final Map<String, String> options;
-  private final Configuration conf;
-
-  protected SequenceFilesFromDirectoryFilter(Configuration conf,
-                                             String keyPrefix,
-                                             Map<String, String> options,
-                                             ChunkedWriter writer,
-                                             Charset charset,
-                                             FileSystem fs) {
-    this.prefix = keyPrefix;
-    this.writer = writer;
-    this.charset = charset;
-    this.fs = fs;
-    this.options = options;
-    this.conf = conf;
-  }
-
-  protected final String getPrefix() {
-    return prefix;
-  }
-
-  protected final ChunkedWriter getWriter() {
-    return writer;
-  }
-
-  protected final Charset getCharset() {
-    return charset;
-  }
-
-  protected final FileSystem getFs() {
-    return fs;
-  }
-
-  protected final Map<String, String> getOptions() {
-    return options;
-  }
-  
-  protected final Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public final boolean accept(Path current) {
-    log.debug("CURRENT: {}", current.getName());
-    try {
-      for (FileStatus fst : fs.listStatus(current)) {
-        log.debug("CHILD: {}", fst.getPath().getName());
-        process(fst, current);
-      }
-    } catch (IOException ioe) {
-      throw new IllegalStateException(ioe);
-    }
-    return false;
-  }
-
-  protected abstract void process(FileStatus in, Path current) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
deleted file mode 100644
index 40df3c2..0000000
--- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.mahout.common.HadoopUtil;
-
-import static org.apache.mahout.text.SequenceFilesFromDirectory.KEY_PREFIX_OPTION;
-
-/**
- * Map class for SequenceFilesFromDirectory MR job
- */
-public class SequenceFilesFromDirectoryMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
-
-  private String keyPrefix;
-  private Text fileValue = new Text();
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    super.setup(context);
-    this.keyPrefix = context.getConfiguration().get(KEY_PREFIX_OPTION[0], "");
-  }
-
-  public void map(IntWritable key, BytesWritable value, Context context)
-    throws IOException, InterruptedException {
-
-    Configuration configuration = context.getConfiguration();
-    Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
-    String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
-
-    String filename = this.keyPrefix.length() > 0 ?
-      this.keyPrefix + Path.SEPARATOR + relativeFilePath :
-      Path.SEPARATOR + relativeFilePath;
-
-    fileValue.set(value.getBytes(), 0, value.getBytes().length);
-    context.write(new Text(filename), fileValue);
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
deleted file mode 100644
index c17cc12..0000000
--- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mahout.text;
-
-import org.apache.commons.io.DirectoryWalker;
-import org.apache.commons.io.comparator.CompositeFileComparator;
-import org.apache.commons.io.comparator.DirectoryFileComparator;
-import org.apache.commons.io.comparator.PathFileComparator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.common.HadoopUtil;
-import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.apache.mahout.utils.email.MailOptions;
-import org.apache.mahout.utils.email.MailProcessor;
-import org.apache.mahout.utils.io.ChunkedWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-/**
- * Converts a directory of gzipped mail archives into SequenceFiles of specified
- * chunkSize. This class is similar to {@link SequenceFilesFromDirectory} except
- * it uses block-compressed {@link org.apache.hadoop.io.SequenceFile}s and parses out the subject and
- * body text of each mail message into a separate key/value pair.
- */
-public final class SequenceFilesFromMailArchives extends AbstractJob {
-
-  private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class);
-
-  public static final String[] CHUNK_SIZE_OPTION     = {"chunkSize", "chunk"};
-  public static final String[] KEY_PREFIX_OPTION     = {"keyPrefix", "prefix"};
-  public static final String[] CHARSET_OPTION        = {"charset", "c"};
-  public static final String[] SUBJECT_OPTION        = {"subject", "s"};
-  public static final String[] TO_OPTION             = {"to", "to"};
-  public static final String[] FROM_OPTION           = {"from", "from"};
-  public static final String[] REFERENCES_OPTION     = {"references", "refs"};
-  public static final String[] BODY_OPTION           = {"body", "b"};
-  public static final String[] STRIP_QUOTED_OPTION   = {"stripQuoted", "q"};
-  public static final String[] QUOTED_REGEX_OPTION   = {"quotedRegex", "regex"};
-  public static final String[] SEPARATOR_OPTION      = {"separator", "sep"};
-  public static final String[] BODY_SEPARATOR_OPTION = {"bodySeparator", "bodySep"};
-  public static final String BASE_INPUT_PATH         = "baseinputpath";
-
-  private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
-
-  public void createSequenceFiles(MailOptions options) throws IOException {
-    try (ChunkedWriter writer =
-             new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir()))){
-      MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer);
-      if (options.getInput().isDirectory()) {
-        PrefixAdditionDirectoryWalker walker = new PrefixAdditionDirectoryWalker(processor, writer);
-        walker.walk(options.getInput());
-        log.info("Parsed {} messages from {}", walker.getMessageCount(), options.getInput().getAbsolutePath());
-      } else {
-        long start = System.currentTimeMillis();
-        long cnt = processor.parseMboxLineByLine(options.getInput());
-        long finish = System.currentTimeMillis();
-        log.info("Parsed {} messages from {} in time: {}", cnt, options.getInput().getAbsolutePath(), finish - start);
-      }
-    }
-  }
-
-  private static class PrefixAdditionDirectoryWalker extends DirectoryWalker<Object> {
-
-    @SuppressWarnings("unchecked")
-    private static final Comparator<File> FILE_COMPARATOR = new CompositeFileComparator(
-        DirectoryFileComparator.DIRECTORY_REVERSE, PathFileComparator.PATH_COMPARATOR);
-
-    private final Deque<MailProcessor> processors = new ArrayDeque<>();
-    private final ChunkedWriter writer;
-    private final Deque<Long> messageCounts = new ArrayDeque<>();
-
-    public PrefixAdditionDirectoryWalker(MailProcessor processor, ChunkedWriter writer) {
-      processors.addFirst(processor);
-      this.writer = writer;
-      messageCounts.addFirst(0L);
-    }
-
-    public void walk(File startDirectory) throws IOException {
-      super.walk(startDirectory, null);
-    }
-
-    public long getMessageCount() {
-      return messageCounts.getFirst();
-    }
-
-    @Override
-    protected void handleDirectoryStart(File current, int depth, Collection<Object> results) throws IOException {
-      if (depth > 0) {
-        log.info("At {}", current.getAbsolutePath());
-        MailProcessor processor = processors.getFirst();
-        MailProcessor subDirProcessor = new MailProcessor(processor.getOptions(), processor.getPrefix()
-            + File.separator + current.getName(), writer);
-        processors.push(subDirProcessor);
-        messageCounts.push(0L);
-      }
-    }
-
-    @Override
-    protected File[] filterDirectoryContents(File directory, int depth, File[] files) throws IOException {
-      Arrays.sort(files, FILE_COMPARATOR);
-      return files;
-    }
-
-    @Override
-    protected void handleFile(File current, int depth, Collection<Object> results) throws IOException {
-      MailProcessor processor = processors.getFirst();
-      long currentDirMessageCount = messageCounts.pop();
-      try {
-        currentDirMessageCount += processor.parseMboxLineByLine(current);
-      } catch (IOException e) {
-        throw new IllegalStateException("Error processing " + current, e);
-      }
-      messageCounts.push(currentDirMessageCount);
-    }
-
-    @Override
-    protected void handleDirectoryEnd(File current, int depth, Collection<Object> results) throws IOException {
-      if (depth > 0) {
-        final long currentDirMessageCount = messageCounts.pop();
-        log.info("Parsed {} messages from directory {}", currentDirMessageCount, current.getAbsolutePath());
-
-        processors.pop();
-
-        // aggregate message counts
-        long parentDirMessageCount = messageCounts.pop();
-        parentDirMessageCount += currentDirMessageCount;
-        messageCounts.push(parentDirMessageCount);
-      }
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new SequenceFilesFromMailArchives(), args);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    addInputOption();
-    addOutputOption();
-    addOption(DefaultOptionCreator.methodOption().create());
-
-    addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
-    addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
-    addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
-      "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
-    addFlag(SUBJECT_OPTION[0], SUBJECT_OPTION[1], "Include the Mail subject as part of the text.  Default is false");
-    addFlag(TO_OPTION[0], TO_OPTION[1], "Include the to field in the text.  Default is false");
-    addFlag(FROM_OPTION[0], FROM_OPTION[1], "Include the from field in the text.  Default is false");
-    addFlag(REFERENCES_OPTION[0], REFERENCES_OPTION[1],
-      "Include the references field in the text.  Default is false");
-    addFlag(BODY_OPTION[0], BODY_OPTION[1], "Include the body in the output.  Default is false");
-    addFlag(STRIP_QUOTED_OPTION[0], STRIP_QUOTED_OPTION[1],
-      "Strip (remove) quoted email text in the body.  Default is false");
-    addOption(QUOTED_REGEX_OPTION[0], QUOTED_REGEX_OPTION[1],
-        "Specify the regex that identifies quoted text.  "
-          + "Default is to look for > or | at the beginning of the line.");
-    addOption(SEPARATOR_OPTION[0], SEPARATOR_OPTION[1],
-        "The separator to use between metadata items (to, from, etc.).  Default is \\n", "\n");
-    addOption(BODY_SEPARATOR_OPTION[0], BODY_SEPARATOR_OPTION[1],
-        "The separator to use between lines in the body.  Default is \\n.  "
-          + "Useful to change if you wish to have the message be on one line", "\n");
-
-    addOption(DefaultOptionCreator.helpOption());
-    Map<String, List<String>> parsedArgs = parseArguments(args);
-    if (parsedArgs == null) {
-      return -1;
-    }
-    File input = getInputFile();
-    String outputDir = getOutputPath().toString();
-
-    int chunkSize = 64;
-    if (hasOption(CHUNK_SIZE_OPTION[0])) {
-      chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
-    }
-
-    String prefix = "";
-    if (hasOption(KEY_PREFIX_OPTION[0])) {
-      prefix = getOption(KEY_PREFIX_OPTION[0]);
-    }
-
-    Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
-    MailOptions options = new MailOptions();
-    options.setInput(input);
-    options.setOutputDir(outputDir);
-    options.setPrefix(prefix);
-    options.setChunkSize(chunkSize);
-    options.setCharset(charset);
-
-    List<Pattern> patterns = new ArrayList<>(5);
-    // patternOrder is used downstream so that we can know what order the text
-    // is in instead of encoding it in the string, which
-    // would require more processing later to remove it pre feature selection.
-    Map<String, Integer> patternOrder = new HashMap<>();
-    int order = 0;
-    if (hasOption(FROM_OPTION[0])) {
-      patterns.add(MailProcessor.FROM_PREFIX);
-      patternOrder.put(MailOptions.FROM, order++);
-    }
-    if (hasOption(TO_OPTION[0])) {
-      patterns.add(MailProcessor.TO_PREFIX);
-      patternOrder.put(MailOptions.TO, order++);
-    }
-    if (hasOption(REFERENCES_OPTION[0])) {
-      patterns.add(MailProcessor.REFS_PREFIX);
-      patternOrder.put(MailOptions.REFS, order++);
-    }
-    if (hasOption(SUBJECT_OPTION[0])) {
-      patterns.add(MailProcessor.SUBJECT_PREFIX);
-      patternOrder.put(MailOptions.SUBJECT, order += 1);
-    }
-    options.setStripQuotedText(hasOption(STRIP_QUOTED_OPTION[0]));
-
-    options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
-    options.setPatternOrder(patternOrder);
-    options.setIncludeBody(hasOption(BODY_OPTION[0]));
-
-    if (hasOption(SEPARATOR_OPTION[0])) {
-      options.setSeparator(getOption(SEPARATOR_OPTION[0]));
-    } else {
-      options.setSeparator("\n");
-    }
-
-    if (hasOption(BODY_SEPARATOR_OPTION[0])) {
-      options.setBodySeparator(getOption(BODY_SEPARATOR_OPTION[0]));
-    }
-
-    if (hasOption(QUOTED_REGEX_OPTION[0])) {
-      options.setQuotedTextPattern(Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])));
-    }
-
-    if (getOption(DefaultOptionCreator.METHOD_OPTION,
-      DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
-      runSequential(options);
-    } else {
-      runMapReduce(getInputPath(), getOutputPath());
-    }
-
-    return 0;
-  }
-
-  private int runSequential(MailOptions options)
-    throws IOException, InterruptedException, NoSuchMethodException {
-
-    long start = System.currentTimeMillis();
-    createSequenceFiles(options);
-    long finish = System.currentTimeMillis();
-    log.info("Conversion took {}ms", finish - start);
-
-    return 0;
-  }
-
-  private int runMapReduce(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException {
-
-    Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, SequenceFilesFromMailArchivesMapper.class,
-      Text.class, Text.class, SequenceFileOutputFormat.class, "SequentialFilesFromMailArchives");
-
-    Configuration jobConfig = job.getConfiguration();
-
-    if (hasOption(KEY_PREFIX_OPTION[0])) {
-      jobConfig.set(KEY_PREFIX_OPTION[1], getOption(KEY_PREFIX_OPTION[0]));
-    }
-
-    int chunkSize = 0;
-    if (hasOption(CHUNK_SIZE_OPTION[0])) {
-      chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
-      jobConfig.set(CHUNK_SIZE_OPTION[0], String.valueOf(chunkSize));
-    }
-
-    Charset charset;
-    if (hasOption(CHARSET_OPTION[0])) {
-      charset = Charset.forName(getOption(CHARSET_OPTION[0]));
-      jobConfig.set(CHARSET_OPTION[0], charset.displayName());
-    }
-
-    if (hasOption(FROM_OPTION[0])) {
-      jobConfig.set(FROM_OPTION[1], "true");
-    }
-
-    if (hasOption(TO_OPTION[0])) {
-      jobConfig.set(TO_OPTION[1], "true");
-    }
-
-    if (hasOption(REFERENCES_OPTION[0])) {
-      jobConfig.set(REFERENCES_OPTION[1], "true");
-    }
-
-    if (hasOption(SUBJECT_OPTION[0])) {
-      jobConfig.set(SUBJECT_OPTION[1], "true");
-    }
-
-    if (hasOption(QUOTED_REGEX_OPTION[0])) {
-      jobConfig.set(QUOTED_REGEX_OPTION[1], Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])).toString());
-    }
-
-    if (hasOption(SEPARATOR_OPTION[0])) {
-      jobConfig.set(SEPARATOR_OPTION[1], getOption(SEPARATOR_OPTION[0]));
-    } else {
-      jobConfig.set(SEPARATOR_OPTION[1], "\n");
-    }
-
-    if (hasOption(BODY_OPTION[0])) {
-      jobConfig.set(BODY_OPTION[1], "true");
-    } else {
-      jobConfig.set(BODY_OPTION[1], "false");
-    }
-
-    if (hasOption(BODY_SEPARATOR_OPTION[0])) {
-      jobConfig.set(BODY_SEPARATOR_OPTION[1], getOption(BODY_SEPARATOR_OPTION[0]));
-    } else {
-      jobConfig.set(BODY_SEPARATOR_OPTION[1], "\n");
-    }
-
-    FileSystem fs = FileSystem.get(jobConfig);
-    FileStatus fsFileStatus = fs.getFileStatus(inputPath);
-
-    jobConfig.set(BASE_INPUT_PATH, inputPath.toString());
-    String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
-    FileInputFormat.setInputPaths(job, inputDirList);
-
-    long chunkSizeInBytes = chunkSize * 1024 * 1024;
-    // need to set this to a multiple of the block size, or no split happens
-    FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
-
-    // set the max split locations, otherwise we get nasty debug stuff
-    jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS));
-
-    boolean succeeded = job.waitForCompletion(true);
-    if (!succeeded) {
-      return -1;
-    }
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
deleted file mode 100644
index 203e8fb..0000000
--- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.mahout.common.HadoopUtil;
-import org.apache.mahout.common.iterator.FileLineIterable;
-import org.apache.mahout.utils.email.MailOptions;
-import org.apache.mahout.utils.email.MailProcessor;
-
-import java.io.ByteArrayInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_SEPARATOR_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHARSET_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHUNK_SIZE_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.FROM_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.KEY_PREFIX_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.QUOTED_REGEX_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.REFERENCES_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.SEPARATOR_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.STRIP_QUOTED_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.SUBJECT_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.TO_OPTION;
-
-/**
- * Map Class for the SequenceFilesFromMailArchives job
- */
-public class SequenceFilesFromMailArchivesMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
-
-  private Text outKey = new Text();
-  private Text outValue = new Text();
-
-  private static final Pattern MESSAGE_START = Pattern.compile(
-    "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE);
-  private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile(
-    "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE);
-
-  private MailOptions options;
-
-  @Override
-  public void setup(Context context) throws IOException, InterruptedException {
-
-    Configuration configuration = context.getConfiguration();
-
-    // absorb all of the options into the MailOptions object
-    this.options = new MailOptions();
-
-    options.setPrefix(configuration.get(KEY_PREFIX_OPTION[1], ""));
-
-    if (!configuration.get(CHUNK_SIZE_OPTION[0], "").equals("")) {
-      options.setChunkSize(configuration.getInt(CHUNK_SIZE_OPTION[0], 64));
-    }
-
-    if (!configuration.get(CHARSET_OPTION[0], "").equals("")) {
-      Charset charset = Charset.forName(configuration.get(CHARSET_OPTION[0], "UTF-8"));
-      options.setCharset(charset);
-    } else {
-      Charset charset = Charset.forName("UTF-8");
-      options.setCharset(charset);
-    }
-
-    List<Pattern> patterns = Lists.newArrayListWithCapacity(5);
-    // patternOrder is used downstream so that we can know what order the
-    // text is in instead
-    // of encoding it in the string, which
-    // would require more processing later to remove it pre feature
-    // selection.
-    Map<String, Integer> patternOrder = Maps.newHashMap();
-    int order = 0;
-    if (!configuration.get(FROM_OPTION[1], "").equals("")) {
-      patterns.add(MailProcessor.FROM_PREFIX);
-      patternOrder.put(MailOptions.FROM, order++);
-    }
-
-    if (!configuration.get(TO_OPTION[1], "").equals("")) {
-      patterns.add(MailProcessor.TO_PREFIX);
-      patternOrder.put(MailOptions.TO, order++);
-    }
-
-    if (!configuration.get(REFERENCES_OPTION[1], "").equals("")) {
-      patterns.add(MailProcessor.REFS_PREFIX);
-      patternOrder.put(MailOptions.REFS, order++);
-    }
-
-    if (!configuration.get(SUBJECT_OPTION[1], "").equals("")) {
-      patterns.add(MailProcessor.SUBJECT_PREFIX);
-      patternOrder.put(MailOptions.SUBJECT, order += 1);
-    }
-
-    options.setStripQuotedText(configuration.getBoolean(STRIP_QUOTED_OPTION[1], false));
-
-    options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
-    options.setPatternOrder(patternOrder);
-
-    options.setIncludeBody(configuration.getBoolean(BODY_OPTION[1], false));
-
-    options.setSeparator("\n");
-    if (!configuration.get(SEPARATOR_OPTION[1], "").equals("")) {
-      options.setSeparator(configuration.get(SEPARATOR_OPTION[1], ""));
-    }
-    if (!configuration.get(BODY_SEPARATOR_OPTION[1], "").equals("")) {
-      options.setBodySeparator(configuration.get(BODY_SEPARATOR_OPTION[1], ""));
-    }
-    if (!configuration.get(QUOTED_REGEX_OPTION[1], "").equals("")) {
-      options.setQuotedTextPattern(Pattern.compile(configuration.get(QUOTED_REGEX_OPTION[1], "")));
-    }
-
-  }
-
-  public long parseMailboxLineByLine(String filename, InputStream mailBoxInputStream, Context context)
-    throws IOException, InterruptedException {
-    long messageCount = 0;
-    try {
-      StringBuilder contents = new StringBuilder();
-      StringBuilder body = new StringBuilder();
-      Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher("");
-      Matcher messageBoundaryMatcher = MESSAGE_START.matcher("");
-      String[] patternResults = new String[options.getPatternsToMatch().length];
-      Matcher[] matches = new Matcher[options.getPatternsToMatch().length];
-      for (int i = 0; i < matches.length; i++) {
-        matches[i] = options.getPatternsToMatch()[i].matcher("");
-      }
-
-      String messageId = null;
-      boolean inBody = false;
-      Pattern quotedTextPattern = options.getQuotedTextPattern();
-
-      for (String nextLine : new FileLineIterable(mailBoxInputStream, options.getCharset(), false, filename)) {
-        if (!options.isStripQuotedText() || !quotedTextPattern.matcher(nextLine).find()) {
-          for (int i = 0; i < matches.length; i++) {
-            Matcher matcher = matches[i];
-            matcher.reset(nextLine);
-            if (matcher.matches()) {
-              patternResults[i] = matcher.group(1);
-            }
-          }
-
-          // only start appending body content after we've seen a message ID
-          if (messageId != null) {
-            // first, see if we hit the end of the message
-            messageBoundaryMatcher.reset(nextLine);
-            if (messageBoundaryMatcher.matches()) {
-              // done parsing this message ... write it out
-              String key = generateKey(filename, options.getPrefix(), messageId);
-              // if this ordering changes, then also change
-              // FromEmailToDictionaryMapper
-              writeContent(options.getSeparator(), contents, body, patternResults);
-
-              this.outKey.set(key);
-              this.outValue.set(contents.toString());
-              context.write(this.outKey, this.outValue);
-              contents.setLength(0); // reset the buffer
-              body.setLength(0);
-              messageId = null;
-              inBody = false;
-            } else {
-              if (inBody && options.isIncludeBody()) {
-                if (!nextLine.isEmpty()) {
-                  body.append(nextLine).append(options.getBodySeparator());
-                }
-              } else {
-                // first empty line we see after reading the message Id
-                // indicates that we are in the body ...
-                inBody = nextLine.isEmpty();
-              }
-            }
-          } else {
-            if (nextLine.length() > 14) {
-              messageIdMatcher.reset(nextLine);
-              if (messageIdMatcher.matches()) {
-                messageId = messageIdMatcher.group(1);
-                ++messageCount;
-              }
-            }
-          }
-        }
-      }
-      // write the last message in the file if available
-      if (messageId != null) {
-        String key = generateKey(filename, options.getPrefix(), messageId);
-        writeContent(options.getSeparator(), contents, body, patternResults);
-        this.outKey.set(key);
-        this.outValue.set(contents.toString());
-        context.write(this.outKey, this.outValue);
-        contents.setLength(0); // reset the buffer
-      }
-    } catch (FileNotFoundException ignored) {
-
-    }
-    return messageCount;
-  }
-
-  protected static String generateKey(String mboxFilename, String prefix, String messageId) {
-    return Joiner.on(Path.SEPARATOR).join(Lists.newArrayList(prefix, mboxFilename, messageId).iterator());
-  }
-
-  private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) {
-    String matchesString = Joiner.on(separator).useForNull("").join(Arrays.asList(matches).iterator());
-    contents.append(matchesString).append(separator).append(body);
-  }
-
-  public void map(IntWritable key, BytesWritable value, Context context)
-    throws IOException, InterruptedException {
-    Configuration configuration = context.getConfiguration();
-    Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
-    String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
-    ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
-    parseMailboxLineByLine(relativeFilePath, is, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java b/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
deleted file mode 100644
index cacfd22..0000000
--- a/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.common.AbstractJob;
-
-import java.io.IOException;
-
-public class TextParagraphSplittingJob extends AbstractJob {
-
-  @Override
-  public int run(String[] strings) throws Exception {
-    Configuration originalConf = getConf();
-    Job job = prepareJob(new Path(originalConf.get("mapred.input.dir")),
-                         new Path(originalConf.get("mapred.output.dir")),
-                         SequenceFileInputFormat.class,
-                         SplitMap.class,
-                         Text.class,
-                         Text.class,
-                         Reducer.class,
-                         Text.class,
-                         Text.class,
-                         SequenceFileOutputFormat.class);
-    job.setNumReduceTasks(0);
-    boolean succeeded = job.waitForCompletion(true);
-    return succeeded ? 0 : -1;
-  }
-
-  public static class SplitMap extends Mapper<Text,Text,Text,Text> {
-
-    @Override
-    protected void map(Text key, Text text, Context context) throws IOException, InterruptedException {
-      Text outText = new Text();
-      int loc = 0;
-      while (loc >= 0 && loc < text.getLength()) {
-        int nextLoc = text.find("\n\n", loc + 1);
-        if (nextLoc > 0) {
-          outText.set(text.getBytes(), loc, nextLoc - loc);
-          context.write(key, outText);
-        }
-        loc = nextLoc;
-      }
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new TextParagraphSplittingJob(), args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java b/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
deleted file mode 100644
index b8441b7..0000000
--- a/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import java.io.IOException;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-import static org.apache.mahout.text.SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION;
-
-/**
- * RecordReader used with the MultipleTextFileInputFormat class to read full files as
- * k/v pairs and groups of files as single input splits.
- */
-public class WholeFileRecordReader extends RecordReader<IntWritable, BytesWritable> {
-
-  private FileSplit fileSplit;
-  private boolean processed = false;
-  private Configuration configuration;
-  private BytesWritable value = new BytesWritable();
-  private IntWritable index;
-  private String fileFilterClassName = null;
-  private PathFilter pathFilter = null;
-
-  public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx)
-      throws IOException {
-    this.fileSplit = new FileSplit(fileSplit.getPath(idx), fileSplit.getOffset(idx),
-        fileSplit.getLength(idx), fileSplit.getLocations());
-    this.configuration = taskAttemptContext.getConfiguration();
-    this.index = new IntWritable(idx);
-    this.fileFilterClassName = this.configuration.get(FILE_FILTER_CLASS_OPTION[0]);
-  }
-
-  @Override
-  public IntWritable getCurrentKey() {
-    return index;
-  }
-
-  @Override
-  public BytesWritable getCurrentValue() {
-    return value;
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    return processed ? 1.0f : 0.0f;
-  }
-
-  @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    if (!StringUtils.isBlank(fileFilterClassName) &&
-        !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
-      try {
-        pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
-      } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException {
-    if (!processed) {
-      byte[] contents = new byte[(int) fileSplit.getLength()];
-      Path file = fileSplit.getPath();
-      FileSystem fs = file.getFileSystem(this.configuration);
-
-      if (!fs.isFile(file)) {
-        return false;
-      }
-
-      FileStatus[] fileStatuses;
-      if (pathFilter != null) {
-        fileStatuses = fs.listStatus(file, pathFilter);
-      } else {
-        fileStatuses = fs.listStatus(file);
-      }
-
-      if (fileStatuses.length == 1) {
-        try (FSDataInputStream in = fs.open(fileStatuses[0].getPath())) {
-          IOUtils.readFully(in, contents, 0, contents.length);
-          value.setCapacity(contents.length);
-          value.set(contents, 0, contents.length);
-        }
-        processed = true;
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-}
\ No newline at end of file