Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:56 UTC

[25/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy into mahout-hdfs and mahout-mr, closes apache/mahout#86

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java
new file mode 100644
index 0000000..4bffb2b
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.mapreduce;
+
+import java.io.IOException;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.neighborhood.BruteSearch;
+import org.apache.mahout.math.neighborhood.FastProjectionSearch;
+import org.apache.mahout.math.neighborhood.LocalitySensitiveHashSearch;
+import org.apache.mahout.math.neighborhood.ProjectionSearch;
+import org.apache.mahout.math.neighborhood.UpdatableSearcher;
+
+public final class StreamingKMeansUtilsMR {
+
+  private StreamingKMeansUtilsMR() {
+  }
+
+  /**
+   * Instantiates a searcher from a given configuration.
+   * @param conf the configuration
+   * @return the instantiated searcher
+   * @throws RuntimeException if the distance measure class cannot be instantiated
+   * @throws IllegalStateException if an unknown searcher class was requested
+   */
+  public static UpdatableSearcher searcherFromConfiguration(Configuration conf) {
+    DistanceMeasure distanceMeasure;
+    String distanceMeasureClass = conf.get(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    try {
+      distanceMeasure = (DistanceMeasure) Class.forName(distanceMeasureClass).getConstructor().newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to instantiate distanceMeasure", e);
+    }
+
+    int numProjections =  conf.getInt(StreamingKMeansDriver.NUM_PROJECTIONS_OPTION, 20);
+    int searchSize =  conf.getInt(StreamingKMeansDriver.SEARCH_SIZE_OPTION, 10);
+
+    String searcherClass = conf.get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION);
+
+    if (searcherClass.equals(BruteSearch.class.getName())) {
+      return ClassUtils.instantiateAs(searcherClass, UpdatableSearcher.class,
+          new Class[]{DistanceMeasure.class}, new Object[]{distanceMeasure});
+    } else if (searcherClass.equals(FastProjectionSearch.class.getName())
+        || searcherClass.equals(ProjectionSearch.class.getName())) {
+      return ClassUtils.instantiateAs(searcherClass, UpdatableSearcher.class,
+          new Class[]{DistanceMeasure.class, int.class, int.class},
+          new Object[]{distanceMeasure, numProjections, searchSize});
+    } else if (searcherClass.equals(LocalitySensitiveHashSearch.class.getName())) {
+      return ClassUtils.instantiateAs(searcherClass, LocalitySensitiveHashSearch.class,
+          new Class[]{DistanceMeasure.class, int.class},
+          new Object[]{distanceMeasure, searchSize});
+    } else {
+      throw new IllegalStateException("Unknown class instantiation requested");
+    }
+  }
+
+  /**
+   * Returns an Iterable of centroids from an Iterable of VectorWritables by creating a new Centroid containing
+   * a RandomAccessSparseVector as a delegate for each VectorWritable.
+   * @param inputIterable VectorWritable Iterable to get Centroids from
+   * @return the new Centroids
+   */
+  public static Iterable<Centroid> getCentroidsFromVectorWritable(Iterable<VectorWritable> inputIterable) {
+    return Iterables.transform(inputIterable, new Function<VectorWritable, Centroid>() {
+      private int numVectors = 0;
+      @Override
+      public Centroid apply(VectorWritable input) {
+        Preconditions.checkNotNull(input);
+        return new Centroid(numVectors++, new RandomAccessSparseVector(input.get()), 1);
+      }
+    });
+  }
+
+  /**
+   * Returns an Iterable of Centroid from an Iterable of Vector by either casting each Vector to Centroid (if the
+   * instance is already a Centroid) or creating a new Centroid based on that Vector.
+   * The implicit expectation is that the input will not interleave the two kinds of vectors; otherwise the numbering
+   * of the new Centroids becomes invalid.
+   * @param input Iterable of Vectors to cast
+   * @return the new Centroids
+   */
+  public static Iterable<Centroid> castVectorsToCentroids(Iterable<Vector> input) {
+    return Iterables.transform(input, new Function<Vector, Centroid>() {
+      private int numVectors = 0;
+      @Override
+      public Centroid apply(Vector input) {
+        Preconditions.checkNotNull(input);
+        if (input instanceof Centroid) {
+          return (Centroid) input;
+        } else {
+          return new Centroid(numVectors++, input, 1);
+        }
+      }
+    });
+  }
+
+  /**
+   * Writes centroids to a sequence file.
+   * @param centroids the centroids to write.
+   * @param path the path of the output file.
+   * @param conf the configuration for the HDFS to write the file to.
+   * @throws java.io.IOException
+   */
+  public static void writeCentroidsToSequenceFile(Iterable<Centroid> centroids, Path path, Configuration conf)
+    throws IOException {
+    SequenceFile.Writer writer = null;
+    try {
+      writer = SequenceFile.createWriter(FileSystem.get(conf), conf,
+          path, IntWritable.class, CentroidWritable.class);
+      int i = 0;
+      for (Centroid centroid : centroids) {
+        writer.append(new IntWritable(i++), new CentroidWritable(centroid));
+      }
+    } finally {
+      Closeables.close(writer, true);
+    }
+  }
+
+  public static void writeVectorsToSequenceFile(Iterable<? extends Vector> datapoints, Path path, Configuration conf)
+    throws IOException {
+    SequenceFile.Writer writer = null;
+    try {
+      writer = SequenceFile.createWriter(FileSystem.get(conf), conf,
+          path, IntWritable.class, VectorWritable.class);
+      int i = 0;
+      for (Vector vector : datapoints) {
+        writer.append(new IntWritable(i++), new VectorWritable(vector));
+      }
+    } finally {
+      Closeables.close(writer, true);
+    }
+  }
+}
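
For context, a minimal sketch of how these helpers can be wired together outside the MapReduce jobs; the example class, the sample vectors, and the /tmp output path are illustrative assumptions, and it presumes the StreamingKMeansDriver option constants are accessible from this package.

package org.apache.mahout.clustering.streaming.mapreduce;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.Centroid;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.neighborhood.ProjectionSearch;
import org.apache.mahout.math.neighborhood.UpdatableSearcher;

public class StreamingKMeansUtilsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The same keys that searcherFromConfiguration() reads.
    conf.set(DefaultOptionCreator.DISTANCE_MEASURE_OPTION, EuclideanDistanceMeasure.class.getName());
    conf.set(StreamingKMeansDriver.SEARCHER_CLASS_OPTION, ProjectionSearch.class.getName());
    conf.setInt(StreamingKMeansDriver.NUM_PROJECTIONS_OPTION, 3);
    conf.setInt(StreamingKMeansDriver.SEARCH_SIZE_OPTION, 2);

    UpdatableSearcher searcher = StreamingKMeansUtilsMR.searcherFromConfiguration(conf);
    System.out.println("Instantiated searcher: " + searcher.getClass().getSimpleName());

    // Wrap a couple of plain vectors as centroids and persist them (the path is hypothetical).
    Iterable<Centroid> centroids = StreamingKMeansUtilsMR.castVectorsToCentroids(
        Arrays.<Vector>asList(new DenseVector(new double[]{1, 2}), new DenseVector(new double[]{3, 4})));
    StreamingKMeansUtilsMR.writeCentroidsToSequenceFile(centroids, new Path("/tmp/centroids-seq"), conf);
  }
}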

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java
new file mode 100644
index 0000000..55b7848
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.tools;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+
+public class ResplitSequenceFiles {
+
+  private String inputFile;
+  private String outputFileBase;
+  private int numSplits;
+
+  private Configuration conf;
+  private FileSystem fs;
+
+  private ResplitSequenceFiles() {}
+
+  private void writeSplit(Iterator<Pair<Writable, Writable>> inputIterator,
+                          int numSplit, int numEntriesPerSplit) throws IOException {
+    SequenceFile.Writer splitWriter = null;
+    for (int j = 0; j < numEntriesPerSplit; ++j) {
+      Pair<Writable, Writable> item = inputIterator.next();
+      if (splitWriter == null) {
+        splitWriter = SequenceFile.createWriter(fs, conf,
+            new Path(outputFileBase + "-" + numSplit), item.getFirst().getClass(), item.getSecond().getClass());
+      }
+      splitWriter.append(item.getFirst(), item.getSecond());
+    }
+    if (splitWriter != null) {
+      splitWriter.close();
+    }
+  }
+
+  private void run(PrintWriter printWriter) throws IOException {
+    conf = new Configuration();
+    SequenceFileDirIterable<Writable, Writable> inputIterable = new
+        SequenceFileDirIterable<Writable, Writable>(new Path(inputFile), PathType.LIST, conf);
+    fs = FileSystem.get(conf);
+
+    int numEntries = Iterables.size(inputIterable);
+    int numEntriesPerSplit = numEntries / numSplits;
+    int numEntriesLastSplit = numEntriesPerSplit + numEntries - numEntriesPerSplit * numSplits;
+    Iterator<Pair<Writable, Writable>> inputIterator = inputIterable.iterator();
+
+    printWriter.printf("Writing %d splits\n", numSplits);
+    for (int i = 0; i < numSplits - 1; ++i) {
+      printWriter.printf("Writing split %d\n", i);
+      writeSplit(inputIterator, i, numEntriesPerSplit);
+    }
+    printWriter.printf("Writing split %d\n", numSplits - 1);
+    writeSplit(inputIterator, numSplits - 1, numEntriesLastSplit);
+  }
+
+  private boolean parseArgs(String[] args) {
+    DefaultOptionBuilder builder = new DefaultOptionBuilder();
+
+    Option help = builder.withLongName("help").withDescription("print this list").create();
+
+    ArgumentBuilder argumentBuilder = new ArgumentBuilder();
+    Option inputFileOption = builder.withLongName("input")
+        .withShortName("i")
+        .withRequired(true)
+        .withArgument(argumentBuilder.withName("input").withMaximum(1).create())
+        .withDescription("what the base folder for sequence files is (they all must have the same key/value type")
+        .create();
+
+    Option outputFileOption = builder.withLongName("output")
+        .withShortName("o")
+        .withRequired(true)
+        .withArgument(argumentBuilder.withName("output").withMaximum(1).create())
+        .withDescription("the base name of the file split that the files will be split it; the i'th split has the "
+            + "suffix -i")
+        .create();
+
+    Option numSplitsOption = builder.withLongName("numSplits")
+        .withShortName("ns")
+        .withRequired(true)
+        .withArgument(argumentBuilder.withName("numSplits").withMaximum(1).create())
+        .withDescription("how many splits to use for the given files")
+        .create();
+
+    Group normalArgs = new GroupBuilder()
+        .withOption(help)
+        .withOption(inputFileOption)
+        .withOption(outputFileOption)
+        .withOption(numSplitsOption)
+        .create();
+
+    Parser parser = new Parser();
+    parser.setHelpOption(help);
+    parser.setHelpTrigger("--help");
+    parser.setGroup(normalArgs);
+    parser.setHelpFormatter(new HelpFormatter(" ", "", " ", 130));
+    CommandLine cmdLine = parser.parseAndHelp(args);
+
+    if (cmdLine == null) {
+      return false;
+    }
+
+    inputFile = (String) cmdLine.getValue(inputFileOption);
+    outputFileBase = (String) cmdLine.getValue(outputFileOption);
+    numSplits = Integer.parseInt((String) cmdLine.getValue(numSplitsOption));
+    return true;
+  }
+
+  public static void main(String[] args) throws IOException {
+    ResplitSequenceFiles runner = new ResplitSequenceFiles();
+    if (runner.parseArgs(args)) {
+      runner.run(new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true));
+    }
+  }
+}
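
For reference, an illustrative invocation of the tool; the paths and example class are hypothetical. The tool reads every sequence file under --input (they must share key/value types) and writes --numSplits files named &lt;output&gt;-0 through &lt;output&gt;-(numSplits-1).

import org.apache.mahout.clustering.streaming.tools.ResplitSequenceFiles;

public class ResplitExample {
  public static void main(String[] args) throws Exception {
    // Resplit all sequence files under /tmp/seqfiles into 4 files /tmp/resplit/chunk-0 .. chunk-3.
    ResplitSequenceFiles.main(new String[] {
        "--input", "/tmp/seqfiles",
        "--output", "/tmp/resplit/chunk",
        "--numSplits", "4"
    });
  }
}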

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java
new file mode 100644
index 0000000..11bc34a
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Contains list of all internal paths used in top down clustering.
+ */
+public final class PathDirectory {
+
+  public static final String TOP_LEVEL_CLUSTER_DIRECTORY = "topLevelCluster";
+  public static final String POST_PROCESS_DIRECTORY = "clusterPostProcessed";
+  public static final String CLUSTERED_POINTS_DIRECTORY = "clusteredPoints";
+  public static final String BOTTOM_LEVEL_CLUSTER_DIRECTORY = "bottomLevelCluster";
+
+  private PathDirectory() {
+  }
+
+  /**
+   * All output of top level clustering is stored in output directory/topLevelCluster.
+   * 
+   * @param output
+   *          the output path of clustering.
+   * @return The top level Cluster Directory.
+   */
+  public static Path getTopLevelClusterPath(Path output) {
+    return new Path(output + File.separator + TOP_LEVEL_CLUSTER_DIRECTORY);
+  }
+  
+  /**
+   * The output of top level clusters is post processed and kept in this path.
+   * 
+   * @param outputPathProvidedByUser
+   *          the output path of clustering.
+   * @return the path where the output of top level cluster post processor is kept.
+   */
+  public static Path getClusterPostProcessorOutputDirectory(Path outputPathProvidedByUser) {
+    return new Path(outputPathProvidedByUser + File.separator + POST_PROCESS_DIRECTORY);
+  }
+  
+  /**
+   * The top level clustered points, before post processing, are generated here.
+   * 
+   * @param output
+   *          the output path of clustering.
+   * @return the clustered points directory
+   */
+  public static Path getClusterOutputClusteredPoints(Path output) {
+    return new Path(output + File.separator + CLUSTERED_POINTS_DIRECTORY + File.separator, "*");
+  }
+  
+  /**
+   * Each cluster produced by top level clustering is processed in output/"bottomLevelCluster"/clusterId.
+   * 
+   * @param output the output path of clustering.
+   * @param clusterId the id of the cluster.
+   * @return the bottom level clustering path.
+   */
+  public static Path getBottomLevelClusterPath(Path output, String clusterId) {
+    return new Path(output + File.separator + BOTTOM_LEVEL_CLUSTER_DIRECTORY + File.separator + clusterId);
+  }
+  
+  /**
+   * Each cluster's path name is its clusterId. The vectors reside in separate files inside it.
+   * 
+   * @param clusterPostProcessorOutput
+   *          the path of cluster post processor output.
+   * @param clusterId
+   *          the id of the cluster.
+   * @return the cluster path for cluster id.
+   */
+  public static Path getClusterPathForClusterId(Path clusterPostProcessorOutput, String clusterId) {
+    return new Path(clusterPostProcessorOutput + File.separator + clusterId);
+  }
+  
+}
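
For reference, a small sketch (not part of the committed file) of the directory layout these helpers produce for a hypothetical output path, assuming File.separator is "/".

import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.topdown.PathDirectory;

public class PathDirectoryExample {
  public static void main(String[] args) {
    Path output = new Path("/tmp/topdown");  // hypothetical clustering output directory
    System.out.println(PathDirectory.getTopLevelClusterPath(output));                 // /tmp/topdown/topLevelCluster
    System.out.println(PathDirectory.getClusterPostProcessorOutputDirectory(output)); // /tmp/topdown/clusterPostProcessed
    System.out.println(PathDirectory.getClusterOutputClusteredPoints(output));        // /tmp/topdown/clusteredPoints/*
    System.out.println(PathDirectory.getBottomLevelClusterPath(output, "7"));         // /tmp/topdown/bottomLevelCluster/7
    System.out.println(PathDirectory.getClusterPathForClusterId(
        PathDirectory.getClusterPostProcessorOutputDirectory(output), "7"));          // /tmp/topdown/clusterPostProcessed/7
  }
}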

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
new file mode 100644
index 0000000..083b543
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Reads the number of clusters produced by the clustering algorithm.
+ */
+public final class ClusterCountReader {
+
+  private ClusterCountReader() {
+  }
+
+  /**
+   * Reads the number of clusters present by reading the clusters-*-final file.
+   *
+   * @param clusterOutputPath The output path provided to the clustering algorithm.
+   * @param conf              The hadoop configuration.
+   * @return the number of final clusters.
+   */
+  public static int getNumberOfClusters(Path clusterOutputPath, Configuration conf) throws IOException {
+    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
+    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
+    int numberOfClusters = 0;
+    Iterator<?> it = new SequenceFileDirValueIterator<Writable>(clusterFiles[0].getPath(),
+            PathType.LIST,
+            PathFilters.partFilter(),
+            null,
+            true,
+            conf);
+    while (it.hasNext()) {
+      it.next();
+      numberOfClusters++;
+    }
+    return numberOfClusters;
+  }
+
+  /**
+   * Generates a map of all cluster ids by reading the clusters-*-final file.
+   *
+   * @param clusterOutputPath The output path provided to the clustering algorithm.
+   * @param conf              The hadoop configuration.
+   * @return A Map of the final cluster ids, keyed by cluster id when keyIsClusterId is true, by index otherwise.
+   */
+  public static Map<Integer, Integer> getClusterIDs(Path clusterOutputPath, Configuration conf, boolean keyIsClusterId)
+    throws IOException {
+    Map<Integer, Integer> clusterIds = new HashMap<>();
+    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
+    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
+    //System.out.println("LOOK HERE: " + clusterOutputPath);
+    Iterator<ClusterWritable> it = new SequenceFileDirValueIterator<>(clusterFiles[0].getPath(),
+            PathType.LIST,
+            PathFilters.partFilter(),
+            null,
+            true,
+            conf);
+    int i = 0;
+    while (it.hasNext()) {
+      Integer key;
+      Integer value;
+      if (keyIsClusterId) { // key is the cluster id, value is i, the index we will use
+        key = it.next().getValue().getId();
+        value = i;
+      } else {
+        key = i;
+        value = it.next().getValue().getId();
+      }
+      clusterIds.put(key, value);
+      i++;
+    }
+    return clusterIds;
+  }
+
+}
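
For reference, a minimal sketch (not part of the committed file) showing how the reader is typically used; the clustering output path is hypothetical.

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.topdown.postprocessor.ClusterCountReader;

public class ClusterCountReaderExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Directory that contains a clusters-*-final folder, e.g. the output of k-means.
    Path clusterOutputPath = new Path("/tmp/kmeans-output");

    int k = ClusterCountReader.getNumberOfClusters(clusterOutputPath, conf);
    // keyIsClusterId = true: clusterId -> 0-based index, as used by the post-processor mapper.
    Map<Integer, Integer> idToIndex = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, true);
    System.out.println(k + " clusters, id->index mapping: " + idToIndex);
  }
}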

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
new file mode 100644
index 0000000..44a944d
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.topdown.PathDirectory;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This class reads the output of any clustering algorithm and creates separate directories for the different
+ * clusters. Each cluster directory's name is its clusterId, and every point is written into the cluster
+ * directory associated with that point.
+ * <p/>
+ * This class implements a sequential algorithm and is appropriate for data that has been clustered
+ * sequentially.
+ * <p/>
+ * Both the sequential and the non-sequential versions are used from {@link ClusterOutputPostProcessorDriver}.
+ */
+public final class ClusterOutputPostProcessor {
+
+  private Path clusteredPoints;
+  private final FileSystem fileSystem;
+  private final Configuration conf;
+  private final Path clusterPostProcessorOutput;
+  private final Map<String, Path> postProcessedClusterDirectories = Maps.newHashMap();
+  private long uniqueVectorId = 0L;
+  private final Map<String, SequenceFile.Writer> writersForClusters;
+
+  public ClusterOutputPostProcessor(Path clusterOutputToBeProcessed,
+                                    Path output,
+                                    Configuration hadoopConfiguration) throws IOException {
+    this.clusterPostProcessorOutput = output;
+    this.clusteredPoints = PathDirectory.getClusterOutputClusteredPoints(clusterOutputToBeProcessed);
+    this.conf = hadoopConfiguration;
+    this.writersForClusters = Maps.newHashMap();
+    fileSystem = clusteredPoints.getFileSystem(conf);
+  }
+
+  /**
+   * This method takes the clustered points output by the clustering algorithms as input and writes them into
+   * their respective clusters.
+   */
+  public void process() throws IOException {
+    createPostProcessDirectory();
+    for (Pair<?, WeightedVectorWritable> record
+        : new SequenceFileDirIterable<Writable, WeightedVectorWritable>(clusteredPoints, PathType.GLOB, PathFilters.partFilter(),
+                                                                        null, false, conf)) {
+      String clusterId = record.getFirst().toString().trim();
+      putVectorInRespectiveCluster(clusterId, record.getSecond());
+    }
+    IOUtils.close(writersForClusters.values());
+    writersForClusters.clear();
+  }
+
+  /**
+   * Creates the directory to put post processed clusters.
+   */
+  private void createPostProcessDirectory() throws IOException {
+    if (!fileSystem.exists(clusterPostProcessorOutput)
+            && !fileSystem.mkdirs(clusterPostProcessorOutput)) {
+      throw new IOException("Error creating cluster post processor directory");
+    }
+  }
+
+  /**
+   * Finds the cluster directory for the given clusterId and writes the vector into that cluster.
+   */
+  private void putVectorInRespectiveCluster(String clusterId, WeightedVectorWritable point) throws IOException {
+    Writer writer = findWriterForVector(clusterId);
+    postProcessedClusterDirectories.put(clusterId,
+            PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId));
+    writeVectorToCluster(writer, point);
+  }
+
+  /**
+   * Returns the writer for the cluster directory where the point is supposed to be written, creating it if needed.
+   */
+  private Writer findWriterForVector(String clusterId) throws IOException {
+    Path clusterDirectory = PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId);
+    Writer writer = writersForClusters.get(clusterId);
+    if (writer == null) {
+      Path pathToWrite = new Path(clusterDirectory, new Path("part-m-0"));
+      writer = new Writer(fileSystem, conf, pathToWrite, LongWritable.class, VectorWritable.class);
+      writersForClusters.put(clusterId, writer);
+    }
+    return writer;
+  }
+
+  /**
+   * Writes vector to the cluster directory.
+   */
+  private void writeVectorToCluster(Writer writer, WeightedVectorWritable point) throws IOException {
+    writer.append(new LongWritable(uniqueVectorId++), new VectorWritable(point.getVector()));
+    writer.sync();
+  }
+
+  /**
+   * @return a map from clusterId to its post-processed cluster path.
+   */
+  public Map<String, Path> getPostProcessedClusterDirectories() {
+    return postProcessedClusterDirectories;
+  }
+
+  public void setClusteredPoints(Path clusteredPoints) {
+    this.clusteredPoints = clusteredPoints;
+  }
+
+}
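
For reference, a minimal sequential usage sketch (not part of the committed file); both paths are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.topdown.postprocessor.ClusterOutputPostProcessor;

public class SequentialPostProcessExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path clusteringOutput = new Path("/tmp/kmeans-output");           // contains clusteredPoints
    Path postProcessedOutput = new Path("/tmp/kmeans-postprocessed"); // receives one sub-directory per clusterId

    ClusterOutputPostProcessor postProcessor =
        new ClusterOutputPostProcessor(clusteringOutput, postProcessedOutput, conf);
    postProcessor.process();
    // Map from clusterId to its post-processed cluster path.
    System.out.println(postProcessor.getPostProcessedClusterDirectories());
  }
}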

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
new file mode 100644
index 0000000..82a3071
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Post processes the output of clustering algorithms and groups the points into their respective clusters.
+ * It is intended for top down clustering, but can be used whenever the clustering output needs to be grouped
+ * by cluster.
+ */
+public final class ClusterOutputPostProcessorDriver extends AbstractJob {
+
+  /**
+   * CLI to run the clustering post processor. The input to the post processor is the output path specified to the
+   * clustering.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.methodOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+    Path input = getInputPath();
+    Path output = getOutputPath();
+
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+            DefaultOptionCreator.SEQUENTIAL_METHOD);
+    run(input, output, runSequential);
+    return 0;
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new ClusterOutputPostProcessorDriver(), args);
+  }
+
+  /**
+   * Post processes the output of clustering algorithms and groups them into respective clusters. Each
+   * cluster's vectors are written into a directory named after its clusterId.
+   *
+   * @param input         The output path provided to the clustering algorithm, which will be post processed. Hint: the
+   *                      path of the directory containing clusters-*-final and clusteredPoints.
+   * @param output        The path where the post processed data will be stored.
+   * @param runSequential If set to true, post processes sequentially, else uses MapReduce. Hint: if the clustering
+   *                      was done sequentially, run this sequentially too, and vice versa.
+   */
+  public static void run(Path input, Path output, boolean runSequential) throws IOException,
+          InterruptedException,
+          ClassNotFoundException {
+    if (runSequential) {
+      postProcessSeq(input, output);
+    } else {
+      Configuration conf = new Configuration();
+      postProcessMR(conf, input, output);
+      movePartFilesToRespectiveDirectories(conf, output);
+    }
+
+  }
+
+  /**
+   * Process Sequentially. Reads the vectors one by one, and puts them into respective directory, named after
+   * their clusterId.
+   *
+   * @param input  The output path provided to the clustering algorithm, which will be post processed. Hint: the
+   *               path of the directory containing clusters-*-final and clusteredPoints.
+   * @param output The path where the post processed data will be stored.
+   */
+  private static void postProcessSeq(Path input, Path output) throws IOException {
+    ClusterOutputPostProcessor clusterOutputPostProcessor = new ClusterOutputPostProcessor(input, output,
+            new Configuration());
+    clusterOutputPostProcessor.process();
+  }
+
+  /**
+   * Process as a MapReduce job. The number of reduce tasks is set to the number of clusters present in the
+   * output, so that each cluster's vectors are written into their own part file.
+   *
+   * @param conf   The hadoop configuration.
+   * @param input  The output path provided to the clustering algorithm, which will be post processed. Hint: the
+   *               path of the directory containing clusters-*-final and clusteredPoints.
+   * @param output The path where the post processed data will be stored.
+   */
+  private static void postProcessMR(Configuration conf, Path input, Path output) throws IOException,
+          InterruptedException,
+          ClassNotFoundException {
+    System.out.println("WARNING: If you are running in Hadoop local mode, please use the --sequential option, "
+        + "as the MapReduce option will not work properly");
+    int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf);
+    conf.set("clusterOutputPath", input.toString());
+    Job job = new Job(conf, "ClusterOutputPostProcessor Driver running over input: " + input);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(ClusterOutputPostProcessorMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+    job.setReducerClass(ClusterOutputPostProcessorReducer.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setNumReduceTasks(numberOfClusters);
+    job.setJarByClass(ClusterOutputPostProcessorDriver.class);
+
+    FileInputFormat.addInputPath(job, new Path(input, new Path("clusteredPoints")));
+    FileOutputFormat.setOutputPath(job, output);
+    if (!job.waitForCompletion(true)) {
+      throw new InterruptedException("ClusterOutputPostProcessor Job failed processing " + input);
+    }
+  }
+
+  /**
+   * The mapreduce version of the post processor writes different clusters into different part files. This
+   * method reads the part files and moves them into directories named after their clusterIds.
+   *
+   * @param conf   The hadoop configuration.
+   * @param output The path where the post processed data is stored.
+   */
+  private static void movePartFilesToRespectiveDirectories(Configuration conf, Path output) throws IOException {
+    FileSystem fileSystem = output.getFileSystem(conf);
+    for (FileStatus fileStatus : fileSystem.listStatus(output, PathFilters.partFilter())) {
+      SequenceFileIterator<Writable, Writable> it =
+              new SequenceFileIterator<>(fileStatus.getPath(), true, conf);
+      if (it.hasNext()) {
+        renameFile(it.next().getFirst(), fileStatus, conf);
+      }
+      it.close();
+    }
+  }
+
+  /**
+   * Uses the {@link FileSystem} rename method to move the part file into a directory named after its clusterId.
+   */
+  private static void renameFile(Writable key, FileStatus fileStatus, Configuration conf) throws IOException {
+    Path path = fileStatus.getPath();
+    FileSystem fileSystem = path.getFileSystem(conf);
+    Path subDir = new Path(key.toString());
+    Path renameTo = new Path(path.getParent(), subDir);
+    fileSystem.mkdirs(renameTo);
+    fileSystem.rename(path, renameTo);
+  }
+
+}
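
For reference, the same post processing driven programmatically (not part of the committed file); the paths are hypothetical.

import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.topdown.postprocessor.ClusterOutputPostProcessorDriver;

public class PostProcessDriverExample {
  public static void main(String[] args) throws Exception {
    ClusterOutputPostProcessorDriver.run(
        new Path("/tmp/kmeans-output"),         // directory with clusters-*-final and clusteredPoints
        new Path("/tmp/kmeans-postprocessed"),  // destination for the per-cluster directories
        true);                                  // run sequentially; false would launch the MapReduce job
  }
}

Running the driver through ToolRunner with --method sequential achieves the same thing from the command line.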

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
new file mode 100644
index 0000000..6834362
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Mapper for post processing cluster output.
+ */
+public class ClusterOutputPostProcessorMapper extends
+        Mapper<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
+
+  private Map<Integer, Integer> newClusterMappings;
+  private VectorWritable outputVector;
+
+  //read the current cluster ids, and populate the cluster mapping hash table
+  @Override
+  public void setup(Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    //this gives the clusters-x-final directory where the cluster ids can be read
+    Path clusterOutputPath = new Path(conf.get("clusterOutputPath"));
+    //we want the key to be the cluster id, the value to be the index
+    newClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, true);
+    outputVector = new VectorWritable();
+  }
+
+  @Override
+  public void map(IntWritable key, WeightedVectorWritable val, Context context)
+    throws IOException, InterruptedException {
+    // by pivoting on the cluster mapping value, we can make sure that each unique cluster goes to its own reducer,
+    // since they are numbered from 0 to k-1, where k is the number of clusters
+    outputVector.set(val.getVector());
+    context.write(new IntWritable(newClusterMappings.get(key.get())), outputVector);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
new file mode 100644
index 0000000..58dada4
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Reducer for post processing cluster output.
+ */
+public class ClusterOutputPostProcessorReducer
+    extends Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+  private Map<Integer, Integer> reverseClusterMappings;
+
+  //read the current cluster ids, and populate the cluster mapping hash table
+  @Override
+  public void setup(Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    Path clusterOutputPath = new Path(conf.get("clusterOutputPath"));
+    //we want the key to be the index, the value to be the cluster id
+    reverseClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, false);
+  }
+
+  /**
+   * The key is the remapped cluster id and the values contain the vectors in that cluster.
+   */
+  @Override
+  protected void reduce(IntWritable key, Iterable<VectorWritable> values, Context context) throws IOException,
+          InterruptedException {
+    //remap the cluster back to its original id
+    //and then output the vectors with their correct
+    //cluster id.
+    IntWritable outKey = new IntWritable(reverseClusterMappings.get(key.get()));
+    for (VectorWritable value : values) {
+      context.write(outKey, value);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/AbstractJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/AbstractJob.java b/mr/src/main/java/org/apache/mahout/common/AbstractJob.java
new file mode 100644
index 0000000..ec77749
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/AbstractJob.java
@@ -0,0 +1,658 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.lucene.AnalyzerUtils;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * <p>Superclass of many Mahout Hadoop "jobs". A job drives configuration and launch of one or
+ * more maps and reduces in order to accomplish some task.</p>
+ *
+ * <p>Command line arguments available to all subclasses are:</p>
+ *
+ * <ul>
+ *  <li>--tempDir (path): Specifies a directory where the job may place temp files
+ *   (default "temp")</li>
+ *  <li>--help: Show help message</li>
+ * </ul>
+ *
+ * <p>In addition, note some key command line parameters that are parsed by Hadoop, which jobs
+ * may need to set:</p>
+ *
+ * <ul>
+ *  <li>-Dmapred.job.name=(name): Sets the Hadoop task names. It will be suffixed by
+ *    the mapper and reducer class names</li>
+ *  <li>-Dmapred.output.compress={true,false}: Compress final output (default true)</li>
+ *  <li>-Dmapred.input.dir=(path): input file, or directory containing input files (required)</li>
+ *  <li>-Dmapred.output.dir=(path): path to write output files (required)</li>
+ * </ul>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other
+ * arguments.</p>
+ */
+public abstract class AbstractJob extends Configured implements Tool {
+
+  private static final Logger log = LoggerFactory.getLogger(AbstractJob.class);
+
+  /** option used to specify the input path */
+  private Option inputOption;
+
+  /** option used to specify the output path */
+  private Option outputOption;
+
+  /** input path, populated by {@link #parseArguments(String[])} */
+  protected Path inputPath;
+  protected File inputFile; //the input represented as a file
+
+  /** output path, populated by {@link #parseArguments(String[])} */
+  protected Path outputPath;
+  protected File outputFile; //the output represented as a file
+
+  /** temp path, populated by {@link #parseArguments(String[])} */
+  protected Path tempPath;
+
+  protected Map<String, List<String>> argMap;
+
+  /** internal list of options that have been added */
+  private final List<Option> options;
+  private Group group;
+
+  protected AbstractJob() {
+    options = Lists.newLinkedList();
+  }
+
+  /** Returns the input path established by a call to {@link #parseArguments(String[])}.
+   *  The source of the path may be an input option added using {@link #addInputOption()}
+   *  or it may be the value of the {@code mapred.input.dir} configuration
+   *  property. 
+   */
+  protected Path getInputPath() {
+    return inputPath;
+  }
+
+  /** Returns the output path established by a call to {@link #parseArguments(String[])}.
+   *  The source of the path may be an output option added using {@link #addOutputOption()}
+   *  or it may be the value of the {@code mapred.output.dir} configuration
+   *  property. 
+   */
+  protected Path getOutputPath() {
+    return outputPath;
+  }
+
+  protected Path getOutputPath(String path) {
+    return new Path(outputPath, path);
+  }
+
+  protected File getInputFile() {
+    return inputFile;
+  }
+
+  protected File getOutputFile() {
+    return outputFile;
+  }
+
+
+  protected Path getTempPath() {
+    return tempPath;
+  }
+
+  protected Path getTempPath(String directory) {
+    return new Path(tempPath, directory);
+  }
+  
+  @Override
+  public Configuration getConf() {
+    Configuration result = super.getConf();
+    if (result == null) {
+      return new Configuration();
+    }
+    return result;
+  }
+
+  /** Add an option with no argument whose presence can be checked for using
+   *  the {@code containsKey} method on the map returned by {@link #parseArguments(String[])}.
+   */
+  protected void addFlag(String name, String shortName, String description) {
+    options.add(buildOption(name, shortName, description, false, false, null));
+  }
+
+  /** Add an option to the set of options this job will parse when
+   *  {@link #parseArguments(String[])} is called. This option has an argument
+   *  with null as its default value.
+   */
+  protected void addOption(String name, String shortName, String description) {
+    options.add(buildOption(name, shortName, description, true, false, null));
+  }
+
+  /** Add an option to the set of options this job will parse when
+   *  {@link #parseArguments(String[])} is called.
+   *
+   * @param required if true, {@link #parseArguments(String[])} will
+   *    fail with an error and usage message if this option is not specified
+   *    on the command line.
+   */
+  protected void addOption(String name, String shortName, String description, boolean required) {
+    options.add(buildOption(name, shortName, description, true, required, null));
+  }
+
+  /** Add an option to the set of options this job will parse when
+   *  {@link #parseArguments(String[])} is called. If this option is not 
+   *  specified on the command line the default value will be 
+   *  used.
+   *
+   * @param defaultValue the default argument value if this argument is not
+   *   found on the command-line. null is allowed.
+   */
+  protected void addOption(String name, String shortName, String description, String defaultValue) {
+    options.add(buildOption(name, shortName, description, true, false, defaultValue));
+  }
+
+  /** Add an arbitrary option to the set of options this job will parse when
+   *  {@link #parseArguments(String[])} is called. If this option has no
+   *  argument, use {@code containsKey} on the map returned by
+   *  {@code parseArguments} to check for its presence. Otherwise, the
+   *  string value of the option will be placed in the map using a key
+   *  equal to this option's long name preceded by '--'.
+   * @return the option added.
+   */
+  protected Option addOption(Option option) {
+    options.add(option);
+    return option;
+  }
+
+  protected Group getGroup() {
+    return group;
+  }
+
+  /** Add the default input directory option, '-i' which takes a directory
+   *  name as an argument. When {@link #parseArguments(String[])} is 
+   *  called, the inputPath will be set based upon the value for this option.
+   *  If this method is called, the input is required.
+   */
+  protected void addInputOption() {
+    this.inputOption = addOption(DefaultOptionCreator.inputOption().create());
+  }
+
+  /** Add the default output directory option, '-o' which takes a directory
+   *  name as an argument. When {@link #parseArguments(String[])} is 
+   *  called, the outputPath will be set based upon the value for this option.
+   *  If this method is called, the output is required. 
+   */
+  protected void addOutputOption() {
+    this.outputOption = addOption(DefaultOptionCreator.outputOption().create());
+  }
+
+  /** Build an option with the given parameters. Name and description are
+   *  required.
+   * 
+   * @param name the long name of the option prefixed with '--' on the command-line
+   * @param shortName the short name of the option, prefixed with '-' on the command-line
+   * @param description description of the option displayed in help method
+   * @param hasArg true if the option has an argument.
+   * @param required true if the option is required.
+   * @param defaultValue default argument value, can be null.
+   * @return the option.
+   */
+  protected static Option buildOption(String name,
+                                      String shortName,
+                                      String description,
+                                      boolean hasArg,
+                                      boolean required,
+                                      String defaultValue) {
+
+    return buildOption(name, shortName, description, hasArg, 1, 1, required, defaultValue);
+  }
+
+  protected static Option buildOption(String name,
+                                      String shortName,
+                                      String description,
+                                      boolean hasArg, int min, int max,
+                                      boolean required,
+                                      String defaultValue) {
+
+    DefaultOptionBuilder optBuilder = new DefaultOptionBuilder().withLongName(name).withDescription(description)
+        .withRequired(required);
+
+    if (shortName != null) {
+      optBuilder.withShortName(shortName);
+    }
+
+    if (hasArg) {
+      ArgumentBuilder argBuilder = new ArgumentBuilder().withName(name).withMinimum(min).withMaximum(max);
+
+      if (defaultValue != null) {
+        argBuilder = argBuilder.withDefault(defaultValue);
+      }
+
+      optBuilder.withArgument(argBuilder.create());
+    }
+
+    return optBuilder.create();
+  }
+
+  /**
+   * @param name The name of the option
+   * @return the {@link org.apache.commons.cli2.Option} with the name, else null
+   */
+  protected Option getCLIOption(String name) {
+    for (Option option : options) {
+      if (option.getPreferredName().equals(name)) {
+        return option;
+      }
+    }
+    return null;
+  }
+
+  /** Parse the arguments specified based on the options defined using the
+   *  various {@code addOption} methods. If -h is specified or an
+   *  exception is encountered, print help and return null. Has the
+   *  side effect of setting inputPath and outputPath if
+   *  {@code addInputOption} or {@code addOutputOption} has been called,
+   *  or if {@code mapred.input.dir} or {@code mapred.output.dir}
+   *  is present in the Configuration.
+   *
+   * @return a {@code Map<String, List<String>>} containing options and their argument values.
+   *  The presence of a flag can be tested using {@code containsKey}, while
+   *  argument values can be retrieved as a list using {@code get(optionName)}. The
+   *  names used for keys are the option name parameter prefixed by '--'.
+   *
+   * @see #parseArguments(String[], boolean, boolean) -- this method passes false, false for the two flags.
+   */
+  public Map<String, List<String>> parseArguments(String[] args) throws IOException {
+    return parseArguments(args, false, false);
+  }
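+
+  // Illustrative sketch: a typical run(String[]) implementation adds its options, parses, and
+  // returns early when help was printed or parsing failed.
+  //
+  //   addInputOption();
+  //   addOutputOption();
+  //   if (parseArguments(args) == null) {
+  //     return -1;
+  //   }
+  //   // inputPath and outputPath are now populated from --input/--output or the Hadoop properties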
+
+  /**
+   * Parse the arguments specified based on the options defined using the various {@code addOption} methods.
+   *
+   * @param args  the arguments to parse
+   * @param inputOptional if true, the input option (when added) need not be present on the command line;
+   *                      if false and the input option was added, parsing fails when no input is specified
+   * @param outputOptional if true, the output option (when added) need not be present on the command line;
+   *                       if false and the output option was added, parsing fails when no output is specified
+   * @return the args parsed into a map, or null if help was requested or parsing failed.
+   */
+  public Map<String, List<String>> parseArguments(String[] args, boolean inputOptional, boolean outputOptional)
+    throws IOException {
+    Option helpOpt = addOption(DefaultOptionCreator.helpOption());
+    addOption("tempDir", null, "Intermediate output directory", "temp");
+    addOption("startPhase", null, "First phase to run", "0");
+    addOption("endPhase", null, "Last phase to run", String.valueOf(Integer.MAX_VALUE));
+
+    GroupBuilder gBuilder = new GroupBuilder().withName("Job-Specific Options:");
+
+    for (Option opt : options) {
+      gBuilder = gBuilder.withOption(opt);
+    }
+
+    group = gBuilder.create();
+
+    CommandLine cmdLine;
+    try {
+      Parser parser = new Parser();
+      parser.setGroup(group);
+      parser.setHelpOption(helpOpt);
+      cmdLine = parser.parse(args);
+
+    } catch (OptionException e) {
+      log.error(e.getMessage());
+      CommandLineUtil.printHelpWithGenericOptions(group, e);
+      return null;
+    }
+
+    if (cmdLine.hasOption(helpOpt)) {
+      CommandLineUtil.printHelpWithGenericOptions(group);
+      return null;
+    }
+
+    try {
+      parseDirectories(cmdLine, inputOptional, outputOptional);
+    } catch (IllegalArgumentException e) {
+      log.error(e.getMessage());
+      CommandLineUtil.printHelpWithGenericOptions(group);
+      return null;
+    }
+
+    argMap = new TreeMap<String, List<String>>();
+    maybePut(argMap, cmdLine, this.options.toArray(new Option[this.options.size()]));
+
+    this.tempPath = new Path(getOption("tempDir"));
+
+    if (!hasOption("quiet")) {
+      log.info("Command line arguments: {}", argMap);
+    }
+    return argMap;
+  }
+  
+  /**
+   * Build the option key (--name) from the option name
+   */
+  public static String keyFor(String optionName) {
+    return "--" + optionName;
+  }
+
+  /**
+   * @return the requested option, or null if it has not been specified
+   */
+  public String getOption(String optionName) {
+    List<String> list = argMap.get(keyFor(optionName));
+    if (list != null && !list.isEmpty()) {
+      return list.get(0);
+    }
+    return null;
+  }
+
+  /**
+   * Get the option, else the default
+   * @param optionName The name of the option to look up, without the --
+   * @param defaultVal The default value.
+   * @return The requested option, else the default value if it doesn't exist
+   */
+  public String getOption(String optionName, String defaultVal) {
+    String res = getOption(optionName);
+    if (res == null) {
+      res = defaultVal;
+    }
+    return res;
+  }
+
+  public int getInt(String optionName) {
+    return Integer.parseInt(getOption(optionName));
+  }
+
+  public int getInt(String optionName, int defaultVal) {
+    return Integer.parseInt(getOption(optionName, String.valueOf(defaultVal)));
+  }
+
+  public float getFloat(String optionName) {
+    return Float.parseFloat(getOption(optionName));
+  }
+
+  public float getFloat(String optionName, float defaultVal) {
+    return Float.parseFloat(getOption(optionName, String.valueOf(defaultVal)));
+  }
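+
+  // Illustrative sketch: reading typed values with fall-back defaults ("maxIterations" and
+  // "convergenceDelta" are hypothetical option names).
+  //
+  //   int maxIterations = getInt("maxIterations", 10);
+  //   float convergenceDelta = getFloat("convergenceDelta", 0.5f);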
+
+  /**
+   * Options can occur multiple times, so return the list of values.
+   * @param optionName The unadorned (no "--" prefix) option name
+   * @return The values, else null. If the option is present but has no values, the stored value is also null;
+   * use {@link #hasOption(String)} to distinguish a value-less flag from an absent option.
+   */
+  public List<String> getOptions(String optionName) {
+    return argMap.get(keyFor(optionName));
+  }
+
+  /**
+   * @return if the requested option has been specified
+   */
+  public boolean hasOption(String optionName) {
+    return argMap.containsKey(keyFor(optionName));
+  }
+
+
+  /**
+   * Get the cardinality of the input vectors by reading the first row of the given matrix.
+   *
+   * @param matrix path to a {@code SequenceFile} whose values are {@code VectorWritable}s
+   * @return the cardinality of the first vector
+   */
+  public int getDimensions(Path matrix) throws IOException {
+
+    SequenceFile.Reader reader = null;
+    try {
+      reader = new SequenceFile.Reader(FileSystem.get(getConf()), matrix, getConf());
+
+      Writable row = ClassUtils.instantiateAs(reader.getKeyClass().asSubclass(Writable.class), Writable.class);
+
+      Preconditions.checkArgument(reader.getValueClass().equals(VectorWritable.class),
+          "value type of sequencefile must be a VectorWritable");
+
+      VectorWritable vectorWritable = new VectorWritable();
+      boolean hasAtLeastOneRow = reader.next(row, vectorWritable);
+      Preconditions.checkState(hasAtLeastOneRow, "matrix must have at least one row");
+
+      return vectorWritable.get().size();
+
+    } finally {
+      Closeables.close(reader, true);
+    }
+  }
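+
+  // Illustrative sketch: the dimension of the input vectors is typically needed to size
+  // downstream data structures, e.g.
+  //
+  //   int numFeatures = getDimensions(inputPath);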
+
+  /**
+   * Obtain input and output directories from command-line options or Hadoop
+   *  properties. If {@code addInputOption} or {@code addOutputOption}
+   *  has been called, this method will throw an {@code IllegalArgumentException} if
+   *  no source (command-line or property) for that value is present.
+   *  Otherwise, {@code inputPath} or {@code outputPath} will be
+   *  non-null only if specified as a Hadoop property. Command-line options
+   *  take precedence over Hadoop properties.
+   *
+   * @throws IllegalArgumentException if inputOption is present and neither
+   *   {@code --input} nor {@code -Dmapred.input.dir} is specified, or if
+   *   outputOption is present and neither {@code --output} nor
+   *   {@code -Dmapred.output.dir} is specified (unless the corresponding
+   *   optional flag is true).
+   */
+  protected void parseDirectories(CommandLine cmdLine, boolean inputOptional, boolean outputOptional) {
+
+    Configuration conf = getConf();
+
+    if (inputOption != null && cmdLine.hasOption(inputOption)) {
+      this.inputPath = new Path(cmdLine.getValue(inputOption).toString());
+      this.inputFile = new File(cmdLine.getValue(inputOption).toString());
+    }
+    if (inputPath == null && conf.get("mapred.input.dir") != null) {
+      this.inputPath = new Path(conf.get("mapred.input.dir"));
+    }
+
+    if (outputOption != null && cmdLine.hasOption(outputOption)) {
+      this.outputPath = new Path(cmdLine.getValue(outputOption).toString());
+      this.outputFile = new File(cmdLine.getValue(outputOption).toString());
+    }
+    if (outputPath == null && conf.get("mapred.output.dir") != null) {
+      this.outputPath = new Path(conf.get("mapred.output.dir"));
+    }
+
+    Preconditions.checkArgument(inputOptional || inputOption == null || inputPath != null,
+        "No input specified: --input or -Dmapred.input.dir must be provided to specify the input directory");
+    Preconditions.checkArgument(outputOptional || outputOption == null || outputPath != null,
+        "No output specified: --output or -Dmapred.output.dir must be provided to specify the output directory");
+  }
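+
+  // Illustrative sketch: instead of --input/--output, the directories can be supplied as Hadoop
+  // properties on the command line (the jar name, job class, and paths below are hypothetical):
+  //
+  //   hadoop jar my-job.jar com.example.SomeJob -Dmapred.input.dir=/data/in -Dmapred.output.dir=/data/out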
+
+  protected static void maybePut(Map<String, List<String>> args, CommandLine cmdLine, Option... opt) {
+    for (Option o : opt) {
+
+      // the option appeared on the command-line, or it has a value
+      // (which is likely a default value). 
+      if (cmdLine.hasOption(o) || cmdLine.getValue(o) != null
+          || (cmdLine.getValues(o) != null && !cmdLine.getValues(o).isEmpty())) {
+
+        // nulls are ok, for cases where options are simple flags.
+        List<?> vo = cmdLine.getValues(o);
+        if (vo != null && !vo.isEmpty()) {
+          List<String> vals = Lists.newArrayList();
+          for (Object o1 : vo) {
+            vals.add(o1.toString());
+          }
+          args.put(o.getPreferredName(), vals);
+        } else {
+          args.put(o.getPreferredName(), null);
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param args The input argument map
+   * @param optName The adorned (including "--") option name
+   * @return The first value in the match, else null
+   */
+  public static String getOption(Map<String, List<String>> args, String optName) {
+    List<String> res = args.get(optName);
+    if (res != null && !res.isEmpty()) {
+      return res.get(0);
+    }
+    return null;
+  }
+
+
+  protected static boolean shouldRunNextPhase(Map<String, List<String>> args, AtomicInteger currentPhase) {
+    int phase = currentPhase.getAndIncrement();
+    String startPhase = getOption(args, "--startPhase");
+    String endPhase = getOption(args, "--endPhase");
+    boolean phaseSkipped = (startPhase != null && phase < Integer.parseInt(startPhase))
+        || (endPhase != null && phase > Integer.parseInt(endPhase));
+    if (phaseSkipped) {
+      log.info("Skipping phase {}", phase);
+    }
+    return !phaseSkipped;
+  }
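+
+  // Illustrative sketch: phase gating inside run(), so --startPhase/--endPhase can restrict which
+  // steps of a multi-phase job are executed (parsedArgs is the map returned by parseArguments).
+  //
+  //   AtomicInteger currentPhase = new AtomicInteger();
+  //   if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+  //     // ... run phase 0 ...
+  //   }
+  //   if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+  //     // ... run phase 1 ...
+  //   }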
+
+  protected Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends OutputFormat> outputFormat) throws IOException {
+    return prepareJob(inputPath, outputPath, inputFormat, mapper, mapperKey, mapperValue, outputFormat, null);
+  }
+
+  protected Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends OutputFormat> outputFormat,
+                           String jobname) throws IOException {
+
+    Job job = HadoopUtil.prepareJob(inputPath, outputPath,
+            inputFormat, mapper, mapperKey, mapperValue, outputFormat, getConf());
+
+    String name =
+        jobname != null ? jobname : HadoopUtil.getCustomJobName(getClass().getSimpleName(), job, mapper, Reducer.class);
+
+    job.setJobName(name);
+    return job;
+  }
+
+  protected Job prepareJob(Path inputPath, Path outputPath, Class<? extends Mapper> mapper,
+      Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue, Class<? extends Reducer> reducer,
+      Class<? extends Writable> reducerKey, Class<? extends Writable> reducerValue) throws IOException {
+    return prepareJob(inputPath, outputPath, SequenceFileInputFormat.class, mapper, mapperKey, mapperValue, reducer,
+        reducerKey, reducerValue, SequenceFileOutputFormat.class);
+  }
+
+  protected Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends Reducer> reducer,
+                           Class<? extends Writable> reducerKey,
+                           Class<? extends Writable> reducerValue,
+                           Class<? extends OutputFormat> outputFormat) throws IOException {
+    Job job = HadoopUtil.prepareJob(inputPath, outputPath,
+            inputFormat, mapper, mapperKey, mapperValue, reducer, reducerKey, reducerValue, outputFormat, getConf());
+    job.setJobName(HadoopUtil.getCustomJobName(getClass().getSimpleName(), job, mapper, Reducer.class));
+    return job;
+  }
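+
+  // Illustrative sketch: building and running a single map-reduce pass (MyMapper, MyReducer and
+  // the key/value classes are hypothetical stand-ins).
+  //
+  //   Job job = prepareJob(inputPath, outputPath,
+  //       MyMapper.class, Text.class, VectorWritable.class,
+  //       MyReducer.class, Text.class, VectorWritable.class);
+  //   if (!job.waitForCompletion(true)) {
+  //     return -1;
+  //   }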
+
+  /**
+   * Necessary to make jobs that use a combined input path work on Amazon S3; hopefully this becomes
+   * obsolete once MultipleInputs is available again.
+   */
+  public static void setS3SafeCombinedInputPath(Job job, Path referencePath, Path inputPathOne, Path inputPathTwo)
+    throws IOException {
+    FileSystem fs = FileSystem.get(referencePath.toUri(), job.getConfiguration());
+    FileInputFormat.setInputPaths(job, inputPathOne.makeQualified(fs), inputPathTwo.makeQualified(fs));
+  }
+
+  protected Class<? extends Analyzer> getAnalyzerClassFromOption() throws ClassNotFoundException {
+    Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class;
+    if (hasOption(DefaultOptionCreator.ANALYZER_NAME_OPTION)) {
+      String className = getOption(DefaultOptionCreator.ANALYZER_NAME_OPTION);
+      analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
+      // Try instantiating the analyzer up front: there is no point in returning a class
+      // that cannot be instantiated.
+      AnalyzerUtils.createAnalyzer(analyzerClass);
+    }
+    return analyzerClass;
+  }
+  
+  /**
+   * Overrides the base implementation to install the Oozie action configuration resource
+   * into the provided Configuration object; note that ToolRunner calls setConf on the Tool
+   * before it invokes run.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+      
+    // If running in an Oozie workflow as a Java action, need to add the
+    // Configuration resource provided by Oozie to this job's config.
+    String oozieActionConfXml = System.getProperty("oozie.action.conf.xml");
+    if (oozieActionConfXml != null && conf != null) {
+      conf.addResource(new Path("file:///", oozieActionConfXml));
+      log.info("Added Oozie action Configuration resource {} to the Hadoop Configuration", oozieActionConfXml);
+    }      
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/ClassUtils.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/ClassUtils.java b/mr/src/main/java/org/apache/mahout/common/ClassUtils.java
new file mode 100644
index 0000000..8052ef1
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/ClassUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.lang.reflect.InvocationTargetException;
+
+public final class ClassUtils {
+
+  private ClassUtils() {}
+
+  public static <T> T instantiateAs(String classname, Class<T> asSubclassOfClass) {
+    try {
+      return instantiateAs(Class.forName(classname).asSubclass(asSubclassOfClass), asSubclassOfClass);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static <T> T instantiateAs(String classname, Class<T> asSubclassOfClass, Class<?>[] params, Object[] args) {
+    try {
+      return instantiateAs(Class.forName(classname).asSubclass(asSubclassOfClass), asSubclassOfClass, params, args);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static <T> T instantiateAs(Class<? extends T> clazz,
+                                    Class<T> asSubclassOfClass,
+                                    Class<?>[] params,
+                                    Object[] args) {
+    try {
+      return clazz.asSubclass(asSubclassOfClass).getConstructor(params).newInstance(args);
+    } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException ie) {
+      throw new IllegalStateException(ie);
+    }
+  }
+
+
+  public static <T> T instantiateAs(Class<? extends T> clazz, Class<T> asSubclassOfClass) {
+    try {
+      return clazz.asSubclass(asSubclassOfClass).getConstructor().newInstance();
+    } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException ie) {
+      throw new IllegalStateException(ie);
+    }
+  }
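+
+  // Illustrative usage sketch (hypothetical example): instantiating a DistanceMeasure
+  // implementation by its class name.
+  //
+  //   DistanceMeasure measure = ClassUtils.instantiateAs(
+  //       "org.apache.mahout.common.distance.EuclideanDistanceMeasure", DistanceMeasure.class);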
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java b/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java
new file mode 100644
index 0000000..0cc93ba
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+
+import com.google.common.base.Charsets;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public final class CommandLineUtil {
+  
+  private CommandLineUtil() { }
+  
+  public static void printHelp(Group group) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.setGroup(group);
+    formatter.print();
+  }
+ 
+  /**
+   * Print the options supported by {@code GenericOptionsParser}, in addition
+   * to the job-specific options passed in as the group parameter.
+   *
+   * @param group job-specific command-line options.
+   */
+  public static void printHelpWithGenericOptions(Group group) throws IOException {
+    new GenericOptionsParser(new Configuration(), new org.apache.commons.cli.Options(), new String[0]);
+    PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true);
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.setGroup(group);
+    formatter.setPrintWriter(pw);
+    formatter.setFooter("Specify HDFS directories while running on hadoop; else specify local file system directories");
+    formatter.print();
+  }
+
+  public static void printHelpWithGenericOptions(Group group, OptionException oe) throws IOException {
+    new GenericOptionsParser(new Configuration(), new org.apache.commons.cli.Options(), new String[0]);
+    PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true);
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.setGroup(group);
+    formatter.setPrintWriter(pw);
+    formatter.setException(oe);
+    formatter.print();
+  }
+
+}