Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 14:52:14 UTC

[46/51] [partial] mahout git commit: MAHOUT-2042 and MAHOUT-2045 Delete directories which were moved/no longer in use

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java
new file mode 100644
index 0000000..757f38c
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.evaluation;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class ClusterEvaluator {
+  
+  private static final Logger log = LoggerFactory.getLogger(ClusterEvaluator.class);
+  
+  private final Map<Integer,List<VectorWritable>> representativePoints;
+  
+  private final List<Cluster> clusters;
+  
+  private final DistanceMeasure measure;
+  
+  /**
+   * For testing only
+   * 
+   * @param representativePoints
+   *          a Map<Integer,List<VectorWritable>> of representative points keyed by clusterId
+   * @param clusters
+   *          a List<Cluster> of the clusters keyed by clusterId
+   * @param measure
+   *          an appropriate DistanceMeasure
+   */
+  public ClusterEvaluator(Map<Integer,List<VectorWritable>> representativePoints, List<Cluster> clusters,
+      DistanceMeasure measure) {
+    this.representativePoints = representativePoints;
+    this.clusters = clusters;
+    this.measure = measure;
+  }
+  
+  /**
+   * Initialize a new instance from job information
+   * 
+   * @param conf
+   *          a Configuration with appropriate parameters
+   * @param clustersIn
+   *          a Path to the input clusters directory
+   */
+  public ClusterEvaluator(Configuration conf, Path clustersIn) {
+    measure = ClassUtils
+        .instantiateAs(conf.get(RepresentativePointsDriver.DISTANCE_MEASURE_KEY), DistanceMeasure.class);
+    representativePoints = RepresentativePointsMapper.getRepresentativePoints(conf);
+    clusters = loadClusters(conf, clustersIn);
+  }
+  
+  /**
+   * Load the clusters from their sequence files
+   * 
+   * @param clustersIn
+   *          a Path to the directory containing input cluster files
+   * @return a List<Cluster> of the clusters
+   */
+  private static List<Cluster> loadClusters(Configuration conf, Path clustersIn) {
+    List<Cluster> clusters = new ArrayList<>();
+    for (ClusterWritable clusterWritable : new SequenceFileDirValueIterable<ClusterWritable>(clustersIn, PathType.LIST,
+        PathFilters.logsCRCFilter(), conf)) {
+      Cluster cluster = clusterWritable.getValue();
+      clusters.add(cluster);
+    }
+    return clusters;
+  }
+  
+  /**
+   * Computes the inter-cluster density as defined in "Mahout In Action"
+   * 
+   * @return the interClusterDensity
+   */
+  public double interClusterDensity() {
+    double max = Double.NEGATIVE_INFINITY;
+    double min = Double.POSITIVE_INFINITY;
+    double sum = 0;
+    int count = 0;
+    Map<Integer,Vector> distances = interClusterDistances();
+    for (Vector row : distances.values()) {
+      for (Element element : row.nonZeroes()) {
+        double d = element.get();
+        min = Math.min(d, min);
+        max = Math.max(d, max);
+        sum += d;
+        count++;
+      }
+    }
+    double density = (sum / count - min) / (max - min);
+    log.info("Scaled Inter-Cluster Density = {}", density);
+    return density;
+  }
+  
+  /**
+   * Computes the inter-cluster distances
+   * 
+   * @return a Map<Integer, Vector>
+   */
+  public Map<Integer,Vector> interClusterDistances() {
+    Map<Integer,Vector> distances = new TreeMap<>();
+    for (int i = 0; i < clusters.size(); i++) {
+      Cluster clusterI = clusters.get(i);
+      RandomAccessSparseVector row = new RandomAccessSparseVector(Integer.MAX_VALUE);
+      distances.put(clusterI.getId(), row);
+      for (int j = i + 1; j < clusters.size(); j++) {
+        Cluster clusterJ = clusters.get(j);
+        double d = measure.distance(clusterI.getCenter(), clusterJ.getCenter());
+        row.set(clusterJ.getId(), d);
+      }
+    }
+    return distances;
+  }
+  
+  /**
+   * Computes the average intra-cluster density as the average of each cluster's intra-cluster density
+   * 
+   * @return the average intraClusterDensity
+   */
+  public double intraClusterDensity() {
+    double avgDensity = 0;
+    int count = 0;
+    for (Element elem : intraClusterDensities().nonZeroes()) {
+      double value = elem.get();
+      if (!Double.isNaN(value)) {
+        avgDensity += value;
+        count++;
+      }
+    }
+    avgDensity = count == 0 ? 0 : avgDensity / count;
+    log.info("Average Intra-Cluster Density = {}", avgDensity);
+    return avgDensity;
+  }
+  
+  /**
+   * Computes the intra-cluster densities for all clusters as the average distance of the representative points from
+   * each other
+   * 
+   * @return a Vector of the intraClusterDensity of the representativePoints by clusterId
+   */
+  public Vector intraClusterDensities() {
+    Vector densities = new RandomAccessSparseVector(Integer.MAX_VALUE);
+    for (Cluster cluster : clusters) {
+      int count = 0;
+      double max = Double.NEGATIVE_INFINITY;
+      double min = Double.POSITIVE_INFINITY;
+      double sum = 0;
+      List<VectorWritable> repPoints = representativePoints.get(cluster.getId());
+      for (int i = 0; i < repPoints.size(); i++) {
+        for (int j = i + 1; j < repPoints.size(); j++) {
+          Vector v1 = repPoints.get(i).get();
+          Vector v2 = repPoints.get(j).get();
+          double d = measure.distance(v1, v2);
+          min = Math.min(d, min);
+          max = Math.max(d, max);
+          sum += d;
+          count++;
+        }
+      }
+      double density = (sum / count - min) / (max - min);
+      densities.set(cluster.getId(), density);
+      log.info("Intra-Cluster Density[{}] = {}", cluster.getId(), density);
+    }
+    return densities;
+  }
+}
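
A minimal usage sketch of the evaluator above (not part of this commit): both configuration keys are normally set by RepresentativePointsDriver.run(), and every /tmp path here is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.evaluation.ClusterEvaluator;
import org.apache.mahout.clustering.evaluation.RepresentativePointsDriver;

public class ClusterEvaluatorSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Normally set by RepresentativePointsDriver.run(); set by hand here for illustration.
    conf.set(RepresentativePointsDriver.DISTANCE_MEASURE_KEY,
        "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
    conf.set(RepresentativePointsDriver.STATE_IN_KEY, "/tmp/representative/representativePoints-5");
    // Reads the final clusters and the representative points, then prints both density measures.
    ClusterEvaluator evaluator = new ClusterEvaluator(conf, new Path("/tmp/kmeans/clusters-3-final"));
    System.out.println("Inter-cluster density: " + evaluator.interClusterDensity());
    System.out.println("Intra-cluster density: " + evaluator.intraClusterDensity());
  }
}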

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java
new file mode 100644
index 0000000..2fe37ef
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.evaluation;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class RepresentativePointsDriver extends AbstractJob {
+  
+  public static final String STATE_IN_KEY = "org.apache.mahout.clustering.stateIn";
+  
+  public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.measure";
+  
+  private static final Logger log = LoggerFactory.getLogger(RepresentativePointsDriver.class);
+  
+  private RepresentativePointsDriver() {}
+  
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new RepresentativePointsDriver(), args);
+  }
+  
+  @Override
+  public int run(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
+    addInputOption();
+    addOutputOption();
+    addOption("clusteredPoints", "cp", "The path to the clustered points", true);
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.methodOption().create());
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+    
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    String distanceMeasureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+        DefaultOptionCreator.SEQUENTIAL_METHOD);
+    DistanceMeasure measure = ClassUtils.instantiateAs(distanceMeasureClass, DistanceMeasure.class);
+    Path clusteredPoints = new Path(getOption("clusteredPoints"));
+    run(getConf(), input, clusteredPoints, output, measure, maxIterations, runSequential);
+    return 0;
+  }
+  
+  /**
+   * Utility to print out representative points
+   * 
+   * @param output
+   *          the Path to the directory containing representativePoints-i folders
+   * @param numIterations
+   *          the int number of iterations to print
+   */
+  public static void printRepresentativePoints(Path output, int numIterations) {
+    for (int i = 0; i <= numIterations; i++) {
+      Path out = new Path(output, "representativePoints-" + i);
+      System.out.println("Representative Points for iteration " + i);
+      Configuration conf = new Configuration();
+      for (Pair<IntWritable,VectorWritable> record : new SequenceFileDirIterable<IntWritable,VectorWritable>(out,
+          PathType.LIST, PathFilters.logsCRCFilter(), null, true, conf)) {
+        System.out.println("\tC-" + record.getFirst().get() + ": "
+            + AbstractCluster.formatVector(record.getSecond().get(), null));
+      }
+    }
+  }
+  
+  public static void run(Configuration conf, Path clustersIn, Path clusteredPointsIn, Path output,
+      DistanceMeasure measure, int numIterations, boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    Path stateIn = new Path(output, "representativePoints-0");
+    writeInitialState(stateIn, clustersIn);
+    
+    for (int iteration = 0; iteration < numIterations; iteration++) {
+      log.info("Representative Points Iteration {}", iteration);
+      // point the output to a new directory per iteration
+      Path stateOut = new Path(output, "representativePoints-" + (iteration + 1));
+      runIteration(conf, clusteredPointsIn, stateIn, stateOut, measure, runSequential);
+      // now point the input to the old output directory
+      stateIn = stateOut;
+    }
+    
+    conf.set(STATE_IN_KEY, stateIn.toString());
+    conf.set(DISTANCE_MEASURE_KEY, measure.getClass().getName());
+  }
+  
+  private static void writeInitialState(Path output, Path clustersIn) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(output.toUri(), conf);
+    for (FileStatus dir : fs.globStatus(clustersIn)) {
+      Path inPath = dir.getPath();
+      for (FileStatus part : fs.listStatus(inPath, PathFilters.logsCRCFilter())) {
+        Path inPart = part.getPath();
+        Path path = new Path(output, inPart.getName());
+        try (SequenceFile.Writer writer =
+                 new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class)){
+          for (ClusterWritable clusterWritable : new SequenceFileValueIterable<ClusterWritable>(inPart, true, conf)) {
+            Cluster cluster = clusterWritable.getValue();
+            if (log.isDebugEnabled()) {
+              log.debug("C-{}: {}", cluster.getId(), AbstractCluster.formatVector(cluster.getCenter(), null));
+            }
+            writer.append(new IntWritable(cluster.getId()), new VectorWritable(cluster.getCenter()));
+          }
+        }
+      }
+    }
+  }
+  
+  private static void runIteration(Configuration conf, Path clusteredPointsIn, Path stateIn, Path stateOut,
+      DistanceMeasure measure, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException {
+    if (runSequential) {
+      runIterationSeq(conf, clusteredPointsIn, stateIn, stateOut, measure);
+    } else {
+      runIterationMR(conf, clusteredPointsIn, stateIn, stateOut, measure);
+    }
+  }
+  
+  /**
+   * Run the job using supplied arguments as a sequential process
+   * 
+   * @param conf
+   *          the Configuration to use
+   * @param clusteredPointsIn
+   *          the directory pathname for input points
+   * @param stateIn
+   *          the directory pathname for input state
+   * @param stateOut
+   *          the directory pathname for output state
+   * @param measure
+   *          the DistanceMeasure to use
+   */
+  private static void runIterationSeq(Configuration conf, Path clusteredPointsIn, Path stateIn, Path stateOut,
+      DistanceMeasure measure) throws IOException {
+    
+    Map<Integer,List<VectorWritable>> repPoints = RepresentativePointsMapper.getRepresentativePoints(conf, stateIn);
+    Map<Integer,WeightedVectorWritable> mostDistantPoints = new HashMap<>();
+    FileSystem fs = FileSystem.get(clusteredPointsIn.toUri(), conf);
+    for (Pair<IntWritable,WeightedVectorWritable> record
+        : new SequenceFileDirIterable<IntWritable,WeightedVectorWritable>(clusteredPointsIn, PathType.LIST,
+            PathFilters.logsCRCFilter(), null, true, conf)) {
+      RepresentativePointsMapper.mapPoint(record.getFirst(), record.getSecond(), measure, repPoints, mostDistantPoints);
+    }
+    int part = 0;
+    try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(stateOut, "part-m-" + part++),
+        IntWritable.class, VectorWritable.class)){
+      for (Entry<Integer,List<VectorWritable>> entry : repPoints.entrySet()) {
+        for (VectorWritable vw : entry.getValue()) {
+          writer.append(new IntWritable(entry.getKey()), vw);
+        }
+      }
+    }
+    try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(stateOut, "part-m-" + part++),
+        IntWritable.class, VectorWritable.class)){
+      for (Map.Entry<Integer,WeightedVectorWritable> entry : mostDistantPoints.entrySet()) {
+        writer.append(new IntWritable(entry.getKey()), new VectorWritable(entry.getValue().getVector()));
+      }
+    }
+  }
+  
+  /**
+   * Run the job using supplied arguments as a Map/Reduce process
+   * 
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the directory pathname for input points
+   * @param stateIn
+   *          the directory pathname for input state
+   * @param stateOut
+   *          the directory pathname for output state
+   * @param measure
+   *          the DistanceMeasure to use
+   */
+  private static void runIterationMR(Configuration conf, Path input, Path stateIn, Path stateOut,
+      DistanceMeasure measure) throws IOException, InterruptedException, ClassNotFoundException {
+    conf.set(STATE_IN_KEY, stateIn.toString());
+    conf.set(DISTANCE_MEASURE_KEY, measure.getClass().getName());
+    Job job = new Job(conf, "Representative Points Driver running over input: " + input);
+    job.setJarByClass(RepresentativePointsDriver.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(WeightedVectorWritable.class);
+    
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, stateOut);
+    
+    job.setMapperClass(RepresentativePointsMapper.class);
+    job.setReducerClass(RepresentativePointsReducer.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      throw new IllegalStateException("Job failed!");
+    }
+  }
+}
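
A hedged sketch of invoking the driver above programmatically; the clustering output is assumed to exist already and all paths are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.evaluation.RepresentativePointsDriver;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

public class RepresentativePointsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistanceMeasure measure = new EuclideanDistanceMeasure();
    Path clustersIn = new Path("/tmp/kmeans/clusters-3-final");     // final clusters from a prior run
    Path clusteredPoints = new Path("/tmp/kmeans/clusteredPoints"); // points labelled with cluster ids
    Path output = new Path("/tmp/representative");
    int iterations = 5;
    // Runs the most-distant-point refinement and leaves STATE_IN_KEY / DISTANCE_MEASURE_KEY
    // in conf, ready for ClusterEvaluator(conf, clustersIn).
    RepresentativePointsDriver.run(conf, clustersIn, clusteredPoints, output, measure, iterations, true);
    RepresentativePointsDriver.printRepresentativePoints(output, iterations);
  }
}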

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
new file mode 100644
index 0000000..0ae79ad
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.evaluation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.VectorWritable;
+
+public class RepresentativePointsMapper
+  extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, WeightedVectorWritable> {
+
+  private Map<Integer, List<VectorWritable>> representativePoints;
+  private final Map<Integer, WeightedVectorWritable> mostDistantPoints = new HashMap<>();
+  private DistanceMeasure measure = new EuclideanDistanceMeasure();
+
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    for (Map.Entry<Integer, WeightedVectorWritable> entry : mostDistantPoints.entrySet()) {
+      context.write(new IntWritable(entry.getKey()), entry.getValue());
+    }
+    super.cleanup(context);
+  }
+
+  @Override
+  protected void map(IntWritable clusterId, WeightedVectorWritable point, Context context)
+    throws IOException, InterruptedException {
+    mapPoint(clusterId, point, measure, representativePoints, mostDistantPoints);
+  }
+
+  public static void mapPoint(IntWritable clusterId,
+                              WeightedVectorWritable point,
+                              DistanceMeasure measure,
+                              Map<Integer, List<VectorWritable>> representativePoints,
+                              Map<Integer, WeightedVectorWritable> mostDistantPoints) {
+    int key = clusterId.get();
+    WeightedVectorWritable currentMDP = mostDistantPoints.get(key);
+
+    List<VectorWritable> repPoints = representativePoints.get(key);
+    double totalDistance = 0.0;
+    if (repPoints != null) {
+      for (VectorWritable refPoint : repPoints) {
+        totalDistance += measure.distance(refPoint.get(), point.getVector());
+      }
+    }
+    if (currentMDP == null || currentMDP.getWeight() < totalDistance) {
+      mostDistantPoints.put(key, new WeightedVectorWritable(totalDistance, point.getVector().clone()));
+    }
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    measure =
+        ClassUtils.instantiateAs(conf.get(RepresentativePointsDriver.DISTANCE_MEASURE_KEY), DistanceMeasure.class);
+    representativePoints = getRepresentativePoints(conf);
+  }
+
+  public void configure(Map<Integer, List<VectorWritable>> referencePoints, DistanceMeasure measure) {
+    this.representativePoints = referencePoints;
+    this.measure = measure;
+  }
+
+  public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf) {
+    String statePath = conf.get(RepresentativePointsDriver.STATE_IN_KEY);
+    return getRepresentativePoints(conf, new Path(statePath));
+  }
+
+  public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf, Path statePath) {
+    Map<Integer, List<VectorWritable>> representativePoints = new HashMap<>();
+    for (Pair<IntWritable,VectorWritable> record
+         : new SequenceFileDirIterable<IntWritable,VectorWritable>(statePath,
+                                                                   PathType.LIST,
+                                                                   PathFilters.logsCRCFilter(),
+                                                                   conf)) {
+      int keyValue = record.getFirst().get();
+      List<VectorWritable> repPoints = representativePoints.get(keyValue);
+      if (repPoints == null) {
+        repPoints = new ArrayList<>();
+        representativePoints.put(keyValue, repPoints);
+      }
+      repPoints.add(record.getSecond());
+    }
+    return representativePoints;
+  }
+}
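
A small in-memory sketch of the mapPoint() contract above, with invented coordinates: for each cluster it keeps the incoming point whose summed distance to the current representative points is largest.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.mahout.clustering.classify.WeightedVectorWritable;
import org.apache.mahout.clustering.evaluation.RepresentativePointsMapper;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.VectorWritable;

public class MapPointSketch {
  public static void main(String[] args) {
    Map<Integer, List<VectorWritable>> repPoints = new HashMap<>();
    List<VectorWritable> cluster0 = new ArrayList<>();
    cluster0.add(new VectorWritable(new DenseVector(new double[] {0, 0}))); // current representative
    repPoints.put(0, cluster0);
    Map<Integer, WeightedVectorWritable> mostDistant = new HashMap<>();

    RepresentativePointsMapper.mapPoint(new IntWritable(0),
        new WeightedVectorWritable(1.0, new DenseVector(new double[] {3, 4})),
        new EuclideanDistanceMeasure(), repPoints, mostDistant);

    // (3,4) is 5.0 away from the lone representative, so it becomes the candidate for cluster 0.
    System.out.println(mostDistant.get(0).getWeight()); // prints 5.0
  }
}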

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
new file mode 100644
index 0000000..27ca861
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.evaluation;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.math.VectorWritable;
+
+public class RepresentativePointsReducer
+  extends Reducer<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
+
+  private Map<Integer, List<VectorWritable>> representativePoints;
+
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    for (Map.Entry<Integer, List<VectorWritable>> entry : representativePoints.entrySet()) {
+      IntWritable iw = new IntWritable(entry.getKey());
+      for (VectorWritable vw : entry.getValue()) {
+        context.write(iw, vw);
+      }
+    }
+    super.cleanup(context);
+  }
+
+  @Override
+  protected void reduce(IntWritable key, Iterable<WeightedVectorWritable> values, Context context)
+    throws IOException, InterruptedException {
+    // find the most distant point
+    WeightedVectorWritable mdp = null;
+    for (WeightedVectorWritable dpw : values) {
+      if (mdp == null || mdp.getWeight() < dpw.getWeight()) {
+        mdp = new WeightedVectorWritable(dpw.getWeight(), dpw.getVector());
+      }
+    }
+    context.write(new IntWritable(key.get()), new VectorWritable(mdp.getVector()));
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    representativePoints = RepresentativePointsMapper.getRepresentativePoints(conf);
+  }
+
+  public void configure(Map<Integer, List<VectorWritable>> representativePoints) {
+    this.representativePoints = representativePoints;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
new file mode 100644
index 0000000..392909e
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.lda;
+
+import com.google.common.io.Closeables;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.IntPairWritable;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.utils.vectors.VectorHelper;
+
+/**
+ * Class to print out the top K words for each topic.
+ */
+public final class LDAPrintTopics {
+
+  private LDAPrintTopics() { }
+  
+  // Expands the queue list to have a Queue for topic K
+  private static void ensureQueueSize(Collection<Queue<Pair<String,Double>>> queues, int k) {
+    for (int i = queues.size(); i <= k; ++i) {
+      queues.add(new PriorityQueue<Pair<String,Double>>());
+    }
+  }
+  
+  public static void main(String[] args) throws Exception {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+    
+    Option inputOpt = DefaultOptionCreator.inputOption().create();
+    
+    Option dictOpt = obuilder.withLongName("dict").withRequired(true).withArgument(
+      abuilder.withName("dict").withMinimum(1).withMaximum(1).create()).withDescription(
+      "Dictionary to read in, in the same format as one created by "
+          + "org.apache.mahout.utils.vectors.lucene.Driver").withShortName("d").create();
+    
+    Option outOpt = DefaultOptionCreator.outputOption().create();
+    
+    Option wordOpt = obuilder.withLongName("words").withRequired(false).withArgument(
+      abuilder.withName("words").withMinimum(0).withMaximum(1).withDefault("20").create()).withDescription(
+      "Number of words to print").withShortName("w").create();
+    Option dictTypeOpt = obuilder.withLongName("dictionaryType").withRequired(false).withArgument(
+      abuilder.withName("dictionaryType").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The dictionary file type (text|sequencefile)").withShortName("dt").create();
+    Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
+        .create();
+    
+    Group group = gbuilder.withName("Options").withOption(dictOpt).withOption(outOpt).withOption(wordOpt)
+        .withOption(inputOpt).withOption(dictTypeOpt).withOption(helpOpt).create();
+    try {
+      Parser parser = new Parser();
+      parser.setGroup(group);
+      CommandLine cmdLine = parser.parse(args);
+      
+      if (cmdLine.hasOption(helpOpt)) {
+        CommandLineUtil.printHelp(group);
+        return;
+      }
+      
+      String input = cmdLine.getValue(inputOpt).toString();
+      String dictFile = cmdLine.getValue(dictOpt).toString();
+      int numWords = 20;
+      if (cmdLine.hasOption(wordOpt)) {
+        numWords = Integer.parseInt(cmdLine.getValue(wordOpt).toString());
+      }
+      Configuration config = new Configuration();
+      
+      String dictionaryType = "text";
+      if (cmdLine.hasOption(dictTypeOpt)) {
+        dictionaryType = cmdLine.getValue(dictTypeOpt).toString();
+      }
+      
+      List<String> wordList;
+      if ("text".equals(dictionaryType)) {
+        wordList = Arrays.asList(VectorHelper.loadTermDictionary(new File(dictFile)));
+      } else if ("sequencefile".equals(dictionaryType)) {
+        wordList = Arrays.asList(VectorHelper.loadTermDictionary(config, dictFile));
+      } else {
+        throw new IllegalArgumentException("Invalid dictionary format");
+      }
+      
+      List<Queue<Pair<String,Double>>> topWords = topWordsForTopics(input, config, wordList, numWords);
+
+      File output = null;
+      if (cmdLine.hasOption(outOpt)) {
+        output = new File(cmdLine.getValue(outOpt).toString());
+        if (!output.exists() && !output.mkdirs()) {
+          throw new IOException("Could not create directory: " + output);
+        }
+      }
+      printTopWords(topWords, output);
+    } catch (OptionException e) {
+      CommandLineUtil.printHelp(group);
+      throw e;
+    }
+  }
+  
+  // Adds the word if the queue is below capacity, or the score is high enough
+  private static void maybeEnqueue(Queue<Pair<String,Double>> q, String word, double score, int numWordsToPrint) {
+    if (q.size() >= numWordsToPrint && score > q.peek().getSecond()) {
+      q.poll();
+    }
+    if (q.size() < numWordsToPrint) {
+      q.add(new Pair<>(word, score));
+    }
+  }
+  
+  private static void printTopWords(List<Queue<Pair<String,Double>>> topWords, File outputDir)
+    throws IOException {
+    for (int i = 0; i < topWords.size(); ++i) {
+      Collection<Pair<String,Double>> topK = topWords.get(i);
+      Writer out = null;
+      boolean printingToSystemOut = false;
+      try {
+        if (outputDir != null) {
+          out = new OutputStreamWriter(new FileOutputStream(new File(outputDir, "topic_" + i)), Charsets.UTF_8);
+        } else {
+          out = new OutputStreamWriter(System.out, Charsets.UTF_8);
+          printingToSystemOut = true;
+          out.write("Topic " + i);
+          out.write('\n');
+          out.write("===========");
+          out.write('\n');
+        }
+        List<Pair<String,Double>> topKasList = new ArrayList<>(topK.size());
+        for (Pair<String,Double> wordWithScore : topK) {
+          topKasList.add(wordWithScore);
+        }
+        Collections.sort(topKasList, new Comparator<Pair<String,Double>>() {
+          @Override
+          public int compare(Pair<String,Double> pair1, Pair<String,Double> pair2) {
+            return pair2.getSecond().compareTo(pair1.getSecond());
+          }
+        });
+        for (Pair<String,Double> wordWithScore : topKasList) {
+          out.write(wordWithScore.getFirst() + " [p(" + wordWithScore.getFirst() + "|topic_" + i + ") = "
+            + wordWithScore.getSecond() + "]");
+          out.write('\n');
+        }
+      } finally {
+        if (!printingToSystemOut) {
+          Closeables.close(out, false);
+        } else {
+          out.flush();
+        }
+      }
+    }
+  }
+  
+  private static List<Queue<Pair<String,Double>>> topWordsForTopics(String dir,
+                                                                    Configuration job,
+                                                                    List<String> wordList,
+                                                                    int numWordsToPrint) {
+    List<Queue<Pair<String,Double>>> queues = new ArrayList<>();
+    Map<Integer,Double> expSums = new HashMap<>();
+    for (Pair<IntPairWritable,DoubleWritable> record
+        : new SequenceFileDirIterable<IntPairWritable, DoubleWritable>(
+            new Path(dir, "part-*"), PathType.GLOB, null, null, true, job)) {
+      IntPairWritable key = record.getFirst();
+      int topic = key.getFirst();
+      int word = key.getSecond();
+      ensureQueueSize(queues, topic);
+      if (word >= 0 && topic >= 0) {
+        double score = record.getSecond().get();
+        if (expSums.get(topic) == null) {
+          expSums.put(topic, 0.0);
+        }
+        expSums.put(topic, expSums.get(topic) + Math.exp(score));
+        String realWord = wordList.get(word);
+        maybeEnqueue(queues.get(topic), realWord, score, numWordsToPrint);
+      }
+    }
+    for (int i = 0; i < queues.size(); i++) {
+      Queue<Pair<String,Double>> queue = queues.get(i);
+      Queue<Pair<String,Double>> newQueue = new PriorityQueue<>(queue.size());
+      double norm = expSums.get(i);
+      for (Pair<String,Double> pair : queue) {
+        newQueue.add(new Pair<>(pair.getFirst(), Math.exp(pair.getSecond()) / norm));
+      }
+      queues.set(i, newQueue);
+    }
+    return queues;
+  }
+}
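
A hedged invocation sketch for the tool above. Only --dict, --words and --dictionaryType are defined in this file; the --input and --output long names are assumed to come from DefaultOptionCreator, and all paths are hypothetical.

import org.apache.mahout.clustering.lda.LDAPrintTopics;

public class LDAPrintTopicsSketch {
  public static void main(String[] args) throws Exception {
    LDAPrintTopics.main(new String[] {
        "--input", "/tmp/lda/state-20",               // LDA word/topic scores (part-* sequence files)
        "--dict", "/tmp/vectors/dictionary.file-0",   // term dictionary
        "--dictionaryType", "sequencefile",
        "--words", "10",
        "--output", "/tmp/lda/topics"                 // one topic_<k> file per topic
    });
  }
}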

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
new file mode 100644
index 0000000..12ed471
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.lucene.analysis.TokenFilter;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.Tokenizer;
+import org.apache.lucene.analysis.core.LowerCaseFilter;
+import org.apache.lucene.analysis.core.StopFilter;
+import org.apache.lucene.analysis.en.PorterStemFilter;
+import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
+import org.apache.lucene.analysis.standard.StandardFilter;
+import org.apache.lucene.analysis.standard.StandardTokenizer;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.util.CharArraySet;
+import org.apache.lucene.analysis.util.StopwordAnalyzerBase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Custom Lucene Analyzer designed for aggressive feature reduction
+ * for clustering the ASF Mail Archives using an extended set of
+ * stop words, excluding non-alpha-numeric tokens, and porter stemming.
+ */
+public final class MailArchivesClusteringAnalyzer extends StopwordAnalyzerBase {
+  // extended set of stop words composed of common mail terms like "hi",
+  // HTML tags, and Java keywords, as many of the messages in the archives
+  // are subversion check-in notifications
+    
+  private static final CharArraySet STOP_SET = new CharArraySet(Arrays.asList(
+    "3d","7bit","a0","about","above","abstract","across","additional","after",
+    "afterwards","again","against","align","all","almost","alone","along",
+    "already","also","although","always","am","among","amongst","amoungst",
+    "amount","an","and","another","any","anybody","anyhow","anyone","anything",
+    "anyway","anywhere","are","arial","around","as","ascii","assert","at",
+    "back","background","base64","bcc","be","became","because","become","becomes",
+    "becoming","been","before","beforehand","behind","being","below","beside",
+    "besides","between","beyond","bgcolor","blank","blockquote","body","boolean",
+    "border","both","br","break","but","by","can","cannot","cant","case","catch",
+    "cc","cellpadding","cellspacing","center","char","charset","cheers","class",
+    "co","color","colspan","com","con","const","continue","could","couldnt",
+    "cry","css","de","dear","default","did","didnt","different","div","do",
+    "does","doesnt","done","dont","double","down","due","during","each","eg",
+    "eight","either","else","elsewhere","empty","encoding","enough","enum",
+    "etc","eu","even","ever","every","everyone","everything","everywhere",
+    "except","extends","face","family","few","ffffff","final","finally","float",
+    "font","for","former","formerly","fri","from","further","get","give","go",
+    "good","got","goto","gt","h1","ha","had","has","hasnt","have","he","head",
+    "height","hello","helvetica","hence","her","here","hereafter","hereby",
+    "herein","hereupon","hers","herself","hi","him","himself","his","how",
+    "however","hr","href","html","http","https","id","ie","if","ill","im",
+    "image","img","implements","import","in","inc","instanceof","int","interface",
+    "into","is","isnt","iso-8859-1","it","its","itself","ive","just","keep",
+    "last","latter","latterly","least","left","less","li","like","long","look",
+    "lt","ltd","mail","mailto","many","margin","may","me","meanwhile","message",
+    "meta","might","mill","mine","mon","more","moreover","most","mostly","mshtml",
+    "mso","much","must","my","myself","name","namely","native","nbsp","need",
+    "neither","never","nevertheless","new","next","nine","no","nobody","none",
+    "noone","nor","not","nothing","now","nowhere","null","of","off","often",
+    "ok","on","once","only","onto","or","org","other","others","otherwise",
+    "our","ours","ourselves","out","over","own","package","pad","per","perhaps",
+    "plain","please","pm","printable","private","protected","public","put",
+    "quot","quote","r1","r2","rather","re","really","regards","reply","return",
+    "right","said","same","sans","sat","say","saying","see","seem","seemed",
+    "seeming","seems","serif","serious","several","she","short","should","show",
+    "side","since","sincere","six","sixty","size","so","solid","some","somehow",
+    "someone","something","sometime","sometimes","somewhere","span","src",
+    "static","still","strictfp","string","strong","style","stylesheet","subject",
+    "such","sun","super","sure","switch","synchronized","table","take","target",
+    "td","text","th","than","thanks","that","the","their","them","themselves",
+    "then","thence","there","thereafter","thereby","therefore","therein","thereupon",
+    "these","they","thick","thin","think","third","this","those","though",
+    "three","through","throughout","throw","throws","thru","thu","thus","tm",
+    "to","together","too","top","toward","towards","tr","transfer","transient",
+    "try","tue","type","ul","un","under","unsubscribe","until","up","upon",
+    "us","use","used","uses","using","valign","verdana","very","via","void",
+    "volatile","want","was","we","wed","weight","well","were","what","whatever",
+    "when","whence","whenever","where","whereafter","whereas","whereby","wherein",
+    "whereupon","wherever","whether","which","while","whither","who","whoever",
+    "whole","whom","whose","why","width","will","with","within","without",
+    "wont","would","wrote","www","yes","yet","you","your","yours","yourself",
+    "yourselves"
+  ), false);
+
+  // Regex used to exclude non-alpha-numeric tokens
+  private static final Pattern ALPHA_NUMERIC = Pattern.compile("^[a-z][a-z0-9_]+$");
+  private static final Matcher MATCHER = ALPHA_NUMERIC.matcher("");
+
+  public MailArchivesClusteringAnalyzer() {
+    super(STOP_SET);
+  }
+
+  public MailArchivesClusteringAnalyzer(CharArraySet stopSet) {
+    super(stopSet);
+  }
+  
+  @Override
+  protected TokenStreamComponents createComponents(String fieldName) {
+    Tokenizer tokenizer = new StandardTokenizer();
+    TokenStream result = new StandardFilter(tokenizer);
+    result = new LowerCaseFilter(result);
+    result = new ASCIIFoldingFilter(result);
+    result = new AlphaNumericMaxLengthFilter(result);
+    result = new StopFilter(result, STOP_SET);
+    result = new PorterStemFilter(result);
+    return new TokenStreamComponents(tokenizer, result);
+  }
+
+  /**
+   * Matches alpha-numeric tokens between 2 and 28 chars long.
+   */
+  static class AlphaNumericMaxLengthFilter extends TokenFilter {
+    private final CharTermAttribute termAtt;
+    private final char[] output = new char[28];
+
+    AlphaNumericMaxLengthFilter(TokenStream in) {
+      super(in);
+      termAtt = addAttribute(CharTermAttribute.class);
+    }
+
+    @Override
+    public final boolean incrementToken() throws IOException {
+      // return the first alpha-numeric token between 2 and 28 characters long
+      while (input.incrementToken()) {
+        int length = termAtt.length();
+        if (length >= 2 && length <= 28) {
+          char[] buf = termAtt.buffer();
+          int at = 0;
+          for (int c = 0; c < length; c++) {
+            char ch = buf[c];
+            if (ch != '\'') {
+              output[at++] = ch;
+            }
+          }
+          String term = new String(output, 0, at);
+          MATCHER.reset(term);
+          if (MATCHER.matches() && !term.startsWith("a0")) {
+            termAtt.setEmpty();
+            termAtt.append(term);
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+  }
+}
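
A minimal sketch of running the analyzer above over a string using the standard Lucene TokenStream idiom; the sample text and the expected tokens in the comment are illustrative only.

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.mahout.text.MailArchivesClusteringAnalyzer;

public class AnalyzerSketch {
  public static void main(String[] args) throws Exception {
    try (MailArchivesClusteringAnalyzer analyzer = new MailArchivesClusteringAnalyzer();
         TokenStream ts = analyzer.tokenStream("body",
             "Hi all, the <br> latest check-in breaks the clustering tests")) {
      CharTermAttribute term = ts.addAttribute(CharTermAttribute.class);
      ts.reset();
      while (ts.incrementToken()) {
        // Stop words, tags and non-alphanumeric tokens are dropped; the rest is Porter-stemmed,
        // e.g. "latest", "check", "break", "cluster", "test".
        System.out.println(term.toString());
      }
      ts.end();
    }
  }
}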

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
new file mode 100644
index 0000000..44df006
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+/**
+ *
+ * Combines a large number of text files into a single input, producing one record per file,
+ * in conjunction with the {@link WholeFileRecordReader} class.
+ *
+ */
+public class MultipleTextFileInputFormat extends CombineFileInputFormat<IntWritable, BytesWritable> {
+
+  @Override
+  public RecordReader<IntWritable, BytesWritable> createRecordReader(InputSplit inputSplit,
+                                                                      TaskAttemptContext taskAttemptContext)
+      throws IOException {
+    return new CombineFileRecordReader<>((CombineFileSplit) inputSplit,
+        taskAttemptContext, WholeFileRecordReader.class);
+  }
+}
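
A hedged sketch of wiring the input format above into a Hadoop job; the mapper is left as a placeholder and the paths are hypothetical, while the IntWritable/BytesWritable record types follow from the class declaration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.text.MultipleTextFileInputFormat;

public class CombinedTextJobSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "combine small text files");
    job.setJarByClass(CombinedTextJobSketch.class);
    job.setInputFormatClass(MultipleTextFileInputFormat.class);   // one <IntWritable, BytesWritable> record per file
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    // job.setMapperClass(SomeMapper.class); // placeholder for whatever consumes the per-file records
    FileInputFormat.setInputPaths(job, new Path("/data/text-corpus")); // hypothetical
    FileOutputFormat.setOutputPath(job, new Path("/data/seqfiles"));   // hypothetical
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}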

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
new file mode 100644
index 0000000..37ebc44
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.utils.io.ChunkedWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Default parser for parsing text into sequence files.
+ */
+public final class PrefixAdditionFilter extends SequenceFilesFromDirectoryFilter {
+
+  public PrefixAdditionFilter(Configuration conf,
+                              String keyPrefix,
+                              Map<String, String> options, 
+                              ChunkedWriter writer,
+                              Charset charset,
+                              FileSystem fs) {
+    super(conf, keyPrefix, options, writer, charset, fs);
+  }
+
+  @Override
+  protected void process(FileStatus fst, Path current) throws IOException {
+    FileSystem fs = getFs();
+    ChunkedWriter writer = getWriter();
+    if (fst.isDir()) {
+      String dirPath = getPrefix() + Path.SEPARATOR + current.getName() + Path.SEPARATOR + fst.getPath().getName();
+      fs.listStatus(fst.getPath(),
+                    new PrefixAdditionFilter(getConf(), dirPath, getOptions(), writer, getCharset(), fs));
+    } else {
+      try (InputStream in = fs.open(fst.getPath())){
+        StringBuilder file = new StringBuilder();
+        for (String aFit : new FileLineIterable(in, getCharset(), false)) {
+          file.append(aFit).append('\n');
+        }
+        String name = current.getName().equals(fst.getPath().getName())
+            ? current.getName()
+            : current.getName() + Path.SEPARATOR + fst.getPath().getName();
+        writer.write(getPrefix() + Path.SEPARATOR + name, file.toString());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
new file mode 100644
index 0000000..311ab8d
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.utils.io.ChunkedWriter;
+
+/**
+ * Converts a directory of text documents into SequenceFiles of the specified chunkSize. This class takes a
+ * parent directory containing subfolders of text documents, recursively reads the files, and creates
+ * {@link org.apache.hadoop.io.SequenceFile}s of docid => content. The docid is the relative path of the
+ * document from the parent directory, prepended with a specified prefix. You can also specify the input encoding
+ * of the text files. The content of the output SequenceFiles is encoded as UTF-8 text.
+ */
+public class SequenceFilesFromDirectory extends AbstractJob {
+
+  private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName();
+
+  private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
+  public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
+  private static final String[] CHARSET_OPTION = {"charset", "c"};
+
+  private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
+
+  public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
+  public static final String BASE_INPUT_PATH = "baseinputpath";
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new SequenceFilesFromDirectory(), args);
+  }
+
+  /*
+   * Callback invoked by ToolRunner after the generic MapReduce parameters have been processed.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addOptions();
+    addOption(DefaultOptionCreator.methodOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+
+    Map<String, String> options = parseOptions();
+    Path output = getOutputPath();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+
+    if (getOption(DefaultOptionCreator.METHOD_OPTION,
+      DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
+      runSequential(getConf(), getInputPath(), output, options);
+    } else {
+      runMapReduce(getInputPath(), output);
+    }
+
+    return 0;
+  }
+
+  private int runSequential(Configuration conf, Path input, Path output, Map<String, String> options)
+    throws IOException, InterruptedException, NoSuchMethodException {
+    // Running sequentially
+    Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
+    String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+
+    try (ChunkedWriter writer = new ChunkedWriter(conf, Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output)) {
+      SequenceFilesFromDirectoryFilter pathFilter;
+      String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]);
+      if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+        pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs);
+      } else {
+        pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class,
+          new Class[] {Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class},
+          new Object[] {conf, keyPrefix, options, writer, charset, fs});
+      }
+      fs.listStatus(input, pathFilter);
+    }
+    return 0;
+  }
+
+  private int runMapReduce(Path input, Path output) throws IOException, ClassNotFoundException, InterruptedException {
+
+    int chunkSizeInMB = 64;
+    if (hasOption(CHUNK_SIZE_OPTION[0])) {
+      chunkSizeInMB = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+    }
+
+    String keyPrefix = null;
+    if (hasOption(KEY_PREFIX_OPTION[0])) {
+      keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
+    }
+
+    String fileFilterClassName = null;
+    if (hasOption(FILE_FILTER_CLASS_OPTION[0])) {
+      fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]);
+    }
+
+    PathFilter pathFilter = null;
+    // Prefix addition is presently handled in the Mapper and, unlike in runSequential(),
+    // need not be done via a PathFilter.
+    if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+      try {
+        pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    // Prepare Job for submission.
+    Job job = prepareJob(input, output, MultipleTextFileInputFormat.class,
+      SequenceFilesFromDirectoryMapper.class, Text.class, Text.class,
+      SequenceFileOutputFormat.class, "SequenceFilesFromDirectory");
+
+    Configuration jobConfig = job.getConfiguration();
+    jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix);
+    jobConfig.set(FILE_FILTER_CLASS_OPTION[0], fileFilterClassName);
+
+    FileSystem fs = FileSystem.get(jobConfig);
+    FileStatus fsFileStatus = fs.getFileStatus(input);
+
+    String inputDirList;
+    if (pathFilter != null) {
+      inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus, pathFilter);
+    } else {
+      inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
+    }
+
+    jobConfig.set(BASE_INPUT_PATH, input.toString());
+
+    // use long arithmetic so large chunk sizes do not overflow an int
+    long chunkSizeInBytes = chunkSizeInMB * 1024L * 1024L;
+
+    // set the max split locations, otherwise we get nasty debug stuff
+    jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS));
+
+    FileInputFormat.setInputPaths(job, inputDirList);
+    // need to set this to a multiple of the block size, or no split happens
+    FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
+    FileOutputFormat.setCompressOutput(job, true);
+
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      return -1;
+    }
+    return 0;
+  }
+
+  /**
+   * Override this method in order to add additional options to the command line of the SequenceFilesFromDirectory job.
+   * Do not forget to call super.addOptions(), otherwise none of the standard options (input/output dirs, etc.) will be available.
+   */
+  protected void addOptions() {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(DefaultOptionCreator.methodOption().create());
+    addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in megabytes. Defaults to 64", "64");
+    addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
+      "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
+    addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
+    addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
+      "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
+  }
+
+  /**
+   * Override this method in order to parse your additional options from the command line. Do not forget to call
+   * super.parseOptions(), otherwise the standard options (input/output dirs, etc.) will not be available.
+   *
+   * @return Map of options
+   */
+  protected Map<String, String> parseOptions() {
+    Map<String, String> options = new HashMap<>();
+    options.put(CHUNK_SIZE_OPTION[0], getOption(CHUNK_SIZE_OPTION[0]));
+    options.put(FILE_FILTER_CLASS_OPTION[0], getOption(FILE_FILTER_CLASS_OPTION[0]));
+    options.put(CHARSET_OPTION[0], getOption(CHARSET_OPTION[0]));
+    return options;
+  }
+}
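
For illustration, a minimal sketch (not part of this commit) of invoking the job above through ToolRunner, using the long option names registered in addOptions(); the input/output paths and the key prefix are hypothetical. The same options apply when the class is run from the command line.

import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.text.SequenceFilesFromDirectory;

public class SequenceFilesFromDirectorySketch {
  public static void main(String[] args) throws Exception {
    // Converts every text file under /data/docs into SequenceFile chunks of
    // docid => content under /data/docs-seq, with keys prefixed by "corpus".
    ToolRunner.run(new SequenceFilesFromDirectory(), new String[] {
        "--input", "/data/docs",        // hypothetical parent directory of documents
        "--output", "/data/docs-seq",   // hypothetical output directory
        "--charset", "UTF-8",
        "--chunkSize", "64",            // chunk size in megabytes
        "--keyPrefix", "corpus",        // prepended to every docid key
        "--method", "sequential",       // or "mapreduce" to run the MR job
        "--overwrite"                   // delete the output directory first
    });
  }
}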

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
new file mode 100644
index 0000000..6e4bd64
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.mahout.utils.io.ChunkedWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Extend this abstract class if you wish to add your own parsing logic to SequenceFilesFromDirectory.
+ */
+public abstract class SequenceFilesFromDirectoryFilter implements PathFilter {
+  private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectoryFilter.class);
+
+  private final String prefix;
+  private final ChunkedWriter writer;
+  private final Charset charset;
+  private final FileSystem fs;
+  private final Map<String, String> options;
+  private final Configuration conf;
+
+  protected SequenceFilesFromDirectoryFilter(Configuration conf,
+                                             String keyPrefix,
+                                             Map<String, String> options,
+                                             ChunkedWriter writer,
+                                             Charset charset,
+                                             FileSystem fs) {
+    this.prefix = keyPrefix;
+    this.writer = writer;
+    this.charset = charset;
+    this.fs = fs;
+    this.options = options;
+    this.conf = conf;
+  }
+
+  protected final String getPrefix() {
+    return prefix;
+  }
+
+  protected final ChunkedWriter getWriter() {
+    return writer;
+  }
+
+  protected final Charset getCharset() {
+    return charset;
+  }
+
+  protected final FileSystem getFs() {
+    return fs;
+  }
+
+  protected final Map<String, String> getOptions() {
+    return options;
+  }
+  
+  protected final Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public final boolean accept(Path current) {
+    log.debug("CURRENT: {}", current.getName());
+    try {
+      for (FileStatus fst : fs.listStatus(current)) {
+        log.debug("CHILD: {}", fst.getPath().getName());
+        process(fst, current);
+      }
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+    return false;
+  }
+
+  protected abstract void process(FileStatus in, Path current) throws IOException;
+}
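
For illustration, a hedged sketch (not part of this commit) of a custom filter: a subclass only needs the public six-argument constructor that runSequential() passes to ClassUtils.instantiateAs(), plus a process() implementation. The class name and the upper-casing behaviour are hypothetical; such a class would be selected with --fileFilterClass <fully qualified name>.

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.common.iterator.FileLineIterable;
import org.apache.mahout.text.SequenceFilesFromDirectoryFilter;
import org.apache.mahout.utils.io.ChunkedWriter;

public final class UpperCasingFilter extends SequenceFilesFromDirectoryFilter {

  // the public 6-argument constructor that ClassUtils.instantiateAs() looks for
  public UpperCasingFilter(Configuration conf, String keyPrefix, Map<String, String> options,
                           ChunkedWriter writer, Charset charset, FileSystem fs) {
    super(conf, keyPrefix, options, writer, charset, fs);
  }

  @Override
  protected void process(FileStatus fst, Path current) throws IOException {
    if (fst.isDir()) {
      // recurse into sub-directories, extending the key prefix as PrefixAdditionFilter does
      getFs().listStatus(fst.getPath(),
          new UpperCasingFilter(getConf(), getPrefix() + Path.SEPARATOR + fst.getPath().getName(),
              getOptions(), getWriter(), getCharset(), getFs()));
    } else {
      StringBuilder contents = new StringBuilder();
      try (InputStream in = getFs().open(fst.getPath())) {
        for (String line : new FileLineIterable(in, getCharset(), false)) {
          contents.append(line.toUpperCase()).append('\n');   // illustrative transformation only
        }
      }
      getWriter().write(getPrefix() + Path.SEPARATOR + fst.getPath().getName(), contents.toString());
    }
  }
}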

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
new file mode 100644
index 0000000..40df3c2
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.mahout.common.HadoopUtil;
+
+import static org.apache.mahout.text.SequenceFilesFromDirectory.KEY_PREFIX_OPTION;
+
+/**
+ * Map class for SequenceFilesFromDirectory MR job
+ */
+public class SequenceFilesFromDirectoryMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
+
+  private String keyPrefix;
+  private Text fileValue = new Text();
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    this.keyPrefix = context.getConfiguration().get(KEY_PREFIX_OPTION[0], "");
+  }
+
+  public void map(IntWritable key, BytesWritable value, Context context)
+    throws IOException, InterruptedException {
+
+    Configuration configuration = context.getConfiguration();
+    Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
+    String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
+
+    String filename = this.keyPrefix.length() > 0 ?
+      this.keyPrefix + Path.SEPARATOR + relativeFilePath :
+      Path.SEPARATOR + relativeFilePath;
+
+    // getBytes() returns the (possibly padded) backing array, so use getLength() for the valid byte count
+    fileValue.set(value.getBytes(), 0, value.getLength());
+    context.write(new Text(filename), fileValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
new file mode 100644
index 0000000..c17cc12
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.commons.io.DirectoryWalker;
+import org.apache.commons.io.comparator.CompositeFileComparator;
+import org.apache.commons.io.comparator.DirectoryFileComparator;
+import org.apache.commons.io.comparator.PathFileComparator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.utils.email.MailOptions;
+import org.apache.mahout.utils.email.MailProcessor;
+import org.apache.mahout.utils.io.ChunkedWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Converts a directory of gzipped mail archives into SequenceFiles of the specified
+ * chunkSize. This class is similar to {@link SequenceFilesFromDirectory} except
+ * it uses block-compressed {@link org.apache.hadoop.io.SequenceFile}s and parses out the subject and
+ * body text of each mail message into a separate key/value pair.
+ */
+public final class SequenceFilesFromMailArchives extends AbstractJob {
+
+  private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class);
+
+  public static final String[] CHUNK_SIZE_OPTION     = {"chunkSize", "chunk"};
+  public static final String[] KEY_PREFIX_OPTION     = {"keyPrefix", "prefix"};
+  public static final String[] CHARSET_OPTION        = {"charset", "c"};
+  public static final String[] SUBJECT_OPTION        = {"subject", "s"};
+  public static final String[] TO_OPTION             = {"to", "to"};
+  public static final String[] FROM_OPTION           = {"from", "from"};
+  public static final String[] REFERENCES_OPTION     = {"references", "refs"};
+  public static final String[] BODY_OPTION           = {"body", "b"};
+  public static final String[] STRIP_QUOTED_OPTION   = {"stripQuoted", "q"};
+  public static final String[] QUOTED_REGEX_OPTION   = {"quotedRegex", "regex"};
+  public static final String[] SEPARATOR_OPTION      = {"separator", "sep"};
+  public static final String[] BODY_SEPARATOR_OPTION = {"bodySeparator", "bodySep"};
+  public static final String BASE_INPUT_PATH         = "baseinputpath";
+
+  private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
+
+  public void createSequenceFiles(MailOptions options) throws IOException {
+    try (ChunkedWriter writer =
+             new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir()))){
+      MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer);
+      if (options.getInput().isDirectory()) {
+        PrefixAdditionDirectoryWalker walker = new PrefixAdditionDirectoryWalker(processor, writer);
+        walker.walk(options.getInput());
+        log.info("Parsed {} messages from {}", walker.getMessageCount(), options.getInput().getAbsolutePath());
+      } else {
+        long start = System.currentTimeMillis();
+        long cnt = processor.parseMboxLineByLine(options.getInput());
+        long finish = System.currentTimeMillis();
+        log.info("Parsed {} messages from {} in time: {}", cnt, options.getInput().getAbsolutePath(), finish - start);
+      }
+    }
+  }
+
+  private static class PrefixAdditionDirectoryWalker extends DirectoryWalker<Object> {
+
+    @SuppressWarnings("unchecked")
+    private static final Comparator<File> FILE_COMPARATOR = new CompositeFileComparator(
+        DirectoryFileComparator.DIRECTORY_REVERSE, PathFileComparator.PATH_COMPARATOR);
+
+    private final Deque<MailProcessor> processors = new ArrayDeque<>();
+    private final ChunkedWriter writer;
+    private final Deque<Long> messageCounts = new ArrayDeque<>();
+
+    public PrefixAdditionDirectoryWalker(MailProcessor processor, ChunkedWriter writer) {
+      processors.addFirst(processor);
+      this.writer = writer;
+      messageCounts.addFirst(0L);
+    }
+
+    public void walk(File startDirectory) throws IOException {
+      super.walk(startDirectory, null);
+    }
+
+    public long getMessageCount() {
+      return messageCounts.getFirst();
+    }
+
+    @Override
+    protected void handleDirectoryStart(File current, int depth, Collection<Object> results) throws IOException {
+      if (depth > 0) {
+        log.info("At {}", current.getAbsolutePath());
+        MailProcessor processor = processors.getFirst();
+        MailProcessor subDirProcessor = new MailProcessor(processor.getOptions(), processor.getPrefix()
+            + File.separator + current.getName(), writer);
+        processors.push(subDirProcessor);
+        messageCounts.push(0L);
+      }
+    }
+
+    @Override
+    protected File[] filterDirectoryContents(File directory, int depth, File[] files) throws IOException {
+      Arrays.sort(files, FILE_COMPARATOR);
+      return files;
+    }
+
+    @Override
+    protected void handleFile(File current, int depth, Collection<Object> results) throws IOException {
+      MailProcessor processor = processors.getFirst();
+      long currentDirMessageCount = messageCounts.pop();
+      try {
+        currentDirMessageCount += processor.parseMboxLineByLine(current);
+      } catch (IOException e) {
+        throw new IllegalStateException("Error processing " + current, e);
+      }
+      messageCounts.push(currentDirMessageCount);
+    }
+
+    @Override
+    protected void handleDirectoryEnd(File current, int depth, Collection<Object> results) throws IOException {
+      if (depth > 0) {
+        final long currentDirMessageCount = messageCounts.pop();
+        log.info("Parsed {} messages from directory {}", currentDirMessageCount, current.getAbsolutePath());
+
+        processors.pop();
+
+        // aggregate message counts
+        long parentDirMessageCount = messageCounts.pop();
+        parentDirMessageCount += currentDirMessageCount;
+        messageCounts.push(parentDirMessageCount);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new SequenceFilesFromMailArchives(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.methodOption().create());
+
+    addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in megabytes. Defaults to 64", "64");
+    addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
+    addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
+      "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
+    addFlag(SUBJECT_OPTION[0], SUBJECT_OPTION[1], "Include the Mail subject as part of the text.  Default is false");
+    addFlag(TO_OPTION[0], TO_OPTION[1], "Include the to field in the text.  Default is false");
+    addFlag(FROM_OPTION[0], FROM_OPTION[1], "Include the from field in the text.  Default is false");
+    addFlag(REFERENCES_OPTION[0], REFERENCES_OPTION[1],
+      "Include the references field in the text.  Default is false");
+    addFlag(BODY_OPTION[0], BODY_OPTION[1], "Include the body in the output.  Default is false");
+    addFlag(STRIP_QUOTED_OPTION[0], STRIP_QUOTED_OPTION[1],
+      "Strip (remove) quoted email text in the body.  Default is false");
+    addOption(QUOTED_REGEX_OPTION[0], QUOTED_REGEX_OPTION[1],
+        "Specify the regex that identifies quoted text.  "
+          + "Default is to look for > or | at the beginning of the line.");
+    addOption(SEPARATOR_OPTION[0], SEPARATOR_OPTION[1],
+        "The separator to use between metadata items (to, from, etc.).  Default is \\n", "\n");
+    addOption(BODY_SEPARATOR_OPTION[0], BODY_SEPARATOR_OPTION[1],
+        "The separator to use between lines in the body.  Default is \\n.  "
+          + "Useful to change if you wish to have the message be on one line", "\n");
+
+    addOption(DefaultOptionCreator.helpOption());
+    Map<String, List<String>> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+    File input = getInputFile();
+    String outputDir = getOutputPath().toString();
+
+    int chunkSize = 64;
+    if (hasOption(CHUNK_SIZE_OPTION[0])) {
+      chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+    }
+
+    String prefix = "";
+    if (hasOption(KEY_PREFIX_OPTION[0])) {
+      prefix = getOption(KEY_PREFIX_OPTION[0]);
+    }
+
+    Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
+    MailOptions options = new MailOptions();
+    options.setInput(input);
+    options.setOutputDir(outputDir);
+    options.setPrefix(prefix);
+    options.setChunkSize(chunkSize);
+    options.setCharset(charset);
+
+    List<Pattern> patterns = new ArrayList<>(5);
+    // patternOrder is used downstream so that we know what order the text fields are in,
+    // instead of encoding the order in the string itself, which would require more
+    // processing later to strip it out before feature selection.
+    Map<String, Integer> patternOrder = new HashMap<>();
+    int order = 0;
+    if (hasOption(FROM_OPTION[0])) {
+      patterns.add(MailProcessor.FROM_PREFIX);
+      patternOrder.put(MailOptions.FROM, order++);
+    }
+    if (hasOption(TO_OPTION[0])) {
+      patterns.add(MailProcessor.TO_PREFIX);
+      patternOrder.put(MailOptions.TO, order++);
+    }
+    if (hasOption(REFERENCES_OPTION[0])) {
+      patterns.add(MailProcessor.REFS_PREFIX);
+      patternOrder.put(MailOptions.REFS, order++);
+    }
+    if (hasOption(SUBJECT_OPTION[0])) {
+      patterns.add(MailProcessor.SUBJECT_PREFIX);
+      patternOrder.put(MailOptions.SUBJECT, order++);
+    }
+    options.setStripQuotedText(hasOption(STRIP_QUOTED_OPTION[0]));
+
+    options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
+    options.setPatternOrder(patternOrder);
+    options.setIncludeBody(hasOption(BODY_OPTION[0]));
+
+    if (hasOption(SEPARATOR_OPTION[0])) {
+      options.setSeparator(getOption(SEPARATOR_OPTION[0]));
+    } else {
+      options.setSeparator("\n");
+    }
+
+    if (hasOption(BODY_SEPARATOR_OPTION[0])) {
+      options.setBodySeparator(getOption(BODY_SEPARATOR_OPTION[0]));
+    }
+
+    if (hasOption(QUOTED_REGEX_OPTION[0])) {
+      options.setQuotedTextPattern(Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])));
+    }
+
+    if (getOption(DefaultOptionCreator.METHOD_OPTION,
+      DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
+      runSequential(options);
+    } else {
+      runMapReduce(getInputPath(), getOutputPath());
+    }
+
+    return 0;
+  }
+
+  private int runSequential(MailOptions options)
+    throws IOException, InterruptedException, NoSuchMethodException {
+
+    long start = System.currentTimeMillis();
+    createSequenceFiles(options);
+    long finish = System.currentTimeMillis();
+    log.info("Conversion took {}ms", finish - start);
+
+    return 0;
+  }
+
+  private int runMapReduce(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException {
+
+    Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, SequenceFilesFromMailArchivesMapper.class,
+      Text.class, Text.class, SequenceFileOutputFormat.class, "SequentialFilesFromMailArchives");
+
+    Configuration jobConfig = job.getConfiguration();
+
+    if (hasOption(KEY_PREFIX_OPTION[0])) {
+      jobConfig.set(KEY_PREFIX_OPTION[1], getOption(KEY_PREFIX_OPTION[0]));
+    }
+
+    int chunkSize = 0;
+    if (hasOption(CHUNK_SIZE_OPTION[0])) {
+      chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+      jobConfig.set(CHUNK_SIZE_OPTION[0], String.valueOf(chunkSize));
+    }
+
+    Charset charset;
+    if (hasOption(CHARSET_OPTION[0])) {
+      charset = Charset.forName(getOption(CHARSET_OPTION[0]));
+      jobConfig.set(CHARSET_OPTION[0], charset.displayName());
+    }
+
+    if (hasOption(FROM_OPTION[0])) {
+      jobConfig.set(FROM_OPTION[1], "true");
+    }
+
+    if (hasOption(TO_OPTION[0])) {
+      jobConfig.set(TO_OPTION[1], "true");
+    }
+
+    if (hasOption(REFERENCES_OPTION[0])) {
+      jobConfig.set(REFERENCES_OPTION[1], "true");
+    }
+
+    if (hasOption(SUBJECT_OPTION[0])) {
+      jobConfig.set(SUBJECT_OPTION[1], "true");
+    }
+
+    if (hasOption(QUOTED_REGEX_OPTION[0])) {
+      jobConfig.set(QUOTED_REGEX_OPTION[1], Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])).toString());
+    }
+
+    if (hasOption(SEPARATOR_OPTION[0])) {
+      jobConfig.set(SEPARATOR_OPTION[1], getOption(SEPARATOR_OPTION[0]));
+    } else {
+      jobConfig.set(SEPARATOR_OPTION[1], "\n");
+    }
+
+    if (hasOption(BODY_OPTION[0])) {
+      jobConfig.set(BODY_OPTION[1], "true");
+    } else {
+      jobConfig.set(BODY_OPTION[1], "false");
+    }
+
+    if (hasOption(BODY_SEPARATOR_OPTION[0])) {
+      jobConfig.set(BODY_SEPARATOR_OPTION[1], getOption(BODY_SEPARATOR_OPTION[0]));
+    } else {
+      jobConfig.set(BODY_SEPARATOR_OPTION[1], "\n");
+    }
+
+    FileSystem fs = FileSystem.get(jobConfig);
+    FileStatus fsFileStatus = fs.getFileStatus(input);
+
+    jobConfig.set(BASE_INPUT_PATH, input.toString());
+    String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
+    FileInputFormat.setInputPaths(job, inputDirList);
+
+    // use long arithmetic so large chunk sizes do not overflow an int
+    long chunkSizeInBytes = chunkSize * 1024L * 1024L;
+    // need to set this to a multiple of the block size, or no split happens
+    FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
+
+    // set the max split locations, otherwise we get nasty debug stuff
+    jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS));
+
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      return -1;
+    }
+    return 0;
+  }
+}
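
For illustration, a minimal sketch (not part of this commit) of invoking the converter above on a directory of mbox archives; the paths and the prefix are hypothetical, and only a subset of the flags registered in run() is shown.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.text.SequenceFilesFromMailArchives;

public class SequenceFilesFromMailArchivesSketch {
  public static void main(String[] args) throws Exception {
    // Walks /data/mail-archives, parses each mbox, and writes one key/value pair
    // per message; the value contains the selected fields (subject and body here).
    ToolRunner.run(new Configuration(), new SequenceFilesFromMailArchives(), new String[] {
        "--input", "/data/mail-archives",   // hypothetical directory of gzipped mbox files
        "--output", "/data/mail-seq",       // hypothetical output directory
        "--charset", "UTF-8",
        "--chunkSize", "64",                // chunk size in megabytes
        "--keyPrefix", "asf",               // prepended to every message key
        "--subject",                        // include the Subject: header in the text
        "--body",                           // include the message body in the text
        "--method", "sequential"            // or "mapreduce"
    });
  }
}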