You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:56 UTC
[25/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy
into mahout-hdfs and mahout-mr, closes apache/mahout#86
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java
new file mode 100644
index 0000000..4bffb2b
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.neighborhood.BruteSearch;
+import org.apache.mahout.math.neighborhood.FastProjectionSearch;
+import org.apache.mahout.math.neighborhood.LocalitySensitiveHashSearch;
+import org.apache.mahout.math.neighborhood.ProjectionSearch;
+import org.apache.mahout.math.neighborhood.UpdatableSearcher;
+
+public final class StreamingKMeansUtilsMR {
+
+ private StreamingKMeansUtilsMR() {
+ }
+
+ /**
+ * Instantiates a searcher from a given configuration.
+ * @param conf the configuration
+ * @return the instantiated searcher
+ * @throws RuntimeException if the distance measure class cannot be instantiated
+ * @throws IllegalStateException if an unknown searcher class was requested
+ */
+ public static UpdatableSearcher searcherFromConfiguration(Configuration conf) {
+ DistanceMeasure distanceMeasure;
+ String distanceMeasureClass = conf.get(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+ try {
+ distanceMeasure = (DistanceMeasure) Class.forName(distanceMeasureClass).getConstructor().newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to instantiate distanceMeasure", e);
+ }
+
+ int numProjections = conf.getInt(StreamingKMeansDriver.NUM_PROJECTIONS_OPTION, 20);
+ int searchSize = conf.getInt(StreamingKMeansDriver.SEARCH_SIZE_OPTION, 10);
+
+ String searcherClass = conf.get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION);
+
+ if (searcherClass.equals(BruteSearch.class.getName())) {
+ return ClassUtils.instantiateAs(searcherClass, UpdatableSearcher.class,
+ new Class[]{DistanceMeasure.class}, new Object[]{distanceMeasure});
+ } else if (searcherClass.equals(FastProjectionSearch.class.getName())
+ || searcherClass.equals(ProjectionSearch.class.getName())) {
+ return ClassUtils.instantiateAs(searcherClass, UpdatableSearcher.class,
+ new Class[]{DistanceMeasure.class, int.class, int.class},
+ new Object[]{distanceMeasure, numProjections, searchSize});
+ } else if (searcherClass.equals(LocalitySensitiveHashSearch.class.getName())) {
+ return ClassUtils.instantiateAs(searcherClass, LocalitySensitiveHashSearch.class,
+ new Class[]{DistanceMeasure.class, int.class},
+ new Object[]{distanceMeasure, searchSize});
+ } else {
+ throw new IllegalStateException("Unknown class instantiation requested");
+ }
+ }
+
+ /**
+ * Returns an Iterable of centroids from an Iterable of VectorWritables by creating a new Centroid containing
+ * a RandomAccessSparseVector as a delegate for each VectorWritable.
+ * @param inputIterable VectorWritable Iterable to get Centroids from
+ * @return the new Centroids
+ */
+ public static Iterable<Centroid> getCentroidsFromVectorWritable(Iterable<VectorWritable> inputIterable) {
+ return Iterables.transform(inputIterable, new Function<VectorWritable, Centroid>() {
+ private int numVectors = 0;
+ @Override
+ public Centroid apply(VectorWritable input) {
+ Preconditions.checkNotNull(input);
+ return new Centroid(numVectors++, new RandomAccessSparseVector(input.get()), 1);
+ }
+ });
+ }
+
+ /**
+ * Returns an Iterable of Centroid from an Iterable of Vector by either casting each Vector to Centroid (if the
+ * instance extends Centroid) or create a new Centroid based on that Vector.
+ * The implicit expectation is that the input will not have interleaving types of vectors. Otherwise, the numbering
+ * of new Centroids will become invalid.
+ * @param input Iterable of Vectors to cast
+ * @return the new Centroids
+ */
+ public static Iterable<Centroid> castVectorsToCentroids(Iterable<Vector> input) {
+ return Iterables.transform(input, new Function<Vector, Centroid>() {
+ private int numVectors = 0;
+ @Override
+ public Centroid apply(Vector input) {
+ Preconditions.checkNotNull(input);
+ if (input instanceof Centroid) {
+ return (Centroid) input;
+ } else {
+ return new Centroid(numVectors++, input, 1);
+ }
+ }
+ });
+ }
+
+ /**
+ * Writes centroids to a sequence file.
+ * @param centroids the centroids to write.
+ * @param path the path of the output file.
+ * @param conf the configuration for the HDFS to write the file to.
+ * @throws java.io.IOException
+ */
+ public static void writeCentroidsToSequenceFile(Iterable<Centroid> centroids, Path path, Configuration conf)
+ throws IOException {
+ SequenceFile.Writer writer = null;
+ try {
+ writer = SequenceFile.createWriter(FileSystem.get(conf), conf,
+ path, IntWritable.class, CentroidWritable.class);
+ int i = 0;
+ for (Centroid centroid : centroids) {
+ writer.append(new IntWritable(i++), new CentroidWritable(centroid));
+ }
+ } finally {
+ Closeables.close(writer, true);
+ }
+ }
+
+ public static void writeVectorsToSequenceFile(Iterable<? extends Vector> datapoints, Path path, Configuration conf)
+ throws IOException {
+ SequenceFile.Writer writer = null;
+ try {
+ writer = SequenceFile.createWriter(FileSystem.get(conf), conf,
+ path, IntWritable.class, VectorWritable.class);
+ int i = 0;
+ for (Vector vector : datapoints) {
+ writer.append(new IntWritable(i++), new VectorWritable(vector));
+ }
+ } finally {
+ Closeables.close(writer, true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java
new file mode 100644
index 0000000..55b7848
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.tools;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+
+/**
+ * Command-line tool that reads all sequence files under an input folder and rewrites their entries into
+ * {@code numSplits} new sequence files named {@code <output>-0 ... <output>-(numSplits-1)}. Every split but the
+ * last receives {@code numEntries / numSplits} entries; the last one also receives the remainder.
+ */
+public class ResplitSequenceFiles {
+
+  private String inputFile;
+  private String outputFileBase;
+  private int numSplits;
+
+  private Configuration conf;
+  private FileSystem fs;
+
+  private ResplitSequenceFiles() {}
+
+  /**
+   * Copies the next {@code numEntriesPerSplit} entries of {@code inputIterator} into the split file
+   * {@code outputFileBase-numSplit}. Key and value classes are taken from the first entry. No file is
+   * created when {@code numEntriesPerSplit} is not positive.
+   */
+  private void writeSplit(Iterator<Pair<Writable, Writable>> inputIterator,
+                          int numSplit, int numEntriesPerSplit) throws IOException {
+    if (numEntriesPerSplit <= 0) {
+      return;
+    }
+    Pair<Writable, Writable> first = inputIterator.next();
+    Path splitPath = new Path(outputFileBase + "-" + numSplit);
+    // try-with-resources closes the writer even if reading or appending fails part-way.
+    try (SequenceFile.Writer splitWriter = SequenceFile.createWriter(fs, conf, splitPath,
+        first.getFirst().getClass(), first.getSecond().getClass())) {
+      splitWriter.append(first.getFirst(), first.getSecond());
+      for (int j = 1; j < numEntriesPerSplit; ++j) {
+        Pair<Writable, Writable> item = inputIterator.next();
+        splitWriter.append(item.getFirst(), item.getSecond());
+      }
+    }
+  }
+
+  /**
+   * Counts the input entries, then writes them out into {@code numSplits} split files, reporting progress
+   * to {@code printWriter}.
+   */
+  private void run(PrintWriter printWriter) throws IOException {
+    conf = new Configuration();
+    SequenceFileDirIterable<Writable, Writable> inputIterable = new
+        SequenceFileDirIterable<Writable, Writable>(new Path(inputFile), PathType.LIST, conf);
+    fs = FileSystem.get(conf);
+
+    // The input is traversed twice: once here to count it, once below to copy it.
+    int numEntries = Iterables.size(inputIterable);
+    int numEntriesPerSplit = numEntries / numSplits;
+    // The last split absorbs the remainder left over by the integer division.
+    int numEntriesLastSplit = numEntries - numEntriesPerSplit * (numSplits - 1);
+    Iterator<Pair<Writable, Writable>> inputIterator = inputIterable.iterator();
+
+    printWriter.printf("Writing %d splits\n", numSplits);
+    for (int i = 0; i < numSplits - 1; ++i) {
+      printWriter.printf("Writing split %d\n", i);
+      writeSplit(inputIterator, i, numEntriesPerSplit);
+    }
+    printWriter.printf("Writing split %d\n", numSplits - 1);
+    writeSplit(inputIterator, numSplits - 1, numEntriesLastSplit);
+  }
+
+  /**
+   * Parses the command line into the fields of this instance.
+   *
+   * @return false if help was requested or the arguments could not be parsed, true otherwise
+   * @throws NumberFormatException if {@code --numSplits} is not an integer
+   * @throws IllegalArgumentException if {@code --numSplits} is smaller than 1
+   */
+  private boolean parseArgs(String[] args) {
+    DefaultOptionBuilder builder = new DefaultOptionBuilder();
+
+    Option help = builder.withLongName("help").withDescription("print this list").create();
+
+    ArgumentBuilder argumentBuilder = new ArgumentBuilder();
+    Option inputFileOption = builder.withLongName("input")
+        .withShortName("i")
+        .withRequired(true)
+        .withArgument(argumentBuilder.withName("input").withMaximum(1).create())
+        .withDescription("what the base folder for sequence files is (they all must have the same key/value type)")
+        .create();
+
+    Option outputFileOption = builder.withLongName("output")
+        .withShortName("o")
+        .withRequired(true)
+        .withArgument(argumentBuilder.withName("output").withMaximum(1).create())
+        .withDescription("the base name of the file split that the files will be split into; the i'th split has the "
+            + "suffix -i")
+        .create();
+
+    Option numSplitsOption = builder.withLongName("numSplits")
+        .withShortName("ns")
+        .withRequired(true)
+        .withArgument(argumentBuilder.withName("numSplits").withMaximum(1).create())
+        .withDescription("how many splits to use for the given files")
+        .create();
+
+    Group normalArgs = new GroupBuilder()
+        .withOption(help)
+        .withOption(inputFileOption)
+        .withOption(outputFileOption)
+        .withOption(numSplitsOption)
+        .create();
+
+    Parser parser = new Parser();
+    parser.setHelpOption(help);
+    parser.setHelpTrigger("--help");
+    parser.setGroup(normalArgs);
+    parser.setHelpFormatter(new HelpFormatter(" ", "", " ", 130));
+    CommandLine cmdLine = parser.parseAndHelp(args);
+
+    if (cmdLine == null) {
+      return false;
+    }
+
+    inputFile = (String) cmdLine.getValue(inputFileOption);
+    outputFileBase = (String) cmdLine.getValue(outputFileOption);
+    numSplits = Integer.parseInt((String) cmdLine.getValue(numSplitsOption));
+    // Fail fast with a clear message instead of a divide-by-zero deep inside run().
+    if (numSplits < 1) {
+      throw new IllegalArgumentException("numSplits must be at least 1, but was " + numSplits);
+    }
+    return true;
+  }
+
+  public static void main(String[] args) throws IOException {
+    ResplitSequenceFiles runner = new ResplitSequenceFiles();
+    if (runner.parseArgs(args)) {
+      runner.run(new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java
new file mode 100644
index 0000000..11bc34a
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Contains list of all internal paths used in top down clustering.
+ * <p/>
+ * All paths are joined with the Hadoop separator {@link Path#SEPARATOR} ("/"). The platform
+ * {@code java.io.File.separator} must not be used here: it is "\" on Windows, which is not a valid separator
+ * inside a Hadoop (e.g. HDFS) path.
+ */
+public final class PathDirectory {
+
+  public static final String TOP_LEVEL_CLUSTER_DIRECTORY = "topLevelCluster";
+  public static final String POST_PROCESS_DIRECTORY = "clusterPostProcessed";
+  public static final String CLUSTERED_POINTS_DIRECTORY = "clusteredPoints";
+  public static final String BOTTOM_LEVEL_CLUSTER_DIRECTORY = "bottomLevelCluster";
+
+  private PathDirectory() {
+  }
+
+  /**
+   * All output of top level clustering is stored in output directory/topLevelCluster.
+   *
+   * @param output
+   *          the output path of clustering.
+   * @return The top level Cluster Directory.
+   */
+  public static Path getTopLevelClusterPath(Path output) {
+    return new Path(output + Path.SEPARATOR + TOP_LEVEL_CLUSTER_DIRECTORY);
+  }
+
+  /**
+   * The output of top level clusters is post processed and kept in this path.
+   *
+   * @param outputPathProvidedByUser
+   *          the output path of clustering.
+   * @return the path where the output of top level cluster post processor is kept.
+   */
+  public static Path getClusterPostProcessorOutputDirectory(Path outputPathProvidedByUser) {
+    return new Path(outputPathProvidedByUser + Path.SEPARATOR + POST_PROCESS_DIRECTORY);
+  }
+
+  /**
+   * The top level clustered points before post processing is generated here.
+   *
+   * @param output
+   *          the output path of clustering.
+   * @return the glob {@code output/clusteredPoints/*} matching the clustered points files
+   */
+  public static Path getClusterOutputClusteredPoints(Path output) {
+    return new Path(output + Path.SEPARATOR + CLUSTERED_POINTS_DIRECTORY + Path.SEPARATOR, "*");
+  }
+
+  /**
+   * Each cluster produced by top level clustering is processed in output/"bottomLevelCluster"/clusterId.
+   *
+   * @param output
+   *          the output path of clustering.
+   * @param clusterId
+   *          the id of the top level cluster, used as the directory name.
+   * @return the bottom level clustering path.
+   */
+  public static Path getBottomLevelClusterPath(Path output, String clusterId) {
+    return new Path(output + Path.SEPARATOR + BOTTOM_LEVEL_CLUSTER_DIRECTORY + Path.SEPARATOR + clusterId);
+  }
+
+  /**
+   * Each clusters path name is its clusterId. The vectors reside in separate files inside it.
+   *
+   * @param clusterPostProcessorOutput
+   *          the path of cluster post processor output.
+   * @param clusterId
+   *          the id of the cluster.
+   * @return the cluster path for cluster id.
+   */
+  public static Path getClusterPathForClusterId(Path clusterPostProcessorOutput, String clusterId) {
+    return new Path(clusterPostProcessorOutput + Path.SEPARATOR + clusterId);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
new file mode 100644
index 0000000..083b543
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Reads the number of clusters produced by the clustering algorithm.
+ */
+public final class ClusterCountReader {
+
+  private ClusterCountReader() {
+  }
+
+  /**
+   * Reads the number of clusters present by reading the clusters-*-final file.
+   *
+   * @param clusterOutputPath The output path provided to the clustering algorithm.
+   * @param conf The hadoop configuration.
+   * @return the number of final clusters.
+   * @throws IOException if no final clusters directory exists under {@code clusterOutputPath}, or on read failure
+   */
+  public static int getNumberOfClusters(Path clusterOutputPath, Configuration conf) throws IOException {
+    Iterator<?> it = new SequenceFileDirValueIterator<Writable>(findFinalClustersPath(clusterOutputPath, conf),
+        PathType.LIST,
+        PathFilters.partFilter(),
+        null,
+        true,
+        conf);
+    int numberOfClusters = 0;
+    while (it.hasNext()) {
+      it.next();
+      numberOfClusters++;
+    }
+    return numberOfClusters;
+  }
+
+  /**
+   * Generates a map of all cluster ids by reading the clusters-*-final file.
+   *
+   * @param clusterOutputPath The output path provided to the clustering algorithm.
+   * @param conf The hadoop configuration.
+   * @param keyIsClusterId if true the map is clusterId -> position of the cluster in the file; otherwise it is
+   *          position -> clusterId. Positions are 0-based.
+   * @return A map between the final cluster ids and their positions, oriented according to {@code keyIsClusterId}.
+   * @throws IOException if no final clusters directory exists under {@code clusterOutputPath}, or on read failure
+   */
+  public static Map<Integer, Integer> getClusterIDs(Path clusterOutputPath, Configuration conf, boolean keyIsClusterId)
+    throws IOException {
+    Map<Integer, Integer> clusterIds = new HashMap<>();
+    Iterator<ClusterWritable> it = new SequenceFileDirValueIterator<>(findFinalClustersPath(clusterOutputPath, conf),
+        PathType.LIST,
+        PathFilters.partFilter(),
+        null,
+        true,
+        conf);
+    int index = 0;
+    while (it.hasNext()) {
+      int clusterId = it.next().getValue().getId();
+      if (keyIsClusterId) {
+        clusterIds.put(clusterId, index);
+      } else {
+        clusterIds.put(index, clusterId);
+      }
+      index++;
+    }
+    return clusterIds;
+  }
+
+  /**
+   * Locates the final clusters directory (matched by {@link PathFilters#finalPartFilter()}) directly under
+   * {@code clusterOutputPath}. If several match, the first one returned by the file system listing is used.
+   *
+   * @throws IOException if the listing fails or no matching directory exists
+   */
+  private static Path findFinalClustersPath(Path clusterOutputPath, Configuration conf) throws IOException {
+    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
+    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
+    // Older Hadoop versions return null for a missing directory; newer ones may return an empty array.
+    if (clusterFiles == null || clusterFiles.length == 0) {
+      throw new IOException("No final clusters directory found under " + clusterOutputPath);
+    }
+    return clusterFiles[0].getPath();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
new file mode 100644
index 0000000..44a944d
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.topdown.PathDirectory;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This class reads the output of any clustering algorithm, and, creates separate directories for different
+ * clusters. Each cluster directory's name is its clusterId. Each and every point is written in the cluster
+ * directory associated with that point.
+ * <p/>
+ * This class incorporates a sequential algorithm and is appropriate for use for data which has been clustered
+ * sequentially.
+ * <p/>
+ * The sequential and non sequential version, both are being used from {@link ClusterOutputPostProcessorDriver}.
+ */
+public final class ClusterOutputPostProcessor {
+
+  private Path clusteredPoints;
+  private final FileSystem fileSystem;
+  private final Configuration conf;
+  private final Path clusterPostProcessorOutput;
+  private final Map<String, Path> postProcessedClusterDirectories = Maps.newHashMap();
+  private long uniqueVectorId = 0L;
+  private final Map<String, SequenceFile.Writer> writersForClusters;
+
+  public ClusterOutputPostProcessor(Path clusterOutputToBeProcessed,
+                                    Path output,
+                                    Configuration hadoopConfiguration) throws IOException {
+    this.clusterPostProcessorOutput = output;
+    this.clusteredPoints = PathDirectory.getClusterOutputClusteredPoints(clusterOutputToBeProcessed);
+    this.conf = hadoopConfiguration;
+    this.writersForClusters = Maps.newHashMap();
+    // NOTE(review): fileSystem is resolved from the clustered-points path but is only used to create and write
+    // the output directory; this assumes input and output live on the same file system — TODO confirm.
+    fileSystem = clusteredPoints.getFileSystem(conf);
+  }
+
+  /**
+   * This method takes the clustered points output by the clustering algorithms as input and writes them into
+   * their respective clusters. All per-cluster writers are closed before returning, including on failure.
+   */
+  public void process() throws IOException {
+    createPostProcessDirectory();
+    try {
+      for (Pair<?, WeightedVectorWritable> record
+          : new SequenceFileDirIterable<Writable, WeightedVectorWritable>(clusteredPoints, PathType.GLOB,
+              PathFilters.partFilter(), null, false, conf)) {
+        String clusterId = record.getFirst().toString().trim();
+        putVectorInRespectiveCluster(clusterId, record.getSecond());
+      }
+    } finally {
+      // Close every writer even if reading or writing failed part-way, so no file handles leak.
+      try {
+        IOUtils.close(writersForClusters.values());
+      } finally {
+        writersForClusters.clear();
+      }
+    }
+  }
+
+  /**
+   * Creates the directory to put post processed clusters.
+   */
+  private void createPostProcessDirectory() throws IOException {
+    if (!fileSystem.exists(clusterPostProcessorOutput)
+        && !fileSystem.mkdirs(clusterPostProcessorOutput)) {
+      throw new IOException("Error creating cluster post processor directory");
+    }
+  }
+
+  /**
+   * Finds out the cluster directory of the vector and writes it into the specified cluster.
+   */
+  private void putVectorInRespectiveCluster(String clusterId, WeightedVectorWritable point) throws IOException {
+    Writer writer = findWriterForVector(clusterId);
+    writeVectorToCluster(writer, point);
+  }
+
+  /**
+   * Returns the writer for {@code clusterId}, creating it (and recording the cluster's directory in
+   * {@link #getPostProcessedClusterDirectories()}) the first time the cluster is seen.
+   */
+  private Writer findWriterForVector(String clusterId) throws IOException {
+    Writer writer = writersForClusters.get(clusterId);
+    if (writer == null) {
+      Path clusterDirectory = PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId);
+      Path pathToWrite = new Path(clusterDirectory, new Path("part-m-0"));
+      writer = new Writer(fileSystem, conf, pathToWrite, LongWritable.class, VectorWritable.class);
+      writersForClusters.put(clusterId, writer);
+      // The directory only depends on the cluster id, so it is registered once, not once per point.
+      postProcessedClusterDirectories.put(clusterId, clusterDirectory);
+    }
+    return writer;
+  }
+
+  /**
+   * Writes vector to the cluster directory, keyed by a counter that is unique across all clusters.
+   * No explicit sync marker is written per record: the SequenceFile writer already inserts sync points
+   * periodically, and a marker after every record only inflates the file.
+   */
+  private void writeVectorToCluster(Writer writer, WeightedVectorWritable point) throws IOException {
+    writer.append(new LongWritable(uniqueVectorId++), new VectorWritable(point.getVector()));
+  }
+
+  /**
+   * @return the set of all post processed cluster paths.
+   */
+  public Map<String, Path> getPostProcessedClusterDirectories() {
+    return postProcessedClusterDirectories;
+  }
+
+  public void setClusteredPoints(Path clusteredPoints) {
+    this.clusteredPoints = clusteredPoints;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
new file mode 100644
index 0000000..82a3071
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Post processes the output of clustering algorithms and groups them into respective clusters. Ideal to be
+ * used for top down clustering. It can also be used if the clustering output needs to be grouped into their
+ * respective clusters.
+ */
+public final class ClusterOutputPostProcessorDriver extends AbstractJob {
+
+ /**
+ * CLI to run clustering post processor. The input to post processor is the ouput path specified to the
+ * clustering.
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ addInputOption();
+ addOutputOption();
+ addOption(DefaultOptionCreator.methodOption().create());
+ addOption(DefaultOptionCreator.overwriteOption().create());
+
+ if (parseArguments(args) == null) {
+ return -1;
+ }
+ Path input = getInputPath();
+ Path output = getOutputPath();
+
+ if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+ HadoopUtil.delete(getConf(), output);
+ }
+ boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+ DefaultOptionCreator.SEQUENTIAL_METHOD);
+ run(input, output, runSequential);
+ return 0;
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new Configuration(), new ClusterOutputPostProcessorDriver(), args);
+ }
+
+ /**
+ * Post processes the output of clustering algorithms and groups them into respective clusters. Each
+ * cluster's vectors are written into a directory named after its clusterId.
+ *
+ * @param input The output path provided to the clustering algorithm, whose would be post processed. Hint: The
+ * path of the directory containing clusters-*-final and clusteredPoints.
+ * @param output The post processed data would be stored at this path.
+ * @param runSequential If set to true, post processes it sequentially, else, uses. MapReduce. Hint: If the clustering
+ * was done sequentially, make it sequential, else vice versa.
+ */
+ public static void run(Path input, Path output, boolean runSequential) throws IOException,
+ InterruptedException,
+ ClassNotFoundException {
+ if (runSequential) {
+ postProcessSeq(input, output);
+ } else {
+ Configuration conf = new Configuration();
+ postProcessMR(conf, input, output);
+ movePartFilesToRespectiveDirectories(conf, output);
+ }
+
+ }
+
+ /**
+ * Process Sequentially. Reads the vectors one by one, and puts them into respective directory, named after
+ * their clusterId.
+ *
+ * @param input The output path provided to the clustering algorithm, whose would be post processed. Hint : The
+ * path of the directory containing clusters-*-final and clusteredPoints.
+ * @param output The post processed data would be stored at this path.
+ */
+ private static void postProcessSeq(Path input, Path output) throws IOException {
+ ClusterOutputPostProcessor clusterOutputPostProcessor = new ClusterOutputPostProcessor(input, output,
+ new Configuration());
+ clusterOutputPostProcessor.process();
+ }
+
+ /**
+ * Process as a map reduce job. The numberOfReduceTasks is set to the number of clusters present in the
+ * output. So that each cluster's vector is written in its own part file.
+ *
+ * @param conf The hadoop configuration.
+ * @param input The output path provided to the clustering algorithm, whose would be post processed. Hint : The
+ * path of the directory containing clusters-*-final and clusteredPoints.
+ * @param output The post processed data would be stored at this path.
+ */
+ private static void postProcessMR(Configuration conf, Path input, Path output) throws IOException,
+ InterruptedException,
+ ClassNotFoundException {
+ System.out.println("WARNING: If you are running in Hadoop local mode, please use the --sequential option, "
+ + "as the MapReduce option will not work properly");
+ int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf);
+ conf.set("clusterOutputPath", input.toString());
+ Job job = new Job(conf, "ClusterOutputPostProcessor Driver running over input: " + input);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setMapperClass(ClusterOutputPostProcessorMapper.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(VectorWritable.class);
+ job.setReducerClass(ClusterOutputPostProcessorReducer.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(VectorWritable.class);
+ job.setNumReduceTasks(numberOfClusters);
+ job.setJarByClass(ClusterOutputPostProcessorDriver.class);
+
+ FileInputFormat.addInputPath(job, new Path(input, new Path("clusteredPoints")));
+ FileOutputFormat.setOutputPath(job, output);
+ if (!job.waitForCompletion(true)) {
+ throw new InterruptedException("ClusterOutputPostProcessor Job failed processing " + input);
+ }
+ }
+
+ /**
+ * The mapreduce version of the post processor writes different clusters into different part files. This
+ * method reads the part files and moves them into directories named after their clusterIds.
+ *
+ * @param conf The hadoop configuration.
+ * @param output The post processed data would be stored at this path.
+ */
+ private static void movePartFilesToRespectiveDirectories(Configuration conf, Path output) throws IOException {
+ FileSystem fileSystem = output.getFileSystem(conf);
+ for (FileStatus fileStatus : fileSystem.listStatus(output, PathFilters.partFilter())) {
+ SequenceFileIterator<Writable, Writable> it =
+ new SequenceFileIterator<>(fileStatus.getPath(), true, conf);
+ if (it.hasNext()) {
+ renameFile(it.next().getFirst(), fileStatus, conf);
+ }
+ it.close();
+ }
+ }
+
+ /**
+ * Using @FileSystem rename method to move the file.
+ */
+ private static void renameFile(Writable key, FileStatus fileStatus, Configuration conf) throws IOException {
+ Path path = fileStatus.getPath();
+ FileSystem fileSystem = path.getFileSystem(conf);
+ Path subDir = new Path(key.toString());
+ Path renameTo = new Path(path.getParent(), subDir);
+ fileSystem.mkdirs(renameTo);
+ fileSystem.rename(path, renameTo);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
new file mode 100644
index 0000000..6834362
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Mapper for post processing cluster output.
+ */
+public class ClusterOutputPostProcessorMapper extends
+ Mapper<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
+
+ private Map<Integer, Integer> newClusterMappings;
+ private VectorWritable outputVector;
+
+ //read the current cluster ids, and populate the cluster mapping hash table
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ //this give the clusters-x-final directory where the cluster ids can be read
+ Path clusterOutputPath = new Path(conf.get("clusterOutputPath"));
+ //we want the key to be the cluster id, the value to be the index
+ newClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, true);
+ outputVector = new VectorWritable();
+ }
+
+ @Override
+ public void map(IntWritable key, WeightedVectorWritable val, Context context)
+ throws IOException, InterruptedException {
+ // by pivoting on the cluster mapping value, we can make sure that each unique cluster goes to it's own reducer,
+ // since they are numbered from 0 to k-1, where k is the number of clusters
+ outputVector.set(val.getVector());
+ context.write(new IntWritable(newClusterMappings.get(key.get())), outputVector);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
new file mode 100644
index 0000000..58dada4
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Reducer for post processing cluster output.
+ */
+public class ClusterOutputPostProcessorReducer
+ extends Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+ private Map<Integer, Integer> reverseClusterMappings;
+
+ //read the current cluster ids, and populate the hash cluster mapping hash table
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ Path clusterOutputPath = new Path(conf.get("clusterOutputPath"));
+ //we want to the key to be the index, the value to be the cluster id
+ reverseClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, false);
+ }
+
+ /**
+ * The key is the remapped cluster id and the values contains the vectors in that cluster.
+ */
+ @Override
+ protected void reduce(IntWritable key, Iterable<VectorWritable> values, Context context) throws IOException,
+ InterruptedException {
+ //remap the cluster back to its original id
+ //and then output the vectors with their correct
+ //cluster id.
+ IntWritable outKey = new IntWritable(reverseClusterMappings.get(key.get()));
+ System.out.println(outKey + " this: " + this);
+ for (VectorWritable value : values) {
+ context.write(outKey, value);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/AbstractJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/AbstractJob.java b/mr/src/main/java/org/apache/mahout/common/AbstractJob.java
new file mode 100644
index 0000000..ec77749
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/AbstractJob.java
@@ -0,0 +1,658 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.lucene.AnalyzerUtils;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * <p>Superclass of many Mahout Hadoop "jobs". A job drives configuration and launch of one or
+ * more maps and reduces in order to accomplish some task.</p>
+ *
+ * <p>Command line arguments available to all subclasses are:</p>
+ *
+ * <ul>
+ * <li>--tempDir (path): Specifies a directory where the job may place temp files
+ * (default "temp")</li>
+ * <li>--help: Show help message</li>
+ * </ul>
+ *
+ * <p>In addition, note some key command line parameters that are parsed by Hadoop, which jobs
+ * may need to set:</p>
+ *
+ * <ul>
+ * <li>-Dmapred.job.name=(name): Sets the Hadoop task names. It will be suffixed by
+ * the mapper and reducer class names</li>
+ * <li>-Dmapred.output.compress={true,false}: Compress final output (default true)</li>
+ * <li>-Dmapred.input.dir=(path): input file, or directory containing input files (required)</li>
+ * <li>-Dmapred.output.dir=(path): path to write output files (required)</li>
+ * </ul>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other
+ * arguments.</p>
+ */
+public abstract class AbstractJob extends Configured implements Tool {
+
private static final Logger log = LoggerFactory.getLogger(AbstractJob.class);

/** The option that supplies the input path; registered by {@link #addInputOption()}. */
private Option inputOption;

/** The option that supplies the output path; registered by {@link #addOutputOption()}. */
private Option outputOption;

/** Input path, populated by {@link #parseArguments(String[])}. */
protected Path inputPath;
/** The input as a {@link File}; parseArguments sets it only when the input is given on the command line. */
protected File inputFile;

/** Output path, populated by {@link #parseArguments(String[])}. */
protected Path outputPath;
/** The output as a {@link File}; parseArguments sets it only when the output is given on the command line. */
protected File outputFile;

/** Temp path, populated by {@link #parseArguments(String[])}. */
protected Path tempPath;

/** Parsed option values keyed by "--" + option name; populated by {@link #parseArguments(String[])}. */
protected Map<String, List<String>> argMap;

/** Options registered so far, kept in registration order. */
private final List<Option> options;
/** The option group built by the most recent call to {@code parseArguments}. */
private Group group;

protected AbstractJob() {
  this.options = Lists.newLinkedList();
}
+
+ /** Returns the input path established by a call to {@link #parseArguments(String[])}.
+ * The source of the path may be an input option added using {@link #addInputOption()}
+ * or it may be the value of the {@code mapred.input.dir} configuration
+ * property.
+ */
+ protected Path getInputPath() {
+ return inputPath;
+ }
+
+ /** Returns the output path established by a call to {@link #parseArguments(String[])}.
+ * The source of the path may be an output option added using {@link #addOutputOption()}
+ * or it may be the value of the {@code mapred.input.dir} configuration
+ * property.
+ */
+ protected Path getOutputPath() {
+ return outputPath;
+ }
+
+ protected Path getOutputPath(String path) {
+ return new Path(outputPath, path);
+ }
+
+ protected File getInputFile() {
+ return inputFile;
+ }
+
+ protected File getOutputFile() {
+ return outputFile;
+ }
+
+
+ protected Path getTempPath() {
+ return tempPath;
+ }
+
+ protected Path getTempPath(String directory) {
+ return new Path(tempPath, directory);
+ }
+
+ @Override
+ public Configuration getConf() {
+ Configuration result = super.getConf();
+ if (result == null) {
+ return new Configuration();
+ }
+ return result;
+ }
+
+ /** Add an option with no argument whose presence can be checked for using
+ * {@code containsKey} method on the map returned by {@link #parseArguments(String[])};
+ */
+ protected void addFlag(String name, String shortName, String description) {
+ options.add(buildOption(name, shortName, description, false, false, null));
+ }
+
+ /** Add an option to the the set of options this job will parse when
+ * {@link #parseArguments(String[])} is called. This options has an argument
+ * with null as its default value.
+ */
+ protected void addOption(String name, String shortName, String description) {
+ options.add(buildOption(name, shortName, description, true, false, null));
+ }
+
+ /** Add an option to the the set of options this job will parse when
+ * {@link #parseArguments(String[])} is called.
+ *
+ * @param required if true the {@link #parseArguments(String[])} will throw
+ * fail with an error and usage message if this option is not specified
+ * on the command line.
+ */
+ protected void addOption(String name, String shortName, String description, boolean required) {
+ options.add(buildOption(name, shortName, description, true, required, null));
+ }
+
+ /** Add an option to the the set of options this job will parse when
+ * {@link #parseArguments(String[])} is called. If this option is not
+ * specified on the command line the default value will be
+ * used.
+ *
+ * @param defaultValue the default argument value if this argument is not
+ * found on the command-line. null is allowed.
+ */
+ protected void addOption(String name, String shortName, String description, String defaultValue) {
+ options.add(buildOption(name, shortName, description, true, false, defaultValue));
+ }
+
+ /** Add an arbitrary option to the set of options this job will parse when
+ * {@link #parseArguments(String[])} is called. If this option has no
+ * argument, use {@code containsKey} on the map returned by
+ * {@code parseArguments} to check for its presence. Otherwise, the
+ * string value of the option will be placed in the map using a key
+ * equal to this options long name preceded by '--'.
+ * @return the option added.
+ */
+ protected Option addOption(Option option) {
+ options.add(option);
+ return option;
+ }
+
+ protected Group getGroup() {
+ return group;
+ }
+
+ /** Add the default input directory option, '-i' which takes a directory
+ * name as an argument. When {@link #parseArguments(String[])} is
+ * called, the inputPath will be set based upon the value for this option.
+ * If this method is called, the input is required.
+ */
+ protected void addInputOption() {
+ this.inputOption = addOption(DefaultOptionCreator.inputOption().create());
+ }
+
+ /** Add the default output directory option, '-o' which takes a directory
+ * name as an argument. When {@link #parseArguments(String[])} is
+ * called, the outputPath will be set based upon the value for this option.
+ * If this method is called, the output is required.
+ */
+ protected void addOutputOption() {
+ this.outputOption = addOption(DefaultOptionCreator.outputOption().create());
+ }
+
+ /** Build an option with the given parameters. Name and description are
+ * required.
+ *
+ * @param name the long name of the option prefixed with '--' on the command-line
+ * @param shortName the short name of the option, prefixed with '-' on the command-line
+ * @param description description of the option displayed in help method
+ * @param hasArg true if the option has an argument.
+ * @param required true if the option is required.
+ * @param defaultValue default argument value, can be null.
+ * @return the option.
+ */
+ protected static Option buildOption(String name,
+ String shortName,
+ String description,
+ boolean hasArg,
+ boolean required,
+ String defaultValue) {
+
+ return buildOption(name, shortName, description, hasArg, 1, 1, required, defaultValue);
+ }
+
+ protected static Option buildOption(String name,
+ String shortName,
+ String description,
+ boolean hasArg, int min, int max,
+ boolean required,
+ String defaultValue) {
+
+ DefaultOptionBuilder optBuilder = new DefaultOptionBuilder().withLongName(name).withDescription(description)
+ .withRequired(required);
+
+ if (shortName != null) {
+ optBuilder.withShortName(shortName);
+ }
+
+ if (hasArg) {
+ ArgumentBuilder argBuilder = new ArgumentBuilder().withName(name).withMinimum(min).withMaximum(max);
+
+ if (defaultValue != null) {
+ argBuilder = argBuilder.withDefault(defaultValue);
+ }
+
+ optBuilder.withArgument(argBuilder.create());
+ }
+
+ return optBuilder.create();
+ }
+
+ /**
+ * @param name The name of the option
+ * @return the {@link org.apache.commons.cli2.Option} with the name, else null
+ */
+ protected Option getCLIOption(String name) {
+ for (Option option : options) {
+ if (option.getPreferredName().equals(name)) {
+ return option;
+ }
+ }
+ return null;
+ }
+
+ /** Parse the arguments specified based on the options defined using the
+ * various {@code addOption} methods. If -h is specified or an
+ * exception is encountered print help and return null. Has the
+ * side effect of setting inputPath and outputPath
+ * if {@code addInputOption} or {@code addOutputOption}
+ * or {@code mapred.input.dir} or {@code mapred.output.dir}
+ * are present in the Configuration.
+ *
+ * @return a {@code Map<String,String>} containing options and their argument values.
+ * The presence of a flag can be tested using {@code containsKey}, while
+ * argument values can be retrieved using {@code get(optionName)}. The
+ * names used for keys are the option name parameter prefixed by '--'.
+ *
+ * @see #parseArguments(String[], boolean, boolean) -- passes in false, false for the optional args.
+ */
+ public Map<String, List<String>> parseArguments(String[] args) throws IOException {
+ return parseArguments(args, false, false);
+ }
+
+ /**
+ *
+ * @param args The args to parse
+ * @param inputOptional if false, then the input option, if set, need not be present. If true and input is an option
+ * and there is no input, then throw an error
+ * @param outputOptional if false, then the output option, if set, need not be present. If true and output is an
+ * option and there is no output, then throw an error
+ * @return the args parsed into a map.
+ */
+ public Map<String, List<String>> parseArguments(String[] args, boolean inputOptional, boolean outputOptional)
+ throws IOException {
+ Option helpOpt = addOption(DefaultOptionCreator.helpOption());
+ addOption("tempDir", null, "Intermediate output directory", "temp");
+ addOption("startPhase", null, "First phase to run", "0");
+ addOption("endPhase", null, "Last phase to run", String.valueOf(Integer.MAX_VALUE));
+
+ GroupBuilder gBuilder = new GroupBuilder().withName("Job-Specific Options:");
+
+ for (Option opt : options) {
+ gBuilder = gBuilder.withOption(opt);
+ }
+
+ group = gBuilder.create();
+
+ CommandLine cmdLine;
+ try {
+ Parser parser = new Parser();
+ parser.setGroup(group);
+ parser.setHelpOption(helpOpt);
+ cmdLine = parser.parse(args);
+
+ } catch (OptionException e) {
+ log.error(e.getMessage());
+ CommandLineUtil.printHelpWithGenericOptions(group, e);
+ return null;
+ }
+
+ if (cmdLine.hasOption(helpOpt)) {
+ CommandLineUtil.printHelpWithGenericOptions(group);
+ return null;
+ }
+
+ try {
+ parseDirectories(cmdLine, inputOptional, outputOptional);
+ } catch (IllegalArgumentException e) {
+ log.error(e.getMessage());
+ CommandLineUtil.printHelpWithGenericOptions(group);
+ return null;
+ }
+
+ argMap = new TreeMap<String, List<String>>();
+ maybePut(argMap, cmdLine, this.options.toArray(new Option[this.options.size()]));
+
+ this.tempPath = new Path(getOption("tempDir"));
+
+ if (!hasOption("quiet")) {
+ log.info("Command line arguments: {}", argMap);
+ }
+ return argMap;
+ }
+
+ /**
+ * Build the option key (--name) from the option name
+ */
+ public static String keyFor(String optionName) {
+ return "--" + optionName;
+ }
+
+ /**
+ * @return the requested option, or null if it has not been specified
+ */
+ public String getOption(String optionName) {
+ List<String> list = argMap.get(keyFor(optionName));
+ if (list != null && !list.isEmpty()) {
+ return list.get(0);
+ }
+ return null;
+ }
+
+ /**
+ * Get the option, else the default
+ * @param optionName The name of the option to look up, without the --
+ * @param defaultVal The default value.
+ * @return The requested option, else the default value if it doesn't exist
+ */
+ public String getOption(String optionName, String defaultVal) {
+ String res = getOption(optionName);
+ if (res == null) {
+ res = defaultVal;
+ }
+ return res;
+ }
+
+ public int getInt(String optionName) {
+ return Integer.parseInt(getOption(optionName));
+ }
+
+ public int getInt(String optionName, int defaultVal) {
+ return Integer.parseInt(getOption(optionName, String.valueOf(defaultVal)));
+ }
+
+ public float getFloat(String optionName) {
+ return Float.parseFloat(getOption(optionName));
+ }
+
+ public float getFloat(String optionName, float defaultVal) {
+ return Float.parseFloat(getOption(optionName, String.valueOf(defaultVal)));
+ }
+
+ /**
+ * Options can occur multiple times, so return the list
+ * @param optionName The unadorned (no "--" prefixing it) option name
+ * @return The values, else null. If the option is present, but has no values, then the result will be an
+ * empty list (Collections.emptyList())
+ */
+ public List<String> getOptions(String optionName) {
+ return argMap.get(keyFor(optionName));
+ }
+
+ /**
+ * @return if the requested option has been specified
+ */
+ public boolean hasOption(String optionName) {
+ return argMap.containsKey(keyFor(optionName));
+ }
+
+
+ /**
+ * Get the cardinality of the input vectors
+ *
+ * @param matrix
+ * @return the cardinality of the vector
+ */
+ public int getDimensions(Path matrix) throws IOException {
+
+ SequenceFile.Reader reader = null;
+ try {
+ reader = new SequenceFile.Reader(FileSystem.get(getConf()), matrix, getConf());
+
+ Writable row = ClassUtils.instantiateAs(reader.getKeyClass().asSubclass(Writable.class), Writable.class);
+
+ Preconditions.checkArgument(reader.getValueClass().equals(VectorWritable.class),
+ "value type of sequencefile must be a VectorWritable");
+
+ VectorWritable vectorWritable = new VectorWritable();
+ boolean hasAtLeastOneRow = reader.next(row, vectorWritable);
+ Preconditions.checkState(hasAtLeastOneRow, "matrix must have at least one row");
+
+ return vectorWritable.get().size();
+
+ } finally {
+ Closeables.close(reader, true);
+ }
+ }
+
+ /**
+ * Obtain input and output directories from command-line options or hadoop
+ * properties. If {@code addInputOption} or {@code addOutputOption}
+ * has been called, this method will throw an {@code OptionException} if
+ * no source (command-line or property) for that value is present.
+ * Otherwise, {@code inputPath} or {@code outputPath} will be
+ * non-null only if specified as a hadoop property. Command-line options
+ * take precedence over hadoop properties.
+ *
+ * @throws IllegalArgumentException if either inputOption is present,
+ * and neither {@code --input} nor {@code -Dmapred.input dir} are
+ * specified or outputOption is present and neither {@code --output}
+ * nor {@code -Dmapred.output.dir} are specified.
+ */
+ protected void parseDirectories(CommandLine cmdLine, boolean inputOptional, boolean outputOptional) {
+
+ Configuration conf = getConf();
+
+ if (inputOption != null && cmdLine.hasOption(inputOption)) {
+ this.inputPath = new Path(cmdLine.getValue(inputOption).toString());
+ this.inputFile = new File(cmdLine.getValue(inputOption).toString());
+ }
+ if (inputPath == null && conf.get("mapred.input.dir") != null) {
+ this.inputPath = new Path(conf.get("mapred.input.dir"));
+ }
+
+ if (outputOption != null && cmdLine.hasOption(outputOption)) {
+ this.outputPath = new Path(cmdLine.getValue(outputOption).toString());
+ this.outputFile = new File(cmdLine.getValue(outputOption).toString());
+ }
+ if (outputPath == null && conf.get("mapred.output.dir") != null) {
+ this.outputPath = new Path(conf.get("mapred.output.dir"));
+ }
+
+ Preconditions.checkArgument(inputOptional || inputOption == null || inputPath != null,
+ "No input specified or -Dmapred.input.dir must be provided to specify input directory");
+ Preconditions.checkArgument(outputOptional || outputOption == null || outputPath != null,
+ "No output specified: or -Dmapred.output.dir must be provided to specify output directory");
+ }
+
+ protected static void maybePut(Map<String, List<String>> args, CommandLine cmdLine, Option... opt) {
+ for (Option o : opt) {
+
+ // the option appeared on the command-line, or it has a value
+ // (which is likely a default value).
+ if (cmdLine.hasOption(o) || cmdLine.getValue(o) != null
+ || (cmdLine.getValues(o) != null && !cmdLine.getValues(o).isEmpty())) {
+
+ // nulls are ok, for cases where options are simple flags.
+ List<?> vo = cmdLine.getValues(o);
+ if (vo != null && !vo.isEmpty()) {
+ List<String> vals = Lists.newArrayList();
+ for (Object o1 : vo) {
+ vals.add(o1.toString());
+ }
+ args.put(o.getPreferredName(), vals);
+ } else {
+ args.put(o.getPreferredName(), null);
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @param args The input argument map
+ * @param optName The adorned (including "--") option name
+ * @return The first value in the match, else null
+ */
+ public static String getOption(Map<String, List<String>> args, String optName) {
+ List<String> res = args.get(optName);
+ if (res != null && !res.isEmpty()) {
+ return res.get(0);
+ }
+ return null;
+ }
+
+
+ protected static boolean shouldRunNextPhase(Map<String, List<String>> args, AtomicInteger currentPhase) {
+ int phase = currentPhase.getAndIncrement();
+ String startPhase = getOption(args, "--startPhase");
+ String endPhase = getOption(args, "--endPhase");
+ boolean phaseSkipped = (startPhase != null && phase < Integer.parseInt(startPhase))
+ || (endPhase != null && phase > Integer.parseInt(endPhase));
+ if (phaseSkipped) {
+ log.info("Skipping phase {}", phase);
+ }
+ return !phaseSkipped;
+ }
+
+ protected Job prepareJob(Path inputPath,
+ Path outputPath,
+ Class<? extends InputFormat> inputFormat,
+ Class<? extends Mapper> mapper,
+ Class<? extends Writable> mapperKey,
+ Class<? extends Writable> mapperValue,
+ Class<? extends OutputFormat> outputFormat) throws IOException {
+ return prepareJob(inputPath, outputPath, inputFormat, mapper, mapperKey, mapperValue, outputFormat, null);
+
+ }
+ protected Job prepareJob(Path inputPath,
+ Path outputPath,
+ Class<? extends InputFormat> inputFormat,
+ Class<? extends Mapper> mapper,
+ Class<? extends Writable> mapperKey,
+ Class<? extends Writable> mapperValue,
+ Class<? extends OutputFormat> outputFormat,
+ String jobname) throws IOException {
+
+ Job job = HadoopUtil.prepareJob(inputPath, outputPath,
+ inputFormat, mapper, mapperKey, mapperValue, outputFormat, getConf());
+
+ String name =
+ jobname != null ? jobname : HadoopUtil.getCustomJobName(getClass().getSimpleName(), job, mapper, Reducer.class);
+
+ job.setJobName(name);
+ return job;
+
+ }
+
+ protected Job prepareJob(Path inputPath, Path outputPath, Class<? extends Mapper> mapper,
+ Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue, Class<? extends Reducer> reducer,
+ Class<? extends Writable> reducerKey, Class<? extends Writable> reducerValue) throws IOException {
+ return prepareJob(inputPath, outputPath, SequenceFileInputFormat.class, mapper, mapperKey, mapperValue, reducer,
+ reducerKey, reducerValue, SequenceFileOutputFormat.class);
+ }
+
+ protected Job prepareJob(Path inputPath,
+ Path outputPath,
+ Class<? extends InputFormat> inputFormat,
+ Class<? extends Mapper> mapper,
+ Class<? extends Writable> mapperKey,
+ Class<? extends Writable> mapperValue,
+ Class<? extends Reducer> reducer,
+ Class<? extends Writable> reducerKey,
+ Class<? extends Writable> reducerValue,
+ Class<? extends OutputFormat> outputFormat) throws IOException {
+ Job job = HadoopUtil.prepareJob(inputPath, outputPath,
+ inputFormat, mapper, mapperKey, mapperValue, reducer, reducerKey, reducerValue, outputFormat, getConf());
+ job.setJobName(HadoopUtil.getCustomJobName(getClass().getSimpleName(), job, mapper, Reducer.class));
+ return job;
+ }
+
+ /**
+ * necessary to make this job (having a combined input path) work on Amazon S3, hopefully this is
+ * obsolete when MultipleInputs is available again
+ */
+ public static void setS3SafeCombinedInputPath(Job job, Path referencePath, Path inputPathOne, Path inputPathTwo)
+ throws IOException {
+ FileSystem fs = FileSystem.get(referencePath.toUri(), job.getConfiguration());
+ FileInputFormat.setInputPaths(job, inputPathOne.makeQualified(fs), inputPathTwo.makeQualified(fs));
+ }
+
+ protected Class<? extends Analyzer> getAnalyzerClassFromOption() throws ClassNotFoundException {
+ Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class;
+ if (hasOption(DefaultOptionCreator.ANALYZER_NAME_OPTION)) {
+ String className = getOption(DefaultOptionCreator.ANALYZER_NAME_OPTION);
+ analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
+ // try instantiating it, b/c there isn't any point in setting it if
+ // you can't instantiate it
+ //ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
+ AnalyzerUtils.createAnalyzer(analyzerClass);
+ }
+ return analyzerClass;
+ }
+
+ /**
+ * Overrides the base implementation to install the Oozie action configuration resource
+ * into the provided Configuration object; note that ToolRunner calls setConf on the Tool
+ * before it invokes run.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+
+ // If running in an Oozie workflow as a Java action, need to add the
+ // Configuration resource provided by Oozie to this job's config.
+ String oozieActionConfXml = System.getProperty("oozie.action.conf.xml");
+ if (oozieActionConfXml != null && conf != null) {
+ conf.addResource(new Path("file:///", oozieActionConfXml));
+ log.info("Added Oozie action Configuration resource {} to the Hadoop Configuration", oozieActionConfXml);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/ClassUtils.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/ClassUtils.java b/mr/src/main/java/org/apache/mahout/common/ClassUtils.java
new file mode 100644
index 0000000..8052ef1
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/ClassUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.lang.reflect.InvocationTargetException;
+
/**
 * Reflection helpers that create instances of a class, optionally looked up by name, typed as a
 * given supertype. Checked reflection failures are rethrown as {@link IllegalStateException}s
 * wrapping the original exception.
 */
public final class ClassUtils {

  /** Parameter types selecting a public no-argument constructor. */
  private static final Class<?>[] NO_PARAMETER_TYPES = new Class<?>[0];

  /** Arguments passed to a no-argument constructor. */
  private static final Object[] NO_ARGUMENTS = new Object[0];

  private ClassUtils() {}

  /**
   * Loads {@code classname} and invokes its public no-argument constructor.
   *
   * @param classname fully-qualified name of the class to instantiate
   * @param asSubclassOfClass type the loaded class must be assignable to
   * @return the new instance
   * @throws IllegalStateException if the class cannot be found or instantiated
   * @throws ClassCastException if the loaded class is not a subclass of {@code asSubclassOfClass}
   */
  public static <T> T instantiateAs(String classname, Class<T> asSubclassOfClass) {
    Class<? extends T> loaded = loadSubclass(classname, asSubclassOfClass);
    return instantiateAs(loaded, asSubclassOfClass);
  }

  /**
   * Loads {@code classname} and invokes the public constructor matching {@code params}.
   *
   * @param classname fully-qualified name of the class to instantiate
   * @param asSubclassOfClass type the loaded class must be assignable to
   * @param params parameter types of the constructor to call
   * @param args arguments passed to that constructor
   * @return the new instance
   * @throws IllegalStateException if the class cannot be found or instantiated
   * @throws ClassCastException if the loaded class is not a subclass of {@code asSubclassOfClass}
   */
  public static <T> T instantiateAs(String classname, Class<T> asSubclassOfClass, Class<?>[] params, Object[] args) {
    Class<? extends T> loaded = loadSubclass(classname, asSubclassOfClass);
    return instantiateAs(loaded, asSubclassOfClass, params, args);
  }

  /**
   * Invokes the public constructor of {@code clazz} matching {@code params}.
   *
   * @param clazz class to instantiate
   * @param asSubclassOfClass type the instance is returned as
   * @param params parameter types of the constructor to call
   * @param args arguments passed to that constructor
   * @return the new instance
   * @throws IllegalStateException wrapping the {@link InstantiationException},
   *     {@link IllegalAccessException}, {@link NoSuchMethodException} or
   *     {@link InvocationTargetException} raised by reflection
   */
  public static <T> T instantiateAs(Class<? extends T> clazz,
                                    Class<T> asSubclassOfClass,
                                    Class<?>[] params,
                                    Object[] args) {
    Class<? extends T> target = clazz.asSubclass(asSubclassOfClass);
    try {
      return target.getConstructor(params).newInstance(args);
    } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
      throw new IllegalStateException(e);
    }
  }

  /**
   * Invokes the public no-argument constructor of {@code clazz}.
   *
   * @param clazz class to instantiate
   * @param asSubclassOfClass type the instance is returned as
   * @return the new instance
   * @throws IllegalStateException wrapping any checked reflection failure
   */
  public static <T> T instantiateAs(Class<? extends T> clazz, Class<T> asSubclassOfClass) {
    return instantiateAs(clazz, asSubclassOfClass, NO_PARAMETER_TYPES, NO_ARGUMENTS);
  }

  /** Loads {@code classname} as a subclass of {@code superType}, wrapping a missing class. */
  private static <T> Class<? extends T> loadSubclass(String classname, Class<T> superType) {
    try {
      return Class.forName(classname).asSubclass(superType);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java b/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java
new file mode 100644
index 0000000..0cc93ba
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+
+import com.google.common.base.Charsets;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public final class CommandLineUtil {
+
+ private CommandLineUtil() { }
+
+ public static void printHelp(Group group) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.setGroup(group);
+ formatter.print();
+ }
+
+ /**
+ * Print the options supported by {@code GenericOptionsParser}.
+ * In addition to the options supported by the job, passed in as the
+ * group parameter.
+ *
+ * @param group job-specific command-line options.
+ */
+ public static void printHelpWithGenericOptions(Group group) throws IOException {
+ new GenericOptionsParser(new Configuration(), new org.apache.commons.cli.Options(), new String[0]);
+ PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true);
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.setGroup(group);
+ formatter.setPrintWriter(pw);
+ formatter.setFooter("Specify HDFS directories while running on hadoop; else specify local file system directories");
+ formatter.print();
+ }
+
+ public static void printHelpWithGenericOptions(Group group, OptionException oe) throws IOException {
+ new GenericOptionsParser(new Configuration(), new org.apache.commons.cli.Options(), new String[0]);
+ PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true);
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.setGroup(group);
+ formatter.setPrintWriter(pw);
+ formatter.setException(oe);
+ formatter.print();
+ }
+
+}