Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 14:52:12 UTC
[44/51] [partial] mahout git commit: MAHOUT-2042 and MAHOUT-2045
Delete directories which were moved/no longer in use
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SplitInput.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
new file mode 100644
index 0000000..6178f80
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
@@ -0,0 +1,673 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.Charset;
+import java.util.BitSet;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
+import org.apache.mahout.math.jet.random.sampling.RandomSampler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A utility for splitting input data into training and test sets for
+ * cross-validation. It handles the one-item-per-line format used by the Bayes
+ * classifiers, as well as SequenceFiles of key/value pairs.
+ * <p/>
+ * This class can be used to split directories of files or individual files into
+ * training and test sets using a number of different methods.
+ * <p/>
+ * When executed via {@link #splitDirectory(Path)} or {@link #splitFile(Path)},
+ * the lines read from one or more input files are written to files of the same
+ * name into the directories specified by the
+ * {@link #setTestOutputDirectory(Path)} and
+ * {@link #setTrainingOutputDirectory(Path)} methods.
+ * <p/>
+ * The composition of the test set is determined using one of the following
+ * approaches:
+ * <ul>
+ * <li>A contiguous set of items can be chosen from the input file(s) using the
+ * {@link #setTestSplitSize(int)} or {@link #setTestSplitPct(int)} methods.
+ * {@link #setTestSplitSize(int)} allocates a fixed number of items, while
+ * {@link #setTestSplitPct(int)} allocates a percentage of the original input,
+ * rounded to the nearest integer. {@link #setSplitLocation(int)} is used to
+ * control the position in the input from which the test data is extracted and
+ * is described further below.</li>
+ * <li>A random sampling of items can be chosen from the input files(s) using
+ * the {@link #setTestRandomSelectionSize(int)} or
+ * {@link #setTestRandomSelectionPct(int)} methods, each choosing a fixed test
+ * set size or percentage of the input set size as described above. The
+ * {@link RandomSampler} class from {@code mahout-math} is used to create a sample
+ * of the appropriate size.</li>
+ * </ul>
+ * <p/>
+ * Exactly one of the methods above must be used to set the size of the test
+ * set. If more than one is configured, {@link #validate()} throws an
+ * {@link IllegalArgumentException} when the split is executed.
+ * <p/>
+ * The {@link #setSplitLocation(int)} method is passed an integer from 0 to 100
+ * (inclusive) which is translated into the position of the start of the test
+ * data within the input file.
+ * <p/>
+ * Given:
+ * <ul>
+ * <li>an input file of 1500 lines</li>
+ * <li>a desired test data size of 10 percent</li>
+ * </ul>
+ * <p/>
+ * <ul>
+ * <li>A split location of 0 will cause the first 150 items appearing in the
+ * input set to be written to the test set.</li>
+ * <li>A split location of 25 will cause items 376-525 to be written to the test
+ * set.</li>
+ * <li>A split location of 100 will cause the last 150 items in the input to be
+ * written to the test set</li>
+ * </ul>
+ * The start of the split will be moved towards the start of the input when
+ * necessary to ensure that the desired test set size is allocated. Split
+ * location has no effect if random sampling is employed.
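+ * <p/>
+ * As an illustrative sketch of programmatic use (the paths here are
+ * hypothetical):
+ * <pre>{@code
+ * SplitInput si = new SplitInput();
+ * si.setConf(new Configuration());
+ * si.setInputDirectory(new Path("/data/categories"));
+ * si.setTrainingOutputDirectory(new Path("/data/train"));
+ * si.setTestOutputDirectory(new Path("/data/test"));
+ * si.setTestSplitPct(10);  // hold back 10% of each input file
+ * si.setSplitLocation(25); // take the test block from the 25% mark
+ * si.splitDirectory();
+ * }</pre>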
+ */
+public class SplitInput extends AbstractJob {
+
+ private static final Logger log = LoggerFactory.getLogger(SplitInput.class);
+
+ private int testSplitSize = -1;
+ private int testSplitPct = -1;
+ private int splitLocation = 100;
+ private int testRandomSelectionSize = -1;
+ private int testRandomSelectionPct = -1;
+ private int keepPct = 100;
+ private Charset charset = Charsets.UTF_8;
+ private boolean useSequence;
+ private boolean useMapRed;
+
+ private Path inputDirectory;
+ private Path trainingOutputDirectory;
+ private Path testOutputDirectory;
+ private Path mapRedOutputDirectory;
+
+ private SplitCallback callback;
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (parseArgs(args)) {
+ splitDirectory();
+ }
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new Configuration(), new SplitInput(), args);
+ }
+
+ /**
+ * Configure this instance based on the command-line arguments contained within provided array.
+ * Calls {@link #validate()} to ensure consistency of configuration.
+ *
+ * @return true if the arguments were parsed successfully and execution should proceed.
+ * @throws Exception if there is a problem parsing the command-line arguments or the particular
+ * combination would violate class invariants.
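+   * <p>
+   * An illustrative command line (the {@code split} driver name and all paths
+   * and values here are examples only):
+   * <pre>{@code
+   * mahout split -i /data/in --trainingOutput /data/train --testOutput /data/test \
+   *   --testSplitPct 20 --overwrite
+   * }</pre>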
+ */
+ private boolean parseArgs(String[] args) throws Exception {
+
+ addInputOption();
+ addOption("trainingOutput", "tr", "The training data output directory", false);
+ addOption("testOutput", "te", "The test data output directory", false);
+ addOption("testSplitSize", "ss", "The number of documents held back as test data for each category", false);
+ addOption("testSplitPct", "sp", "The % of documents held back as test data for each category", false);
+ addOption("splitLocation", "sl", "Location for start of test data expressed as a percentage of the input file "
+ + "size (0=start, 50=middle, 100=end", false);
+ addOption("randomSelectionSize", "rs", "The number of items to be randomly selected as test data ", false);
+ addOption("randomSelectionPct", "rp", "Percentage of items to be randomly selected as test data when using "
+ + "mapreduce mode", false);
+ addOption("charset", "c", "The name of the character encoding of the input files (not needed if using "
+ + "SequenceFiles)", false);
+ addOption(buildOption("sequenceFiles", "seq", "Set if the input files are sequence files. Default is false",
+ false, false, "false"));
+ addOption(DefaultOptionCreator.methodOption().create());
+ addOption(DefaultOptionCreator.overwriteOption().create());
+ //TODO: extend this to sequential mode
+ addOption("keepPct", "k", "The percentage of total data to keep in map-reduce mode, the rest will be ignored. "
+ + "Default is 100%", false);
+ addOption("mapRedOutputDir", "mro", "Output directory for map reduce jobs", false);
+
+ if (parseArguments(args) == null) {
+ return false;
+ }
+
+ try {
+ inputDirectory = getInputPath();
+
+ useMapRed = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.MAPREDUCE_METHOD);
+
+ if (useMapRed) {
+ if (!hasOption("randomSelectionPct")) {
+ throw new OptionException(getCLIOption("randomSelectionPct"),
+ "must set randomSelectionPct when mapRed option is used");
+ }
+ if (!hasOption("mapRedOutputDir")) {
+ throw new OptionException(getCLIOption("mapRedOutputDir"),
+ "mapRedOutputDir must be set when mapRed option is used");
+ }
+ mapRedOutputDirectory = new Path(getOption("mapRedOutputDir"));
+ if (hasOption("keepPct")) {
+ keepPct = Integer.parseInt(getOption("keepPct"));
+ }
+ if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+ HadoopUtil.delete(getConf(), mapRedOutputDirectory);
+ }
+ } else {
+ if (!hasOption("trainingOutput")
+ || !hasOption("testOutput")) {
+ throw new OptionException(getCLIOption("trainingOutput"),
+ "trainingOutput and testOutput must be set if mapRed option is not used");
+ }
+ if (!hasOption("testSplitSize")
+ && !hasOption("testSplitPct")
+ && !hasOption("randomSelectionPct")
+ && !hasOption("randomSelectionSize")) {
+ throw new OptionException(getCLIOption("testSplitSize"),
+ "must set one of test split size/percentage or randomSelectionSize/percentage");
+ }
+
+ trainingOutputDirectory = new Path(getOption("trainingOutput"));
+ testOutputDirectory = new Path(getOption("testOutput"));
+ FileSystem fs = trainingOutputDirectory.getFileSystem(getConf());
+ if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+ HadoopUtil.delete(fs.getConf(), trainingOutputDirectory);
+ HadoopUtil.delete(fs.getConf(), testOutputDirectory);
+ }
+ fs.mkdirs(trainingOutputDirectory);
+ fs.mkdirs(testOutputDirectory);
+ }
+
+ if (hasOption("charset")) {
+ charset = Charset.forName(getOption("charset"));
+ }
+
+ if (hasOption("testSplitSize") && hasOption("testSplitPct")) {
+ throw new OptionException(getCLIOption("testSplitPct"), "must have either split size or split percentage "
+ + "option, not BOTH");
+ }
+
+ if (hasOption("testSplitSize")) {
+ setTestSplitSize(Integer.parseInt(getOption("testSplitSize")));
+ }
+
+ if (hasOption("testSplitPct")) {
+ setTestSplitPct(Integer.parseInt(getOption("testSplitPct")));
+ }
+
+ if (hasOption("splitLocation")) {
+ setSplitLocation(Integer.parseInt(getOption("splitLocation")));
+ }
+
+ if (hasOption("randomSelectionSize")) {
+ setTestRandomSelectionSize(Integer.parseInt(getOption("randomSelectionSize")));
+ }
+
+ if (hasOption("randomSelectionPct")) {
+ setTestRandomSelectionPct(Integer.parseInt(getOption("randomSelectionPct")));
+ }
+
+ useSequence = hasOption("sequenceFiles");
+
+ } catch (OptionException e) {
+ log.error("Command-line option Exception", e);
+ CommandLineUtil.printHelp(getGroup());
+ return false;
+ }
+
+ validate();
+ return true;
+ }
+
+ /**
+ * Perform a split on directory specified by {@link #setInputDirectory(Path)} by calling {@link #splitFile(Path)}
+ * on each file found within that directory.
+ */
+ public void splitDirectory() throws IOException, ClassNotFoundException, InterruptedException {
+ this.splitDirectory(inputDirectory);
+ }
+
+ /**
+ * Perform a split on the specified directory by calling {@link #splitFile(Path)} on each file found within that
+ * directory.
+ */
+ public void splitDirectory(Path inputDir) throws IOException, ClassNotFoundException, InterruptedException {
+ Configuration conf = getConf();
+ splitDirectory(conf, inputDir);
+ }
+
+ /*
+ * See also splitDirectory(Path inputDir)
+ * */
+ public void splitDirectory(Configuration conf, Path inputDir)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ FileSystem fs = inputDir.getFileSystem(conf);
+    if (!fs.exists(inputDir)) {
+      throw new IOException(inputDir + " does not exist");
+    }
+ if (!fs.getFileStatus(inputDir).isDir()) {
+ throw new IOException(inputDir + " is not a directory");
+ }
+
+ if (useMapRed) {
+ SplitInputJob.run(conf, inputDir, mapRedOutputDirectory,
+ keepPct, testRandomSelectionPct);
+ } else {
+ // input dir contains one file per category.
+ FileStatus[] fileStats = fs.listStatus(inputDir, PathFilters.logsCRCFilter());
+ for (FileStatus inputFile : fileStats) {
+ if (!inputFile.isDir()) {
+ splitFile(inputFile.getPath());
+ }
+ }
+ }
+ }
+
+ /**
+ * Perform a split on the specified input file. Results will be written to files of the same name in the specified
+ * training and test output directories. The {@link #validate()} method is called prior to executing the split.
+ */
+ public void splitFile(Path inputFile) throws IOException {
+ Configuration conf = getConf();
+ FileSystem fs = inputFile.getFileSystem(conf);
+    if (!fs.exists(inputFile)) {
+      throw new IOException(inputFile + " does not exist");
+    }
+ if (fs.getFileStatus(inputFile).isDir()) {
+ throw new IOException(inputFile + " is a directory");
+ }
+
+ validate();
+
+ Path testOutputFile = new Path(testOutputDirectory, inputFile.getName());
+ Path trainingOutputFile = new Path(trainingOutputDirectory, inputFile.getName());
+
+ int lineCount = countLines(fs, inputFile, charset);
+
+ log.info("{} has {} lines", inputFile.getName(), lineCount);
+
+ int testSplitStart = 0;
+ int testSplitSize = this.testSplitSize; // don't modify state
+ BitSet randomSel = null;
+
+ if (testRandomSelectionPct > 0 || testRandomSelectionSize > 0) {
+ testSplitSize = this.testRandomSelectionSize;
+
+ if (testRandomSelectionPct > 0) {
+ testSplitSize = Math.round(lineCount * testRandomSelectionPct / 100.0f);
+ }
+ log.info("{} test split size is {} based on random selection percentage {}",
+ inputFile.getName(), testSplitSize, testRandomSelectionPct);
+ long[] ridx = new long[testSplitSize];
+ RandomSampler.sample(testSplitSize, lineCount - 1, testSplitSize, 0, ridx, 0, RandomUtils.getRandom());
+ randomSel = new BitSet(lineCount);
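+      // pos is 1-based below, so shift each sampled 0-based line index up by one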
+ for (long idx : ridx) {
+ randomSel.set((int) idx + 1);
+ }
+ } else {
+ if (testSplitPct > 0) { // calculate split size based on percentage
+ testSplitSize = Math.round(lineCount * testSplitPct / 100.0f);
+ log.info("{} test split size is {} based on percentage {}",
+ inputFile.getName(), testSplitSize, testSplitPct);
+ } else {
+ log.info("{} test split size is {}", inputFile.getName(), testSplitSize);
+ }
+
+ if (splitLocation > 0) { // calculate start of split based on percentage
+ testSplitStart = Math.round(lineCount * splitLocation / 100.0f);
+ if (lineCount - testSplitStart < testSplitSize) {
+ // adjust split start downwards based on split size.
+ testSplitStart = lineCount - testSplitSize;
+ }
+ log.info("{} test split start is {} based on split location {}",
+ inputFile.getName(), testSplitStart, splitLocation);
+ }
+
+ if (testSplitStart < 0) {
+ throw new IllegalArgumentException("test split size for " + inputFile + " is too large, it would produce an "
+ + "empty training set from the initial set of " + lineCount + " examples");
+ } else if (lineCount - testSplitSize < testSplitSize) {
+ log.warn("Test set size for {} may be too large, {} is larger than the number of "
+ + "lines remaining in the training set: {}",
+ inputFile, testSplitSize, lineCount - testSplitSize);
+ }
+ }
+ int trainCount = 0;
+ int testCount = 0;
+ if (!useSequence) {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(inputFile), charset));
+ Writer trainingWriter = new OutputStreamWriter(fs.create(trainingOutputFile), charset);
+ Writer testWriter = new OutputStreamWriter(fs.create(testOutputFile), charset)){
+
+ String line;
+ int pos = 0;
+ while ((line = reader.readLine()) != null) {
+ pos++;
+
+ Writer writer;
+ if (testRandomSelectionPct > 0) { // Randomly choose
+ writer = randomSel.get(pos) ? testWriter : trainingWriter;
+ } else { // Choose based on location
+ writer = pos > testSplitStart ? testWriter : trainingWriter;
+ }
+
+ if (writer == testWriter) {
+ if (testCount >= testSplitSize) {
+ writer = trainingWriter;
+ } else {
+ testCount++;
+ }
+ }
+ if (writer == trainingWriter) {
+ trainCount++;
+ }
+ writer.write(line);
+ writer.write('\n');
+ }
+
+ }
+ } else {
+ try (SequenceFileIterator<Writable, Writable> iterator =
+ new SequenceFileIterator<>(inputFile, false, fs.getConf());
+ SequenceFile.Writer trainingWriter = SequenceFile.createWriter(fs, fs.getConf(), trainingOutputFile,
+ iterator.getKeyClass(), iterator.getValueClass());
+ SequenceFile.Writer testWriter = SequenceFile.createWriter(fs, fs.getConf(), testOutputFile,
+ iterator.getKeyClass(), iterator.getValueClass())) {
+
+ int pos = 0;
+ while (iterator.hasNext()) {
+ pos++;
+ SequenceFile.Writer writer;
+ if (testRandomSelectionPct > 0) { // Randomly choose
+ writer = randomSel.get(pos) ? testWriter : trainingWriter;
+ } else { // Choose based on location
+ writer = pos > testSplitStart ? testWriter : trainingWriter;
+ }
+
+ if (writer == testWriter) {
+ if (testCount >= testSplitSize) {
+ writer = trainingWriter;
+ } else {
+ testCount++;
+ }
+ }
+ if (writer == trainingWriter) {
+ trainCount++;
+ }
+ Pair<Writable, Writable> pair = iterator.next();
+ writer.append(pair.getFirst(), pair.getSecond());
+ }
+
+ }
+ }
+ log.info("file: {}, input: {} train: {}, test: {} starting at {}",
+ inputFile.getName(), lineCount, trainCount, testCount, testSplitStart);
+
+    // notify the caller, if any, that this file has been split
+ if (callback != null) {
+ callback.splitComplete(inputFile, lineCount, trainCount, testCount, testSplitStart);
+ }
+ }
+
+ public int getTestSplitSize() {
+ return testSplitSize;
+ }
+
+ public void setTestSplitSize(int testSplitSize) {
+ this.testSplitSize = testSplitSize;
+ }
+
+ public int getTestSplitPct() {
+ return testSplitPct;
+ }
+
+ /**
+ * Sets the percentage of the input data to allocate to the test split
+ *
+ * @param testSplitPct a value between 0 and 100 inclusive.
+ */
+ public void setTestSplitPct(int testSplitPct) {
+ this.testSplitPct = testSplitPct;
+ }
+
+ /**
+ * Sets the percentage of the input data to keep in a map reduce split input job
+ *
+ * @param keepPct a value between 0 and 100 inclusive.
+ */
+ public void setKeepPct(int keepPct) {
+ this.keepPct = keepPct;
+ }
+
+ /**
+ * Set to true to use map reduce to split the input
+ *
+ * @param useMapRed a boolean to indicate whether map reduce should be used
+ */
+ public void setUseMapRed(boolean useMapRed) {
+ this.useMapRed = useMapRed;
+ }
+
+ public void setMapRedOutputDirectory(Path mapRedOutputDirectory) {
+ this.mapRedOutputDirectory = mapRedOutputDirectory;
+ }
+
+ public int getSplitLocation() {
+ return splitLocation;
+ }
+
+ /**
+ * Set the location of the start of the test/training data split. Expressed as percentage of lines, for example
+ * 0 indicates that the test data should be taken from the start of the file, 100 indicates that the test data
+ * should be taken from the end of the input file, while 25 indicates that the test data should be taken from the
+ * first quarter of the file.
+ * <p/>
+ * This option is only relevant in cases where random selection is not employed
+ *
+ * @param splitLocation a value between 0 and 100 inclusive.
+ */
+ public void setSplitLocation(int splitLocation) {
+ this.splitLocation = splitLocation;
+ }
+
+ public Charset getCharset() {
+ return charset;
+ }
+
+ /**
+ * Set the charset used to read and write files
+ */
+ public void setCharset(Charset charset) {
+ this.charset = charset;
+ }
+
+ public Path getInputDirectory() {
+ return inputDirectory;
+ }
+
+ /**
+   * Set the directory from which input data will be read when the {@link #splitDirectory()} method is invoked
+ */
+ public void setInputDirectory(Path inputDir) {
+ this.inputDirectory = inputDir;
+ }
+
+ public Path getTrainingOutputDirectory() {
+ return trainingOutputDirectory;
+ }
+
+ /**
+ * Set the directory to which training data will be written.
+ */
+ public void setTrainingOutputDirectory(Path trainingOutputDir) {
+ this.trainingOutputDirectory = trainingOutputDir;
+ }
+
+ public Path getTestOutputDirectory() {
+ return testOutputDirectory;
+ }
+
+ /**
+ * Set the directory to which test data will be written.
+ */
+ public void setTestOutputDirectory(Path testOutputDir) {
+ this.testOutputDirectory = testOutputDir;
+ }
+
+ public SplitCallback getCallback() {
+ return callback;
+ }
+
+ /**
+ * Sets the callback used to inform the caller that an input file has been successfully split
+ */
+ public void setCallback(SplitCallback callback) {
+ this.callback = callback;
+ }
+
+ public int getTestRandomSelectionSize() {
+ return testRandomSelectionSize;
+ }
+
+ /**
+ * Sets number of random input samples that will be saved to the test set.
+ */
+ public void setTestRandomSelectionSize(int testRandomSelectionSize) {
+ this.testRandomSelectionSize = testRandomSelectionSize;
+ }
+
+ public int getTestRandomSelectionPct() {
+
+ return testRandomSelectionPct;
+ }
+
+ /**
+ * Sets number of random input samples that will be saved to the test set as a percentage of the size of the
+ * input set.
+ *
+ * @param randomSelectionPct a value between 0 and 100 inclusive.
+ */
+ public void setTestRandomSelectionPct(int randomSelectionPct) {
+ this.testRandomSelectionPct = randomSelectionPct;
+ }
+
+ /**
+ * Validates that the current instance is in a consistent state
+ *
+ * @throws IllegalArgumentException if settings violate class invariants.
+ * @throws IOException if output directories do not exist or are not directories.
+ */
+ public void validate() throws IOException {
+ Preconditions.checkArgument(testSplitSize >= 1 || testSplitSize == -1,
+ "Invalid testSplitSize: " + testSplitSize + ". Must be: testSplitSize >= 1 or testSplitSize = -1");
+ Preconditions.checkArgument(splitLocation >= 0 && splitLocation <= 100 || splitLocation == -1,
+ "Invalid splitLocation percentage: " + splitLocation + ". Must be: 0 <= splitLocation <= 100 or splitLocation = -1");
+ Preconditions.checkArgument(testSplitPct >= 0 && testSplitPct <= 100 || testSplitPct == -1,
+ "Invalid testSplitPct percentage: " + testSplitPct + ". Must be: 0 <= testSplitPct <= 100 or testSplitPct = -1");
+ Preconditions.checkArgument(testRandomSelectionPct >= 0 && testRandomSelectionPct <= 100
+ || testRandomSelectionPct == -1,"Invalid testRandomSelectionPct percentage: " + testRandomSelectionPct +
+ ". Must be: 0 <= testRandomSelectionPct <= 100 or testRandomSelectionPct = -1");
+
+ Preconditions.checkArgument(trainingOutputDirectory != null || useMapRed,
+ "No training output directory was specified");
+ Preconditions.checkArgument(testOutputDirectory != null || useMapRed, "No test output directory was specified");
+
+ // only one of the following may be set, one must be set.
+ int count = 0;
+ if (testSplitSize > 0) {
+ count++;
+ }
+ if (testSplitPct > 0) {
+ count++;
+ }
+ if (testRandomSelectionSize > 0) {
+ count++;
+ }
+ if (testRandomSelectionPct > 0) {
+ count++;
+ }
+
+ Preconditions.checkArgument(count == 1, "Exactly one of testSplitSize, testSplitPct, testRandomSelectionSize, "
+ + "testRandomSelectionPct should be set");
+
+ if (!useMapRed) {
+ Configuration conf = getConf();
+ FileSystem fs = trainingOutputDirectory.getFileSystem(conf);
+ FileStatus trainingOutputDirStatus = fs.getFileStatus(trainingOutputDirectory);
+ Preconditions.checkArgument(trainingOutputDirStatus != null && trainingOutputDirStatus.isDir(),
+ "%s is not a directory", trainingOutputDirectory);
+ FileStatus testOutputDirStatus = fs.getFileStatus(testOutputDirectory);
+ Preconditions.checkArgument(testOutputDirStatus != null && testOutputDirStatus.isDir(),
+ "%s is not a directory", testOutputDirectory);
+ }
+ }
+
+ /**
+   * Count the lines in the specified file, where lines are delimited as by
+   * {@code BufferedReader.readLine()}.
+   *
+   * @param fs the file system on which the input file resides
+   * @param inputFile the file whose lines will be counted
+   * @param charset the charset of the file to read
+ * @return the number of lines in the input file.
+ * @throws IOException if there is a problem opening or reading the file.
+ */
+ public static int countLines(FileSystem fs, Path inputFile, Charset charset) throws IOException {
+ int lineCount = 0;
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(inputFile), charset))){
+ while (reader.readLine() != null) {
+ lineCount++;
+ }
+ }
+ return lineCount;
+ }
+
+ /**
+ * Used to pass information back to a caller once a file has been split without the need for a data object
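+   * <p>
+   * An illustrative implementation, given some {@code SplitInput splitInput}:
+   * <pre>{@code
+   * splitInput.setCallback(new SplitCallback() {
+   *   public void splitComplete(Path in, int lines, int train, int test, int start) {
+   *     System.out.printf("%s: %d train, %d test%n", in, train, test);
+   *   }
+   * });
+   * }</pre>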
+ */
+ public interface SplitCallback {
+ void splitComplete(Path inputFile, int lineCount, int trainCount, int testCount, int testSplitStart);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java
new file mode 100644
index 0000000..4a1ff86
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator;
+
+/**
+ * Class which implements a map reduce version of SplitInput.
+ * This class takes a SequenceFile input, e.g. a set of training data
+ * for a learning algorithm, downsamples it, applies a random
+ * permutation and splits it into test and training sets.
+ */
+public final class SplitInputJob {
+
+ private static final String DOWNSAMPLING_FACTOR = "SplitInputJob.downsamplingFactor";
+ private static final String RANDOM_SELECTION_PCT = "SplitInputJob.randomSelectionPct";
+ private static final String TRAINING_TAG = "training";
+ private static final String TEST_TAG = "test";
+
+ private SplitInputJob() {}
+
+ /**
+ * Run job to downsample, randomly permute and split data into test and
+ * training sets. This job takes a SequenceFile as input and outputs two
+   * SequenceFiles, test-r-00000 and training-r-00000, which contain the test and
+   * training sets, respectively.
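+   * <p>
+   * A minimal illustrative call (paths are hypothetical), keeping half of the
+   * input and sending 20% of what remains to the test set:
+   * <pre>{@code
+   * SplitInputJob.run(new Configuration(), new Path("/data/vectors"),
+   *     new Path("/data/split"), 50, 20.0f);
+   * }</pre>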
+ *
+ * @param initialConf
+ * Initial configuration
+ * @param inputPath
+ * path to input data SequenceFile
+ * @param outputPath
+ * path for output data SequenceFiles
+ * @param keepPct
+ * percentage of key value pairs in input to keep. The rest are
+ * discarded
+ * @param randomSelectionPercent
+ * percentage of key value pairs to allocate to test set. Remainder
+ * are allocated to training set
+ */
+ @SuppressWarnings("rawtypes")
+ public static void run(Configuration initialConf, Path inputPath,
+ Path outputPath, int keepPct, float randomSelectionPercent)
+ throws IOException, ClassNotFoundException, InterruptedException {
+
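+    // e.g. keepPct = 50 gives downsamplingFactor = 2: the mapper keeps every 2nd record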
+ int downsamplingFactor = (int) (100.0 / keepPct);
+ initialConf.setInt(DOWNSAMPLING_FACTOR, downsamplingFactor);
+ initialConf.setFloat(RANDOM_SELECTION_PCT, randomSelectionPercent);
+
+ // Determine class of keys and values
+ FileSystem fs = FileSystem.get(initialConf);
+
+ SequenceFileDirIterator<? extends WritableComparable, Writable> iterator =
+ new SequenceFileDirIterator<>(inputPath,
+ PathType.LIST, PathFilters.partFilter(), null, false, fs.getConf());
+ Class<? extends WritableComparable> keyClass;
+ Class<? extends Writable> valueClass;
+ if (iterator.hasNext()) {
+ Pair<? extends WritableComparable, Writable> pair = iterator.next();
+ keyClass = pair.getFirst().getClass();
+ valueClass = pair.getSecond().getClass();
+ } else {
+ throw new IllegalStateException("Couldn't determine class of the input values");
+ }
+
+ Job job = new Job(new Configuration(initialConf));
+
+ MultipleOutputs.addNamedOutput(job, TRAINING_TAG, SequenceFileOutputFormat.class, keyClass, valueClass);
+ MultipleOutputs.addNamedOutput(job, TEST_TAG, SequenceFileOutputFormat.class, keyClass, valueClass);
+ job.setJarByClass(SplitInputJob.class);
+ FileInputFormat.addInputPath(job, inputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
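+    // a single reducer yields exactly one test-r-00000 and one training-r-00000 file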
+ job.setNumReduceTasks(1);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setMapperClass(SplitInputMapper.class);
+ job.setReducerClass(SplitInputReducer.class);
+ job.setSortComparatorClass(SplitInputComparator.class);
+ job.setOutputKeyClass(keyClass);
+ job.setOutputValueClass(valueClass);
+ job.submit();
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ throw new IllegalStateException("Job failed!");
+ }
+ }
+
+ /** Mapper which downsamples the input by downsamplingFactor */
+ public static class SplitInputMapper extends
+ Mapper<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
+
+ private int downsamplingFactor;
+
+ @Override
+ public void setup(Context ctx) {
+ downsamplingFactor = ctx.getConfiguration().getInt(DOWNSAMPLING_FACTOR, 1);
+ }
+
+    /** Only run map() for one out of every downsamplingFactor inputs */
+ @Override
+ public void run(Context context) throws IOException, InterruptedException {
+ setup(context);
+ int i = 0;
+ while (context.nextKeyValue()) {
+ if (i % downsamplingFactor == 0) {
+ map(context.getCurrentKey(), context.getCurrentValue(), context);
+ }
+ i++;
+ }
+ cleanup(context);
+ }
+
+ }
+
+ /** Reducer which uses MultipleOutputs to randomly allocate key value pairs between test and training outputs */
+ public static class SplitInputReducer extends
+ Reducer<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
+
+ private MultipleOutputs multipleOutputs;
+ private final Random rnd = RandomUtils.getRandom();
+ private float randomSelectionPercent;
+
+ @Override
+ protected void setup(Context ctx) throws IOException {
+ randomSelectionPercent = ctx.getConfiguration().getFloat(RANDOM_SELECTION_PCT, 0);
+ multipleOutputs = new MultipleOutputs(ctx);
+ }
+
+ /**
+ * Randomly allocate key value pairs between test and training sets.
+ * randomSelectionPercent of the pairs will go to the test set.
+ */
+ @Override
+ protected void reduce(WritableComparable<?> key, Iterable<Writable> values,
+ Context context) throws IOException, InterruptedException {
+ for (Writable value : values) {
+ if (rnd.nextInt(100) < randomSelectionPercent) {
+ multipleOutputs.write(TEST_TAG, key, value);
+ } else {
+ multipleOutputs.write(TRAINING_TAG, key, value);
+ }
+ }
+
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException {
+ try {
+ multipleOutputs.close();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ }
+
+  /**
+   * "Sorts" key/value pairs into a random order by returning a random result for
+   * every comparison. This deliberately violates the comparator contract; it is
+   * used only to shuffle records, never to establish a real ordering.
+   */
+ public static class SplitInputComparator extends WritableComparator implements Serializable {
+
+ private final Random rnd = RandomUtils.getRandom();
+
+ protected SplitInputComparator() {
+ super(WritableComparable.class);
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ if (rnd.nextBoolean()) {
+ return 1;
+ } else {
+ return -1;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java
new file mode 100644
index 0000000..ac884d0
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.clustering;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Base class for implementing ClusterWriter
+ */
+public abstract class AbstractClusterWriter implements ClusterWriter {
+
+ private static final Logger log = LoggerFactory.getLogger(AbstractClusterWriter.class);
+
+ protected final Writer writer;
+ protected final Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints;
+ protected final DistanceMeasure measure;
+
+ /**
+ *
+ * @param writer The underlying {@link java.io.Writer} to use
+ * @param clusterIdToPoints The map between cluster ids {@link org.apache.mahout.clustering.Cluster#getId()} and the
+ * points in the cluster
+ * @param measure The {@link org.apache.mahout.common.distance.DistanceMeasure} used to calculate the distance.
+ * Some writers may wish to use it for calculating weights for display. May be null.
+ */
+ protected AbstractClusterWriter(Writer writer, Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
+ DistanceMeasure measure) {
+ this.writer = writer;
+ this.clusterIdToPoints = clusterIdToPoints;
+ this.measure = measure;
+ }
+
+ protected Writer getWriter() {
+ return writer;
+ }
+
+ protected Map<Integer, List<WeightedPropertyVectorWritable>> getClusterIdToPoints() {
+ return clusterIdToPoints;
+ }
+
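+  /**
+   * Formats the {@code numTerms} highest-weighted elements of {@code vector} as a
+   * multi-line listing of {@code term => weight} pairs, resolving each element
+   * index to a term via {@code dictionary}.
+   */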
+ public static String getTopFeatures(Vector vector, String[] dictionary, int numTerms) {
+
+ StringBuilder sb = new StringBuilder(100);
+
+ for (Pair<String, Double> item : getTopPairs(vector, dictionary, numTerms)) {
+ String term = item.getFirst();
+ sb.append("\n\t\t");
+ sb.append(StringUtils.rightPad(term, 40));
+ sb.append("=>");
+ sb.append(StringUtils.leftPad(item.getSecond().toString(), 20));
+ }
+ return sb.toString();
+ }
+
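+  /**
+   * Joins the {@code numTerms} highest-weighted terms of {@code vector} with
+   * {@code '_'}, producing a compact cluster label such as {@code foo_bar_baz}.
+   */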
+ public static String getTopTerms(Vector vector, String[] dictionary, int numTerms) {
+
+ StringBuilder sb = new StringBuilder(100);
+
+ for (Pair<String, Double> item : getTopPairs(vector, dictionary, numTerms)) {
+ String term = item.getFirst();
+ sb.append(term).append('_');
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ return sb.toString();
+ }
+
+ @Override
+ public long write(Iterable<ClusterWritable> iterable) throws IOException {
+ return write(iterable, Long.MAX_VALUE);
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ @Override
+ public long write(Iterable<ClusterWritable> iterable, long maxDocs) throws IOException {
+ long result = 0;
+ Iterator<ClusterWritable> iterator = iterable.iterator();
+ while (result < maxDocs && iterator.hasNext()) {
+ write(iterator.next());
+ result++;
+ }
+ return result;
+ }
+
+ private static Collection<Pair<String, Double>> getTopPairs(Vector vector, String[] dictionary, int numTerms) {
+ List<TermIndexWeight> vectorTerms = Lists.newArrayList();
+
+ for (Vector.Element elt : vector.nonZeroes()) {
+ vectorTerms.add(new TermIndexWeight(elt.index(), elt.get()));
+ }
+
+    // Sort results in reverse order (i.e., weight in descending order)
+ Collections.sort(vectorTerms, new Comparator<TermIndexWeight>() {
+ @Override
+ public int compare(TermIndexWeight one, TermIndexWeight two) {
+ return Double.compare(two.weight, one.weight);
+ }
+ });
+
+ Collection<Pair<String, Double>> topTerms = Lists.newLinkedList();
+
+ for (int i = 0; i < vectorTerms.size() && i < numTerms; i++) {
+ int index = vectorTerms.get(i).index;
+ String dictTerm = dictionary[index];
+ if (dictTerm == null) {
+ log.error("Dictionary entry missing for {}", index);
+ continue;
+ }
+ topTerms.add(new Pair<>(dictTerm, vectorTerms.get(i).weight));
+ }
+
+ return topTerms;
+ }
+
+ private static class TermIndexWeight {
+ private final int index;
+ private final double weight;
+
+ TermIndexWeight(int index, double weight) {
+ this.index = index;
+ this.weight = weight;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java
new file mode 100644
index 0000000..7269016
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.clustering;
+
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Writes clusters in the adjacency-list CSV style described at
+ * http://gephi.org/users/supported-graph-formats/csv-format/: the cluster id is
+ * the first element of each row, and the remaining elements are the points in
+ * that cluster.
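+ * <p>
+ * An illustrative row (document names are hypothetical): {@code 3,doc14,doc73,doc122}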
+ **/
+public class CSVClusterWriter extends AbstractClusterWriter {
+
+ private static final Pattern VEC_PATTERN = Pattern.compile("\\{|\\:|\\,|\\}");
+
+ public CSVClusterWriter(Writer writer, Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
+ DistanceMeasure measure) {
+ super(writer, clusterIdToPoints, measure);
+ }
+
+ @Override
+ public void write(ClusterWritable clusterWritable) throws IOException {
+ StringBuilder line = new StringBuilder();
+ Cluster cluster = clusterWritable.getValue();
+ line.append(cluster.getId());
+ List<WeightedPropertyVectorWritable> points = getClusterIdToPoints().get(cluster.getId());
+ if (points != null) {
+ for (WeightedPropertyVectorWritable point : points) {
+ Vector theVec = point.getVector();
+ line.append(',');
+ if (theVec instanceof NamedVector) {
+ line.append(((NamedVector)theVec).getName());
+ } else {
+ String vecStr = theVec.asFormatString();
+ //do some basic manipulations for display
+ vecStr = VEC_PATTERN.matcher(vecStr).replaceAll("_");
+ line.append(vecStr);
+ }
+ }
+ getWriter().append(line).append("\n");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
new file mode 100644
index 0000000..75b5ded
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.clustering;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.mahout.clustering.cdbw.CDbwEvaluator;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.clustering.evaluation.ClusterEvaluator;
+import org.apache.mahout.clustering.evaluation.RepresentativePointsDriver;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
+import org.apache.mahout.utils.vectors.VectorHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ClusterDumper extends AbstractJob {
+
+ public static final String SAMPLE_POINTS = "samplePoints";
+ DistanceMeasure measure;
+
+ public enum OUTPUT_FORMAT {
+ TEXT,
+ CSV,
+ GRAPH_ML,
+ JSON,
+ }
+
+ public static final String DICTIONARY_TYPE_OPTION = "dictionaryType";
+ public static final String DICTIONARY_OPTION = "dictionary";
+ public static final String POINTS_DIR_OPTION = "pointsDir";
+ public static final String NUM_WORDS_OPTION = "numWords";
+ public static final String SUBSTRING_OPTION = "substring";
+ public static final String EVALUATE_CLUSTERS = "evaluate";
+
+ public static final String OUTPUT_FORMAT_OPT = "outputFormat";
+
+ private static final Logger log = LoggerFactory.getLogger(ClusterDumper.class);
+ private Path seqFileDir;
+ private Path pointsDir;
+ private long maxPointsPerCluster = Long.MAX_VALUE;
+ private String termDictionary;
+ private String dictionaryFormat;
+ private int subString = Integer.MAX_VALUE;
+ private int numTopFeatures = 10;
+ private Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints;
+ private OUTPUT_FORMAT outputFormat = OUTPUT_FORMAT.TEXT;
+ private boolean runEvaluation;
+
+ public ClusterDumper(Path seqFileDir, Path pointsDir) {
+ this.seqFileDir = seqFileDir;
+ this.pointsDir = pointsDir;
+ init();
+ }
+
+ public ClusterDumper() {
+ setConf(new Configuration());
+ }
+
+ public static void main(String[] args) throws Exception {
+ new ClusterDumper().run(args);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ addInputOption();
+ addOutputOption();
+ addOption(OUTPUT_FORMAT_OPT, "of", "The optional output format for the results. Options: TEXT, CSV, JSON or GRAPH_ML",
+ "TEXT");
+ addOption(SUBSTRING_OPTION, "b", "The number of chars of the asFormatString() to print");
+ addOption(NUM_WORDS_OPTION, "n", "The number of top terms to print");
+ addOption(POINTS_DIR_OPTION, "p",
+ "The directory containing points sequence files mapping input vectors to their cluster. "
+ + "If specified, then the program will output the points associated with a cluster");
+ addOption(SAMPLE_POINTS, "sp", "Specifies the maximum number of points to include _per_ cluster. The default "
+ + "is to include all points");
+ addOption(DICTIONARY_OPTION, "d", "The dictionary file");
+ addOption(DICTIONARY_TYPE_OPTION, "dt", "The dictionary file type (text|sequencefile)", "text");
+ addOption(buildOption(EVALUATE_CLUSTERS, "e", "Run ClusterEvaluator and CDbwEvaluator over the input. "
+ + "The output will be appended to the rest of the output at the end.", false, false, null));
+ addOption(DefaultOptionCreator.distanceMeasureOption().create());
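+    // An illustrative invocation (the clusterdump driver name, paths and values
+    // here are examples only):
+    //   mahout clusterdump -i /out/clusters-10-final -o clusters.txt \
+    //     -p /out/clusteredPoints -d dictionary.txt -dt text -n 20 -of TEXT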
+
+    // output is optional; prints to System.out by default
+ if (parseArguments(args, false, true) == null) {
+ return -1;
+ }
+
+ seqFileDir = getInputPath();
+ if (hasOption(POINTS_DIR_OPTION)) {
+ pointsDir = new Path(getOption(POINTS_DIR_OPTION));
+ }
+ outputFile = getOutputFile();
+ if (hasOption(SUBSTRING_OPTION)) {
+ int sub = Integer.parseInt(getOption(SUBSTRING_OPTION));
+ if (sub >= 0) {
+ subString = sub;
+ }
+ }
+ termDictionary = getOption(DICTIONARY_OPTION);
+ dictionaryFormat = getOption(DICTIONARY_TYPE_OPTION);
+ if (hasOption(NUM_WORDS_OPTION)) {
+ numTopFeatures = Integer.parseInt(getOption(NUM_WORDS_OPTION));
+ }
+ if (hasOption(OUTPUT_FORMAT_OPT)) {
+ outputFormat = OUTPUT_FORMAT.valueOf(getOption(OUTPUT_FORMAT_OPT));
+ }
+ if (hasOption(SAMPLE_POINTS)) {
+ maxPointsPerCluster = Long.parseLong(getOption(SAMPLE_POINTS));
+ } else {
+ maxPointsPerCluster = Long.MAX_VALUE;
+ }
+ runEvaluation = hasOption(EVALUATE_CLUSTERS);
+ String distanceMeasureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+ measure = ClassUtils.instantiateAs(distanceMeasureClass, DistanceMeasure.class);
+
+ init();
+ printClusters(null);
+ return 0;
+ }
+
+ public void printClusters(String[] dictionary) throws Exception {
+ Configuration conf = new Configuration();
+
+ if (this.termDictionary != null) {
+ if ("text".equals(dictionaryFormat)) {
+ dictionary = VectorHelper.loadTermDictionary(new File(this.termDictionary));
+ } else if ("sequencefile".equals(dictionaryFormat)) {
+ dictionary = VectorHelper.loadTermDictionary(conf, this.termDictionary);
+ } else {
+ throw new IllegalArgumentException("Invalid dictionary format");
+ }
+ }
+
+ Writer writer;
+ boolean shouldClose;
+ if (this.outputFile == null) {
+ shouldClose = false;
+ writer = new OutputStreamWriter(System.out, Charsets.UTF_8);
+ } else {
+ shouldClose = true;
+      if (outputFile.toString().startsWith("s3n://")) { // check the full path, not just the file name
+ Path p = outputPath;
+ FileSystem fs = FileSystem.get(p.toUri(), conf);
+ writer = new OutputStreamWriter(fs.create(p), Charsets.UTF_8);
+ } else {
+ Files.createParentDirs(outputFile);
+ writer = Files.newWriter(this.outputFile, Charsets.UTF_8);
+ }
+ }
+ ClusterWriter clusterWriter = createClusterWriter(writer, dictionary);
+ try {
+ long numWritten = clusterWriter.write(new SequenceFileDirValueIterable<ClusterWritable>(new Path(seqFileDir,
+ "part-*"), PathType.GLOB, conf));
+
+ writer.flush();
+ if (runEvaluation) {
+ HadoopUtil.delete(conf, new Path("tmp/representative"));
+ int numIters = 5;
+ RepresentativePointsDriver.main(new String[]{
+ "--input", seqFileDir.toString(),
+ "--output", "tmp/representative",
+ "--clusteredPoints", pointsDir.toString(),
+ "--distanceMeasure", measure.getClass().getName(),
+ "--maxIter", String.valueOf(numIters)
+ });
+ conf.set(RepresentativePointsDriver.DISTANCE_MEASURE_KEY, measure.getClass().getName());
+ conf.set(RepresentativePointsDriver.STATE_IN_KEY, "tmp/representative/representativePoints-" + numIters);
+ ClusterEvaluator ce = new ClusterEvaluator(conf, seqFileDir);
+ writer.append("\n");
+ writer.append("Inter-Cluster Density: ").append(String.valueOf(ce.interClusterDensity())).append("\n");
+ writer.append("Intra-Cluster Density: ").append(String.valueOf(ce.intraClusterDensity())).append("\n");
+ CDbwEvaluator cdbw = new CDbwEvaluator(conf, seqFileDir);
+ writer.append("CDbw Inter-Cluster Density: ").append(String.valueOf(cdbw.interClusterDensity())).append("\n");
+ writer.append("CDbw Intra-Cluster Density: ").append(String.valueOf(cdbw.intraClusterDensity())).append("\n");
+ writer.append("CDbw Separation: ").append(String.valueOf(cdbw.separation())).append("\n");
+ writer.flush();
+ }
+ log.info("Wrote {} clusters", numWritten);
+ } finally {
+ if (shouldClose) {
+ Closeables.close(clusterWriter, false);
+ } else {
+ if (clusterWriter instanceof GraphMLClusterWriter) {
+ clusterWriter.close();
+ }
+ }
+ }
+ }
+
+ ClusterWriter createClusterWriter(Writer writer, String[] dictionary) throws IOException {
+ ClusterWriter result;
+
+ switch (outputFormat) {
+ case TEXT:
+ result = new ClusterDumperWriter(writer, clusterIdToPoints, measure, numTopFeatures, dictionary, subString);
+ break;
+ case CSV:
+ result = new CSVClusterWriter(writer, clusterIdToPoints, measure);
+ break;
+ case GRAPH_ML:
+ result = new GraphMLClusterWriter(writer, clusterIdToPoints, measure, numTopFeatures, dictionary, subString);
+ break;
+ case JSON:
+ result = new JsonClusterWriter(writer, clusterIdToPoints, measure, numTopFeatures, dictionary);
+ break;
+ default:
+ throw new IllegalStateException("Unknown outputformat: " + outputFormat);
+ }
+ return result;
+ }
+
+ /**
+ * Convenience function to set the output format during testing.
+ */
+ public void setOutputFormat(OUTPUT_FORMAT of) {
+ outputFormat = of;
+ }
+
+ private void init() {
+ if (this.pointsDir != null) {
+ Configuration conf = new Configuration();
+ // read in the points
+ clusterIdToPoints = readPoints(this.pointsDir, maxPointsPerCluster, conf);
+ } else {
+ clusterIdToPoints = Collections.emptyMap();
+ }
+ }
+
+
+ public int getSubString() {
+ return subString;
+ }
+
+ public void setSubString(int subString) {
+ this.subString = subString;
+ }
+
+ public Map<Integer, List<WeightedPropertyVectorWritable>> getClusterIdToPoints() {
+ return clusterIdToPoints;
+ }
+
+ public String getTermDictionary() {
+ return termDictionary;
+ }
+
+ public void setTermDictionary(String termDictionary, String dictionaryType) {
+ this.termDictionary = termDictionary;
+ this.dictionaryFormat = dictionaryType;
+ }
+
+ public void setNumTopFeatures(int num) {
+ this.numTopFeatures = num;
+ }
+
+ public int getNumTopFeatures() {
+ return this.numTopFeatures;
+ }
+
+ public long getMaxPointsPerCluster() {
+ return maxPointsPerCluster;
+ }
+
+ public void setMaxPointsPerCluster(long maxPointsPerCluster) {
+ this.maxPointsPerCluster = maxPointsPerCluster;
+ }
+
+ public static Map<Integer, List<WeightedPropertyVectorWritable>> readPoints(Path pointsPathDir,
+ long maxPointsPerCluster,
+ Configuration conf) {
+ Map<Integer, List<WeightedPropertyVectorWritable>> result = new TreeMap<>();
+ for (Pair<IntWritable, WeightedPropertyVectorWritable> record
+ : new SequenceFileDirIterable<IntWritable, WeightedPropertyVectorWritable>(pointsPathDir, PathType.LIST,
+ PathFilters.logsCRCFilter(), conf)) {
+      // key is the cluster id as an int; value is the weighted vector. The
+      // vector's own name/id doesn't matter here because we only print it.
+ int keyValue = record.getFirst().get();
+ List<WeightedPropertyVectorWritable> pointList = result.get(keyValue);
+ if (pointList == null) {
+ pointList = new ArrayList<>();
+ result.put(keyValue, pointList);
+ }
+ if (pointList.size() < maxPointsPerCluster) {
+ pointList.add(record.getSecond());
+ }
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java
new file mode 100644
index 0000000..31858c4
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.clustering;
+
+import org.apache.hadoop.io.Text;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.distance.DistanceMeasure;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implements a {@link ClusterWriter} that outputs in the format used by ClusterDumper in Mahout 0.5
+ */
+public class ClusterDumperWriter extends AbstractClusterWriter {
+
+ private final int subString;
+ private final String[] dictionary;
+ private final int numTopFeatures;
+
+ public ClusterDumperWriter(Writer writer, Map<Integer,List<WeightedPropertyVectorWritable>> clusterIdToPoints,
+ DistanceMeasure measure, int numTopFeatures, String[] dictionary, int subString) {
+ super(writer, clusterIdToPoints, measure);
+ this.numTopFeatures = numTopFeatures;
+ this.dictionary = dictionary;
+ this.subString = subString;
+ }
+
+ @Override
+ public void write(ClusterWritable clusterWritable) throws IOException {
+ Cluster cluster = clusterWritable.getValue();
+ String fmtStr = cluster.asFormatString(dictionary);
+ Writer writer = getWriter();
+ if (subString > 0 && fmtStr.length() > subString) {
+ // a leading ':' flags that the cluster string below was truncated
+ writer.write(':');
+ // the guard above already ensures subString < fmtStr.length()
+ writer.write(fmtStr, 0, subString);
+ } else {
+ writer.write(fmtStr);
+ }
+
+ writer.write('\n');
+
+ if (dictionary != null) {
+ String topTerms = getTopFeatures(clusterWritable.getValue().getCenter(), dictionary, numTopFeatures);
+ writer.write("\tTop Terms: ");
+ writer.write(topTerms);
+ writer.write('\n');
+ }
+
+ Map<Integer,List<WeightedPropertyVectorWritable>> clusterIdToPoints = getClusterIdToPoints();
+ List<WeightedPropertyVectorWritable> points = clusterIdToPoints.get(clusterWritable.getValue().getId());
+ if (points != null) {
+ writer.write("\tWeight : [props - optional]: Point:\n\t");
+ for (Iterator<WeightedPropertyVectorWritable> iterator = points.iterator(); iterator.hasNext();) {
+ WeightedPropertyVectorWritable point = iterator.next();
+ writer.write(String.valueOf(point.getWeight()));
+ Map<Text,Text> map = point.getProperties();
+ // the properties map can be null: empty maps are written out as null and read back as null
+ writer.write(" : [");
+ if (map != null) {
+ for (Map.Entry<Text,Text> entry : map.entrySet()) {
+ writer.write(entry.getKey().toString());
+ writer.write("=");
+ writer.write(entry.getValue().toString());
+ }
+ }
+ writer.write("]");
+
+ writer.write(": ");
+
+ writer.write(AbstractCluster.formatVector(point.getVector(), dictionary));
+ if (iterator.hasNext()) {
+ writer.write("\n\t");
+ }
+ }
+ writer.write('\n');
+ }
+ }
+}
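
A short usage sketch for this writer, assuming the clusters, points map, and dictionary are
already in hand (the names here are illustrative; the bulk write(Iterable) call is part of the
ClusterWriter contract shown next):

    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.io.Charsets;
    import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
    import org.apache.mahout.clustering.iterator.ClusterWritable;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
    import org.apache.mahout.utils.clustering.ClusterDumperWriter;

    public class DumperSketch {
      static void dump(Iterable<ClusterWritable> clusters,
                       Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
                       String[] dictionary) throws IOException {
        Writer out = new OutputStreamWriter(System.out, Charsets.UTF_8);
        ClusterDumperWriter dumper = new ClusterDumperWriter(out, clusterIdToPoints,
            new EuclideanDistanceMeasure(), 10 /* top terms */, dictionary, 0 /* no truncation */);
        try {
          dumper.write(clusters); // bulk write from the ClusterWriter contract
        } finally {
          dumper.close();
        }
      }
    }
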
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java
new file mode 100644
index 0000000..70f8f6f
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.clustering;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+
+/**
+ * Writes out clusters
+ */
+public interface ClusterWriter extends Closeable {
+
+ /**
+ * Write all values in the Iterable to the output
+ *
+ * @param iterable The {@link Iterable} to loop over
+ * @return the number of docs written
+ * @throws java.io.IOException if there was a problem writing
+ */
+ long write(Iterable<ClusterWritable> iterable) throws IOException;
+
+ /**
+ * Write out a single cluster
+ *
+ * @param clusterWritable the {@link ClusterWritable} to write
+ * @throws IOException if there was a problem writing
+ */
+ void write(ClusterWritable clusterWritable) throws IOException;
+
+ /**
+ * Write the first {@code maxDocs} to the output.
+ *
+ * @param iterable The {@link Iterable} to loop over
+ * @param maxDocs the maximum number of docs to write
+ * @return The number of docs written
+ * @throws IOException if there was a problem writing
+ */
+ long write(Iterable<ClusterWritable> iterable, long maxDocs) throws IOException;
+}
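
One plausible way the two bulk methods can be layered on the per-cluster write — a sketch only,
not necessarily how AbstractClusterWriter implements it:

    import java.io.IOException;

    import org.apache.mahout.clustering.iterator.ClusterWritable;
    import org.apache.mahout.utils.clustering.ClusterWriter;

    public abstract class BulkWriteSketch implements ClusterWriter {
      @Override
      public long write(Iterable<ClusterWritable> iterable) throws IOException {
        return write(iterable, Long.MAX_VALUE); // no cap
      }

      @Override
      public long write(Iterable<ClusterWritable> iterable, long maxDocs) throws IOException {
        long count = 0;
        for (ClusterWritable clusterWritable : iterable) {
          if (count >= maxDocs) {
            break;
          }
          write(clusterWritable); // formatting is supplied by the concrete subclass
          count++;
        }
        return count;
      }
    }
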
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java
new file mode 100644
index 0000000..25e8f3b
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.clustering;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.regex.Pattern;
+
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.StringUtils;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+
+/**
+ * GraphML -- see http://gephi.org/users/supported-graph-formats/graphml-format/
+ */
+public class GraphMLClusterWriter extends AbstractClusterWriter {
+
+ private static final Pattern VEC_PATTERN = Pattern.compile("\\{|\\:|\\,|\\}");
+ private final Map<Integer, Color> colors = new HashMap<>();
+ private Color lastClusterColor;
+ private float lastX;
+ private float lastY;
+ private Random random;
+ private int posStep;
+ private final String[] dictionary;
+ private final int numTopFeatures;
+ private final int subString;
+
+ public GraphMLClusterWriter(Writer writer, Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
+ DistanceMeasure measure, int numTopFeatures, String[] dictionary, int subString)
+ throws IOException {
+ super(writer, clusterIdToPoints, measure);
+ this.dictionary = dictionary;
+ this.numTopFeatures = numTopFeatures;
+ this.subString = subString;
+ init(writer);
+ }
+
+ private void init(Writer writer) throws IOException {
+ writer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+ writer.append("<graphml xmlns=\"http://graphml.graphdrawing.org/xmlns\"\n"
+ + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
+ + "xsi:schemaLocation=\"http://graphml.graphdrawing.org/xmlns\n"
+ + "http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd\">");
+ //support rgb
+ writer.append("<key attr.name=\"r\" attr.type=\"int\" for=\"node\" id=\"r\"/>\n"
+ + "<key attr.name=\"g\" attr.type=\"int\" for=\"node\" id=\"g\"/>\n"
+ + "<key attr.name=\"b\" attr.type=\"int\" for=\"node\" id=\"b\"/>"
+ + "<key attr.name=\"size\" attr.type=\"int\" for=\"node\" id=\"size\"/>"
+ + "<key attr.name=\"weight\" attr.type=\"float\" for=\"edge\" id=\"weight\"/>"
+ + "<key attr.name=\"x\" attr.type=\"float\" for=\"node\" id=\"x\"/>"
+ + "<key attr.name=\"y\" attr.type=\"float\" for=\"node\" id=\"y\"/>");
+ writer.append("<graph edgedefault=\"undirected\">");
+ lastClusterColor = new Color();
+ posStep = (int) (0.1 * clusterIdToPoints.size()) + 100;
+ random = RandomUtils.getRandom();
+ }
+
+ /*
+ <?xml version="1.0" encoding="UTF-8"?>
+ <graphml xmlns="http://graphml.graphdrawing.org/xmlns"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns
+ http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd">
+ <graph id="G" edgedefault="undirected">
+ <node id="n0"/>
+ <node id="n1"/>
+ <edge id="e1" source="n0" target="n1"/>
+ </graph>
+ </graphml>
+ */
+
+ @Override
+ public void write(ClusterWritable clusterWritable) throws IOException {
+ StringBuilder line = new StringBuilder();
+ Cluster cluster = clusterWritable.getValue();
+ Color rgb = getColor(cluster.getId());
+
+ String topTerms = "";
+ if (dictionary != null) {
+ topTerms = getTopTerms(cluster.getCenter(), dictionary, numTopFeatures);
+ }
+ String clusterLabel = String.valueOf(cluster.getId()) + '_' + topTerms;
+ //do some positioning so that items are visible and grouped together
+ //TODO: put in a real layout algorithm
+ float x = lastX + 1000;
+ float y = lastY;
+ if (x > (1000 + posStep)) {
+ y = lastY + 1000;
+ x = 0;
+ }
+
+ line.append(createNode(clusterLabel, rgb, x, y));
+ List<WeightedPropertyVectorWritable> points = clusterIdToPoints.get(cluster.getId());
+ if (points != null) {
+ for (WeightedVectorWritable point : points) {
+ Vector theVec = point.getVector();
+ double distance = 1;
+ if (measure != null) {
+ //scale the distance
+ distance = measure.distance(cluster.getCenter().getLengthSquared(), cluster.getCenter(), theVec) * 500;
+ }
+ String vecStr;
+ int angle = random.nextInt(360); //pick an angle at random and then scale along that angle
+ double angleRads = Math.toRadians(angle);
+
+ float targetX = x + (float) (distance * Math.cos(angleRads));
+ float targetY = y + (float) (distance * Math.sin(angleRads));
+ if (theVec instanceof NamedVector) {
+ vecStr = ((NamedVector) theVec).getName();
+ } else {
+ vecStr = theVec.asFormatString();
+ //do some basic manipulations for display
+ vecStr = VEC_PATTERN.matcher(vecStr).replaceAll("_");
+ }
+ if (subString > 0 && vecStr.length() > subString) {
+ vecStr = vecStr.substring(0, subString);
+ }
+ line.append(createNode(vecStr, rgb, targetX, targetY));
+ line.append(createEdge(clusterLabel, vecStr, distance));
+ }
+ }
+ lastClusterColor = rgb;
+ lastX = x;
+ lastY = y;
+ getWriter().append(line).append("\n");
+ }
+
+ private Color getColor(int clusterId) {
+ Color result = colors.get(clusterId);
+ if (result == null) {
+ result = new Color();
+ //there is probably some better way to color a graph
+ int incR = 0;
+ int incG = 0;
+ int incB = 0;
+ if (lastClusterColor.r + 20 < 256 && lastClusterColor.g + 20 < 256 && lastClusterColor.b + 20 < 256) {
+ incR = 20;
+ incG = 0;
+ incB = 0;
+ } else if (lastClusterColor.r + 20 >= 256 && lastClusterColor.g + 20 < 256 && lastClusterColor.b + 20 < 256) {
+ incG = 20;
+ incB = 0;
+ } else if (lastClusterColor.r + 20 >= 256 && lastClusterColor.g + 20 >= 256 && lastClusterColor.b + 20 < 256) {
+ incB = 20;
+ } else {
+ incR += 3;
+ incG += 3;
+ incB += 3;
+ }
+ result.r = (lastClusterColor.r + incR) % 256;
+ result.g = (lastClusterColor.g + incG) % 256;
+ result.b = (lastClusterColor.b + incB) % 256;
+ colors.put(clusterId, result);
+ }
+ return result;
+ }
+
+ private static String createEdge(String left, String right, double distance) {
+ left = StringUtils.escapeXML(left);
+ right = StringUtils.escapeXML(right);
+ return "<edge id=\"" + left + '_' + right + "\" source=\"" + left + "\" target=\"" + right + "\">"
+ + "<data key=\"weight\">" + distance + "</data></edge>";
+ }
+
+ private static String createNode(String s, Color rgb, float x, float y) {
+ return "<node id=\"" + StringUtils.escapeXML(s) + "\"><data key=\"r\">" + rgb.r
+ + "</data>"
+ + "<data key=\"g\">" + rgb.g
+ + "</data>"
+ + "<data key=\"b\">" + rgb.b
+ + "</data>"
+ + "<data key=\"x\">" + x
+ + "</data>"
+ + "<data key=\"y\">" + y
+ + "</data>"
+ + "</node>";
+ }
+
+ @Override
+ public void close() throws IOException {
+ getWriter().append("</graph>").append("</graphml>");
+ super.close();
+ }
+
+ private static class Color {
+ int r;
+ int g;
+ int b;
+ }
+}
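
A usage sketch; close() emits the closing </graph></graphml> tags as shown above, so
try-with-resources keeps the output well formed (the file name and distance measure are
illustrative):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.io.Charsets;
    import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
    import org.apache.mahout.clustering.iterator.ClusterWritable;
    import org.apache.mahout.common.distance.CosineDistanceMeasure;
    import org.apache.mahout.utils.clustering.GraphMLClusterWriter;

    public class GraphMLSketch {
      static void toGraphML(Iterable<ClusterWritable> clusters,
                            Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
                            String[] dictionary) throws IOException {
        // ClusterWriter extends Closeable; close() writes the closing tags
        try (GraphMLClusterWriter graphml = new GraphMLClusterWriter(
            new OutputStreamWriter(new FileOutputStream("clusters.graphml"), Charsets.UTF_8),
            clusterIdToPoints, new CosineDistanceMeasure(), 10, dictionary, 0)) {
          graphml.write(clusters);
        }
      }
    }
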
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
new file mode 100644
index 0000000..d564a73
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.utils.clustering;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dumps cluster info as JSON-formatted lines, one cluster per line. Heavily inspired by
+ * ClusterDumperWriter.java and CSVClusterWriter.java
+ */
+public class JsonClusterWriter extends AbstractClusterWriter {
+ private final String[] dictionary;
+ private final int numTopFeatures;
+ private final ObjectMapper jxn;
+
+ private static final Logger log = LoggerFactory.getLogger(JsonClusterWriter.class);
+ private static final Pattern VEC_PATTERN = Pattern.compile("\\{|\\:|\\,|\\}");
+
+ public JsonClusterWriter(Writer writer,
+ Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
+ DistanceMeasure measure, int numTopFeatures, String[] dictionary) {
+ super(writer, clusterIdToPoints, measure);
+ this.numTopFeatures = numTopFeatures;
+ this.dictionary = dictionary;
+ jxn = new ObjectMapper();
+ }
+
+ /**
+ * Generates a map of cluster info and writes it out as a single JSON-formatted line
+ */
+ @Override
+ public void write(ClusterWritable clusterWritable) throws IOException {
+ Map<String, Object> res = new HashMap<>();
+
+ // get top terms
+ if (dictionary != null) {
+ List<Object> topTerms = getTopFeaturesList(clusterWritable.getValue()
+ .getCenter(), dictionary, numTopFeatures);
+ res.put("top_terms", topTerms);
+ } else {
+ res.put("top_terms", new ArrayList<>());
+ }
+
+ // get human-readable cluster representation
+ Cluster cluster = clusterWritable.getValue();
+ res.put("cluster_id", cluster.getId());
+
+ if (dictionary != null) {
+ Map<String,Object> fmtStr = cluster.asJson(dictionary);
+ res.put("cluster", fmtStr);
+
+ // get points
+ List<Object> points = getPoints(cluster, dictionary);
+ res.put("points", points);
+ } else {
+ res.put("cluster", new HashMap<>());
+ res.put("points", new ArrayList<>());
+ }
+
+ // write JSON
+ Writer writer = getWriter();
+ writer.write(jxn.writeValueAsString(res) + "\n");
+ }
+
+ /**
+ * Creates a list of maps, one per top term, mapping the term to its weight
+ *
+ * @return a {@code List<Object>} of single-entry term-to-weight maps
+ */
+ public List<Object> getTopFeaturesList(Vector vector, String[] dictionary,
+ int numTerms) {
+
+ List<TermIndexWeight> vectorTerms = new ArrayList<>();
+
+ for (Vector.Element elt : vector.nonZeroes()) {
+ vectorTerms.add(new TermIndexWeight(elt.index(), elt.get()));
+ }
+
+ // Sort results in reverse order (i.e. weight in descending order)
+ Collections.sort(vectorTerms, new Comparator<TermIndexWeight>() {
+ @Override
+ public int compare(TermIndexWeight one, TermIndexWeight two) {
+ return Double.compare(two.weight, one.weight);
+ }
+ });
+
+ List<Object> topTerms = new ArrayList<>();
+
+ for (int i = 0; i < vectorTerms.size() && i < numTerms; i++) {
+ int index = vectorTerms.get(i).index;
+ String dictTerm = dictionary[index];
+ if (dictTerm == null) {
+ log.error("Dictionary entry missing for {}", index);
+ continue;
+ }
+ Map<String, Object> termEntry = new HashMap<>();
+ termEntry.put(dictTerm, vectorTerms.get(i).weight);
+ topTerms.add(termEntry);
+ }
+
+ return topTerms;
+ }
+
+ /**
+ * Creates a list of maps describing the points assigned to a cluster
+ *
+ * @return a {@code List<Object>} of per-point attribute maps
+ */
+ public List<Object> getPoints(Cluster cluster, String[] dictionary) {
+ List<Object> vectorObjs = new ArrayList<>();
+ List<WeightedPropertyVectorWritable> points = getClusterIdToPoints().get(
+ cluster.getId());
+
+ if (points != null) {
+ for (WeightedPropertyVectorWritable point : points) {
+ Map<String, Object> entry = new HashMap<>();
+ Vector theVec = point.getVector();
+ if (theVec instanceof NamedVector) {
+ entry.put("vector_name", ((NamedVector) theVec).getName());
+ } else {
+ String vecStr = theVec.asFormatString();
+ // do some basic manipulations for display
+ vecStr = VEC_PATTERN.matcher(vecStr).replaceAll("_");
+ entry.put("vector_name", vecStr);
+ }
+ entry.put("weight", String.valueOf(point.getWeight()));
+ try {
+ entry.put("point",
+ AbstractCluster.formatVectorAsJson(point.getVector(), dictionary));
+ } catch (IOException e) {
+ log.error("IOException: ", e);
+ }
+ vectorObjs.add(entry);
+ }
+ }
+ return vectorObjs;
+ }
+
+ /**
+ * Convenience class for sorting terms by weight
+ */
+ private static class TermIndexWeight {
+ private final int index;
+ private final double weight;
+
+ TermIndexWeight(int index, double weight) {
+ this.index = index;
+ this.weight = weight;
+ }
+ }
+
+}
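
A usage sketch along the same lines (the output file name and distance measure are illustrative):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.io.Charsets;
    import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
    import org.apache.mahout.clustering.iterator.ClusterWritable;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
    import org.apache.mahout.utils.clustering.JsonClusterWriter;

    public class JsonLinesSketch {
      static void toJsonLines(Iterable<ClusterWritable> clusters,
                              Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
                              String[] dictionary) throws IOException {
        Writer out = new OutputStreamWriter(new FileOutputStream("clusters.jsonl"), Charsets.UTF_8);
        JsonClusterWriter json = new JsonClusterWriter(out, clusterIdToPoints,
            new EuclideanDistanceMeasure(), 10, dictionary);
        try {
          // each cluster becomes one line:
          // {"cluster_id":..., "top_terms":[...], "cluster":{...}, "points":[...]}
          json.write(clusters);
        } finally {
          json.close();
        }
      }
    }
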
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java
new file mode 100644
index 0000000..54ad43f
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.email;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Configuration options to be used by {@link MailProcessor}. Includes options controlling the exact output format
+ * and which mail fields are included (body, to, from, subject, etc.).
+ */
+public class MailOptions {
+
+ public static final String FROM = "FROM";
+ public static final String TO = "TO";
+ public static final String REFS = "REFS";
+ public static final String SUBJECT = "SUBJECT";
+ public static final Pattern DEFAULT_QUOTED_TEXT = Pattern.compile("^(\\||>)");
+
+ private boolean stripQuotedText;
+ private File input;
+ private String outputDir;
+ private String prefix;
+ private int chunkSize;
+ private Charset charset;
+ private String separator;
+ private String bodySeparator = "\n";
+ private boolean includeBody;
+ private Pattern[] patternsToMatch;
+ //maps FROM, TO, REFS, SUBJECT, etc. to the order they appear in patternsToMatch. See MailToRecMapper
+ private Map<String, Integer> patternOrder;
+
+ //the regular expression to use for identifying quoted text.
+ private Pattern quotedTextPattern = DEFAULT_QUOTED_TEXT;
+
+ public File getInput() {
+ return input;
+ }
+
+ public void setInput(File input) {
+ this.input = input;
+ }
+
+ public String getOutputDir() {
+ return outputDir;
+ }
+
+ /**
+ * Sets the output directory where sequence files will be written.
+ */
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+
+ /**
+ * Sets the prefix that is combined with the archive name and with message ids to create {@code SequenceFile} keys.
+ * @param prefix the key prefix; the name of the directory containing the mail archive is commonly used
+ */
+ public void setPrefix(String prefix) {
+ this.prefix = prefix;
+ }
+
+ public int getChunkSize() {
+ return chunkSize;
+ }
+
+ /**
+ * Sets the size of each generated sequence file, in megabytes.
+ */
+ public void setChunkSize(int chunkSize) {
+ this.chunkSize = chunkSize;
+ }
+
+ public Charset getCharset() {
+ return charset;
+ }
+
+ /**
+ * Sets the encoding of the input
+ */
+ public void setCharset(Charset charset) {
+ this.charset = charset;
+ }
+
+ public String getSeparator() {
+ return separator;
+ }
+
+ /**
+ * Sets the separator to use in the output between metadata items (to, from, etc.).
+ */
+ public void setSeparator(String separator) {
+ this.separator = separator;
+ }
+
+ public String getBodySeparator() {
+ return bodySeparator;
+ }
+
+ /**
+ * Sets the separator to use in the output between lines in the body; the default is "\n".
+ */
+ public void setBodySeparator(String bodySeparator) {
+ this.bodySeparator = bodySeparator;
+ }
+
+ public boolean isIncludeBody() {
+ return includeBody;
+ }
+
+ /**
+ * Sets whether mail bodies are included in the output
+ */
+ public void setIncludeBody(boolean includeBody) {
+ this.includeBody = includeBody;
+ }
+
+ public Pattern[] getPatternsToMatch() {
+ return patternsToMatch;
+ }
+
+ /**
+ * Sets the list of patterns to be applied in the given order to extract metadata fields (to, from, subject, etc.)
+ * from the input
+ */
+ public void setPatternsToMatch(Pattern[] patternsToMatch) {
+ this.patternsToMatch = patternsToMatch;
+ }
+
+ public Map<String, Integer> getPatternOrder() {
+ return patternOrder;
+ }
+
+ public void setPatternOrder(Map<String, Integer> patternOrder) {
+ this.patternOrder = patternOrder;
+ }
+
+ /**
+ * @return true if we should strip out quoted email text
+ */
+ public boolean isStripQuotedText() {
+ return stripQuotedText;
+ }
+
+ /**
+ * Sets whether quoted text, such as lines starting with | or >, is stripped off.
+ */
+ public void setStripQuotedText(boolean stripQuotedText) {
+ this.stripQuotedText = stripQuotedText;
+ }
+
+ public Pattern getQuotedTextPattern() {
+ return quotedTextPattern;
+ }
+
+ /**
+ * Sets the {@link java.util.regex.Pattern} used to identify quoted-text lines. The default matches lines starting with | or >.
+ * @see #setStripQuotedText(boolean)
+ */
+ public void setQuotedTextPattern(Pattern quotedTextPattern) {
+ this.quotedTextPattern = quotedTextPattern;
+ }
+}
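
A configuration sketch exercising the setters above; every literal below (paths, prefix, chunk
size, header patterns) is illustrative rather than taken from Mahout's drivers, which build the
real pattern set (see the MailToRecMapper reference above):

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    import org.apache.commons.io.Charsets;
    import org.apache.mahout.utils.email.MailOptions;

    public class MailOptionsSketch {
      static MailOptions newOptions() {
        MailOptions options = new MailOptions();
        options.setInput(new File("/path/to/mail-archive"));  // hypothetical archive location
        options.setOutputDir("mail-seqfiles");                // hypothetical output directory
        options.setPrefix("asf-mail");
        options.setChunkSize(64);                             // 64 MB sequence files
        options.setCharset(Charsets.UTF_8);
        options.setSeparator("\n");
        options.setIncludeBody(true);
        options.setStripQuotedText(true);                     // drop lines matching DEFAULT_QUOTED_TEXT

        // illustrative header patterns; each pattern extracts one metadata field
        options.setPatternsToMatch(new Pattern[] {
            Pattern.compile("^From: (.*)"),
            Pattern.compile("^Subject: (.*)")});
        Map<String, Integer> order = new HashMap<>();
        order.put(MailOptions.FROM, 0);    // FROM is matched by patternsToMatch[0]
        order.put(MailOptions.SUBJECT, 1); // SUBJECT by patternsToMatch[1]
        options.setPatternOrder(order);
        return options;
      }
    }
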