You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 14:51:57 UTC
[29/51] [partial] mahout git commit: MAHOUT-2042 and MAHOUT-2045
Delete directories which were moved/no longer in use
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/SplitInput.java b/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
deleted file mode 100644
index 6178f80..0000000
--- a/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
+++ /dev/null
@@ -1,673 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.utils;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.Charset;
-import java.util.BitSet;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.cli2.OptionException;
-import org.apache.commons.io.Charsets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.common.CommandLineUtil;
-import org.apache.mahout.common.HadoopUtil;
-import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.RandomUtils;
-import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.apache.mahout.common.iterator.sequencefile.PathFilters;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
-import org.apache.mahout.math.jet.random.sampling.RandomSampler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A utility for splitting files in the input format used by the Bayes
- * classifiers or anything else that has one item per line or SequenceFiles (key/value)
- * into training and test sets in order to perform cross-validation.
- * <p/>
- * <p/>
- * This class can be used to split directories of files or individual files into
- * training and test sets using a number of different methods.
- * <p/>
- * When executed via {@link #splitDirectory(Path)} or {@link #splitFile(Path)},
- * the lines read from one or more input files are written to files of the same
- * name into the directories specified by the
- * {@link #setTestOutputDirectory(Path)} and
- * {@link #setTrainingOutputDirectory(Path)} methods.
- * <p/>
- * The composition of the test set is determined using one of the following
- * approaches:
- * <ul>
- * <li>A contiguous set of items can be chosen from the input file(s) using the
- * {@link #setTestSplitSize(int)} or {@link #setTestSplitPct(int)} methods.
- * {@link #setTestSplitSize(int)} allocates a fixed number of items, while
- * {@link #setTestSplitPct(int)} allocates a percentage of the original input,
- * rounded up to the nearest integer. {@link #setSplitLocation(int)} is used to
- * control the position in the input from which the test data is extracted and
- * is described further below.</li>
- * <li>A random sampling of items can be chosen from the input file(s) using
- * the {@link #setTestRandomSelectionSize(int)} or
- * {@link #setTestRandomSelectionPct(int)} methods, each choosing a fixed test
- * set size or percentage of the input set size as described above. The
- * {@link RandomSampler} class from {@code mahout-math} is used to create a sample
- * of the appropriate size.</li>
- * </ul>
- * <p/>
- * Any one of the methods above can be used to control the size of the test set.
- * If multiple methods are called, a runtime exception will be thrown at
- * execution time.
- * <p/>
- * The {@link #setSplitLocation(int)} method is passed an integer from 0 to 100
- * (inclusive) which is translated into the position of the start of the test
- * data within the input file.
- * <p/>
- * Given:
- * <ul>
- * <li>an input file of 1500 lines</li>
- * <li>a desired test data size of 10 percent</li>
- * </ul>
- * <p/>
- * <ul>
- * <li>A split location of 0 will cause the first 150 items appearing in the
- * input set to be written to the test set.</li>
- * <li>A split location of 25 will cause items 375-525 to be written to the test
- * set.</li>
- * <li>A split location of 100 will cause the last 150 items in the input to be
- * written to the test set</li>
- * </ul>
- * The start of the split is moved back towards the beginning of the input when
- * necessary to ensure that the desired test set size is allocated. Split location
- * has no effect if random sampling is employed.
- */
-public class SplitInput extends AbstractJob {
-
- private static final Logger log = LoggerFactory.getLogger(SplitInput.class);
-
- private int testSplitSize = -1;
- private int testSplitPct = -1;
- private int splitLocation = 100;
- private int testRandomSelectionSize = -1;
- private int testRandomSelectionPct = -1;
- private int keepPct = 100;
- private Charset charset = Charsets.UTF_8;
- private boolean useSequence;
- private boolean useMapRed;
-
- private Path inputDirectory;
- private Path trainingOutputDirectory;
- private Path testOutputDirectory;
- private Path mapRedOutputDirectory;
-
- private SplitCallback callback;
-
- @Override
- public int run(String[] args) throws Exception {
-
- if (parseArgs(args)) {
- splitDirectory();
- }
- return 0;
- }
-
- public static void main(String[] args) throws Exception {
- ToolRunner.run(new Configuration(), new SplitInput(), args);
- }
-
- /**
-  * Configure this instance based on the command-line arguments contained within provided array.
-  * Calls {@link #validate()} to ensure consistency of configuration.
-  * <p/>
-  * In map-reduce mode {@code randomSelectionPct} and {@code mapRedOutputDir} are required; otherwise
-  * {@code trainingOutput}, {@code testOutput} and one of the four test-set sizing options are required.
-  * When the overwrite option is present the relevant output directories are deleted first. In
-  * sequential mode both output directories are created.
-  *
-  * @return true if the arguments were parsed successfully and execution should proceed; false if
-  *         argument parsing produced nothing to run or an option was missing/inconsistent (help is
-  *         printed in the latter case).
-  * @throws Exception if there is a problem parsing the command-line arguments or the particular
-  *                   combination would violate class invariants.
-  */
- private boolean parseArgs(String[] args) throws Exception {
-
-   addInputOption();
-   addOption("trainingOutput", "tr", "The training data output directory", false);
-   addOption("testOutput", "te", "The test data output directory", false);
-   addOption("testSplitSize", "ss", "The number of documents held back as test data for each category", false);
-   addOption("testSplitPct", "sp", "The % of documents held back as test data for each category", false);
-   // Help text previously lacked the closing parenthesis.
-   addOption("splitLocation", "sl", "Location for start of test data expressed as a percentage of the input file "
-       + "size (0=start, 50=middle, 100=end)", false);
-   addOption("randomSelectionSize", "rs", "The number of items to be randomly selected as test data ", false);
-   addOption("randomSelectionPct", "rp", "Percentage of items to be randomly selected as test data when using "
-       + "mapreduce mode", false);
-   addOption("charset", "c", "The name of the character encoding of the input files (not needed if using "
-       + "SequenceFiles)", false);
-   addOption(buildOption("sequenceFiles", "seq", "Set if the input files are sequence files. Default is false",
-       false, false, "false"));
-   addOption(DefaultOptionCreator.methodOption().create());
-   addOption(DefaultOptionCreator.overwriteOption().create());
-   //TODO: extend this to sequential mode
-   addOption("keepPct", "k", "The percentage of total data to keep in map-reduce mode, the rest will be ignored. "
-       + "Default is 100%", false);
-   addOption("mapRedOutputDir", "mro", "Output directory for map reduce jobs", false);
-
-   if (parseArguments(args) == null) {
-     return false;
-   }
-
-   try {
-     inputDirectory = getInputPath();
-
-     useMapRed = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.MAPREDUCE_METHOD);
-
-     if (useMapRed) {
-       if (!hasOption("randomSelectionPct")) {
-         throw new OptionException(getCLIOption("randomSelectionPct"),
-             "must set randomSelectionPct when mapRed option is used");
-       }
-       if (!hasOption("mapRedOutputDir")) {
-         throw new OptionException(getCLIOption("mapRedOutputDir"),
-             "mapRedOutputDir must be set when mapRed option is used");
-       }
-       mapRedOutputDirectory = new Path(getOption("mapRedOutputDir"));
-       if (hasOption("keepPct")) {
-         keepPct = Integer.parseInt(getOption("keepPct"));
-       }
-       if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
-         HadoopUtil.delete(getConf(), mapRedOutputDirectory);
-       }
-     } else {
-       if (!hasOption("trainingOutput")
-           || !hasOption("testOutput")) {
-         throw new OptionException(getCLIOption("trainingOutput"),
-             "trainingOutput and testOutput must be set if mapRed option is not used");
-       }
-       if (!hasOption("testSplitSize")
-           && !hasOption("testSplitPct")
-           && !hasOption("randomSelectionPct")
-           && !hasOption("randomSelectionSize")) {
-         throw new OptionException(getCLIOption("testSplitSize"),
-             "must set one of test split size/percentage or randomSelectionSize/percentage");
-       }
-
-       trainingOutputDirectory = new Path(getOption("trainingOutput"));
-       testOutputDirectory = new Path(getOption("testOutput"));
-       Configuration conf = getConf();
-       if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
-         HadoopUtil.delete(conf, trainingOutputDirectory);
-         HadoopUtil.delete(conf, testOutputDirectory);
-       }
-       // Resolve the file system per directory: the two outputs need not live on the same one.
-       trainingOutputDirectory.getFileSystem(conf).mkdirs(trainingOutputDirectory);
-       testOutputDirectory.getFileSystem(conf).mkdirs(testOutputDirectory);
-     }
-
-     if (hasOption("charset")) {
-       charset = Charset.forName(getOption("charset"));
-     }
-
-     if (hasOption("testSplitSize") && hasOption("testSplitPct")) {
-       throw new OptionException(getCLIOption("testSplitPct"), "must have either split size or split percentage "
-           + "option, not BOTH");
-     }
-
-     if (hasOption("testSplitSize")) {
-       setTestSplitSize(Integer.parseInt(getOption("testSplitSize")));
-     }
-
-     if (hasOption("testSplitPct")) {
-       setTestSplitPct(Integer.parseInt(getOption("testSplitPct")));
-     }
-
-     if (hasOption("splitLocation")) {
-       setSplitLocation(Integer.parseInt(getOption("splitLocation")));
-     }
-
-     if (hasOption("randomSelectionSize")) {
-       setTestRandomSelectionSize(Integer.parseInt(getOption("randomSelectionSize")));
-     }
-
-     if (hasOption("randomSelectionPct")) {
-       setTestRandomSelectionPct(Integer.parseInt(getOption("randomSelectionPct")));
-     }
-
-     useSequence = hasOption("sequenceFiles");
-
-   } catch (OptionException e) {
-     log.error("Command-line option Exception", e);
-     CommandLineUtil.printHelp(getGroup());
-     return false;
-   }
-
-   validate();
-   return true;
- }
-
- /**
- * Perform a split on directory specified by {@link #setInputDirectory(Path)} by calling {@link #splitFile(Path)}
- * on each file found within that directory.
- */
- public void splitDirectory() throws IOException, ClassNotFoundException, InterruptedException {
- this.splitDirectory(inputDirectory);
- }
-
- /**
-  * Splits the given directory using this job's own configuration, calling
-  * {@link #splitFile(Path)} on each file found within it.
-  *
-  * @param inputDir the directory whose files are to be split
-  */
- public void splitDirectory(Path inputDir) throws IOException, ClassNotFoundException, InterruptedException {
-   splitDirectory(getConf(), inputDir);
- }
-
- /**
-  * Splits the given directory using the supplied configuration. In map-reduce mode the whole
-  * directory is handed to {@link SplitInputJob}; otherwise {@link #splitFile(Path)} is called for
-  * each non-directory entry accepted by {@code PathFilters.logsCRCFilter()}.
-  *
-  * @param conf     configuration used to resolve the file system of {@code inputDir}
-  * @param inputDir directory containing the input; in sequential mode, one file per category
-  * @throws IOException if {@code inputDir} does not exist or is not a directory, or on I/O failure
-  * @see #splitDirectory(Path)
-  */
- public void splitDirectory(Configuration conf, Path inputDir)
-   throws IOException, ClassNotFoundException, InterruptedException {
-   FileSystem fs = inputDir.getFileSystem(conf);
-   // Look the status up once instead of making two round trips for the same path.
-   FileStatus inputStatus = fs.getFileStatus(inputDir);
-   if (inputStatus == null) {
-     throw new IOException(inputDir + " does not exist");
-   }
-   if (!inputStatus.isDir()) {
-     throw new IOException(inputDir + " is not a directory");
-   }
-
-   if (useMapRed) {
-     SplitInputJob.run(conf, inputDir, mapRedOutputDirectory,
-         keepPct, testRandomSelectionPct);
-     return;
-   }
-
-   // input dir contains one file per category.
-   for (FileStatus inputFile : fs.listStatus(inputDir, PathFilters.logsCRCFilter())) {
-     if (!inputFile.isDir()) {
-       splitFile(inputFile.getPath());
-     }
-   }
- }
-
- /**
-  * Perform a split on the specified input file. Results will be written to files of the same name in the specified
-  * training and test output directories. The {@link #validate()} method is called prior to executing the split.
-  * <p/>
-  * Items are numbered from 1. With random selection (by size or by percentage) the test items are
-  * drawn uniformly from all positions; otherwise the test set is the contiguous run of items that
-  * follows the computed split start. The test set never receives more than the computed test split
-  * size; any surplus goes to the training set. The registered {@link SplitCallback}, if any, is
-  * invoked once the split has been written.
-  *
-  * @param inputFile the text file or SequenceFile to split
-  * @throws IOException if {@code inputFile} does not exist, is a directory, or cannot be read/written
-  * @throws IllegalArgumentException if the requested test set is larger than the input allows
-  */
- public void splitFile(Path inputFile) throws IOException {
-   Configuration conf = getConf();
-   FileSystem fs = inputFile.getFileSystem(conf);
-   FileStatus inputStatus = fs.getFileStatus(inputFile);
-   if (inputStatus == null) {
-     throw new IOException(inputFile + " does not exist");
-   }
-   if (inputStatus.isDir()) {
-     throw new IOException(inputFile + " is a directory");
-   }
-
-   validate();
-
-   Path testOutputFile = new Path(testOutputDirectory, inputFile.getName());
-   Path trainingOutputFile = new Path(trainingOutputDirectory, inputFile.getName());
-
-   // A SequenceFile is binary: its size must be measured in records, not in text lines.
-   int lineCount = useSequence
-       ? countRecords(inputFile, fs.getConf())
-       : countLines(fs, inputFile, charset);
-
-   log.info("{} has {} lines", inputFile.getName(), lineCount);
-
-   int testSplitStart = 0;
-   int testSplitSize = this.testSplitSize; // don't modify state
-   BitSet randomSel = null;
-
-   if (testRandomSelectionPct > 0 || testRandomSelectionSize > 0) {
-     testSplitSize = this.testRandomSelectionSize;
-
-     if (testRandomSelectionPct > 0) {
-       testSplitSize = Math.round(lineCount * testRandomSelectionPct / 100.0f);
-     }
-     if (testSplitSize > lineCount) {
-       throw new IllegalArgumentException("random selection size " + testSplitSize + " for " + inputFile
-           + " exceeds the " + lineCount + " items available");
-     }
-     log.info("{} test split size is {} based on random selection percentage {}",
-         inputFile.getName(), testSplitSize, testRandomSelectionPct);
-     long[] ridx = new long[testSplitSize];
-     // Sample 1-based positions from [1, lineCount] so that every item, including the last, is eligible.
-     RandomSampler.sample(testSplitSize, lineCount, testSplitSize, 1, ridx, 0, RandomUtils.getRandom());
-     randomSel = new BitSet(lineCount + 1);
-     for (long idx : ridx) {
-       randomSel.set((int) idx);
-     }
-   } else {
-     if (testSplitPct > 0) { // calculate split size based on percentage
-       testSplitSize = Math.round(lineCount * testSplitPct / 100.0f);
-       log.info("{} test split size is {} based on percentage {}",
-           inputFile.getName(), testSplitSize, testSplitPct);
-     } else {
-       log.info("{} test split size is {}", inputFile.getName(), testSplitSize);
-     }
-
-     if (splitLocation > 0) { // calculate start of split based on percentage
-       testSplitStart = Math.round(lineCount * splitLocation / 100.0f);
-       if (lineCount - testSplitStart < testSplitSize) {
-         // adjust split start downwards based on split size.
-         testSplitStart = lineCount - testSplitSize;
-       }
-       log.info("{} test split start is {} based on split location {}",
-           inputFile.getName(), testSplitStart, splitLocation);
-     }
-
-     if (testSplitStart < 0) {
-       throw new IllegalArgumentException("test split size for " + inputFile + " is too large, it would produce an "
-           + "empty training set from the initial set of " + lineCount + " examples");
-     } else if (lineCount - testSplitSize < testSplitSize) {
-       log.warn("Test set size for {} may be too large, {} is larger than the number of "
-           + "lines remaining in the training set: {}",
-           inputFile, testSplitSize, lineCount - testSplitSize);
-     }
-   }
-   int trainCount = 0;
-   int testCount = 0;
-   if (!useSequence) {
-     try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(inputFile), charset));
-          Writer trainingWriter = new OutputStreamWriter(fs.create(trainingOutputFile), charset);
-          Writer testWriter = new OutputStreamWriter(fs.create(testOutputFile), charset)) {
-
-       String line;
-       int pos = 0;
-       while ((line = reader.readLine()) != null) {
-         pos++;
-
-         Writer writer;
-         // The test set is capped at testSplitSize; overflow is redirected to training.
-         if (belongsToTestSet(pos, randomSel, testSplitStart) && testCount < testSplitSize) {
-           writer = testWriter;
-           testCount++;
-         } else {
-           writer = trainingWriter;
-           trainCount++;
-         }
-         writer.write(line);
-         writer.write('\n');
-       }
-
-     }
-   } else {
-     try (SequenceFileIterator<Writable, Writable> iterator =
-              new SequenceFileIterator<>(inputFile, false, fs.getConf());
-          SequenceFile.Writer trainingWriter = SequenceFile.createWriter(fs, fs.getConf(), trainingOutputFile,
-              iterator.getKeyClass(), iterator.getValueClass());
-          SequenceFile.Writer testWriter = SequenceFile.createWriter(fs, fs.getConf(), testOutputFile,
-              iterator.getKeyClass(), iterator.getValueClass())) {
-
-       int pos = 0;
-       while (iterator.hasNext()) {
-         pos++;
-         Pair<Writable, Writable> pair = iterator.next();
-
-         SequenceFile.Writer writer;
-         if (belongsToTestSet(pos, randomSel, testSplitStart) && testCount < testSplitSize) {
-           writer = testWriter;
-           testCount++;
-         } else {
-           writer = trainingWriter;
-           trainCount++;
-         }
-         writer.append(pair.getFirst(), pair.getSecond());
-       }
-
-     }
-   }
-   log.info("file: {}, input: {} train: {}, test: {} starting at {}",
-       inputFile.getName(), lineCount, trainCount, testCount, testSplitStart);
-
-   // testing;
-   if (callback != null) {
-     callback.splitComplete(inputFile, lineCount, trainCount, testCount, testSplitStart);
-   }
- }
-
- /**
-  * Decides whether the item at 1-based position {@code pos} is a candidate for the test set: by
-  * membership in {@code randomSel} when a random selection was made (by size or by percentage),
-  * otherwise by lying after {@code testSplitStart}.
-  */
- private static boolean belongsToTestSet(int pos, BitSet randomSel, int testSplitStart) {
-   return randomSel != null ? randomSel.get(pos) : pos > testSplitStart;
- }
-
- /** Counts the key/value records stored in the given SequenceFile. */
- private static int countRecords(Path inputFile, Configuration conf) throws IOException {
-   int records = 0;
-   try (SequenceFileIterator<Writable, Writable> iterator = new SequenceFileIterator<>(inputFile, false, conf)) {
-     while (iterator.hasNext()) {
-       iterator.next();
-       records++;
-     }
-   }
-   return records;
- }
-
- /**
-  * Returns the fixed number of items reserved for a contiguous test split.
-  *
-  * @return the configured size; the field's initial value is -1.
-  */
- public int getTestSplitSize() {
-   return this.testSplitSize;
- }
-
- public void setTestSplitSize(int testSplitSize) {
- this.testSplitSize = testSplitSize; // not checked here; validate() requires a value >= 1, or -1
- }
-
- /**
-  * Returns the percentage of the input reserved for a contiguous test split.
-  *
-  * @return the configured percentage; the field's initial value is -1.
-  */
- public int getTestSplitPct() {
-   return this.testSplitPct;
- }
-
- /**
- * Sets the percentage of the input data to allocate to the test split. The value is stored
- * as-is; range checking is performed later by {@link #validate()}, which accepts 0 to 100
- * or -1 and only treats values greater than 0 as a selected sizing method.
- *
- * @param testSplitPct a value between 0 and 100 inclusive.
- */
- public void setTestSplitPct(int testSplitPct) {
- this.testSplitPct = testSplitPct;
- }
-
- /**
- * Sets the percentage of the input data to keep in a map reduce split input job. The value is
- * stored as-is and passed to {@code SplitInputJob.run} when map-reduce mode is used.
- *
- * @param keepPct a value between 0 and 100 inclusive.
- */
- public void setKeepPct(int keepPct) {
- this.keepPct = keepPct;
- }
-
- /**
- * Set to true to use map reduce to split the input. When true,
- * {@link #splitDirectory(Configuration, Path)} delegates to {@code SplitInputJob} instead of
- * splitting each file sequentially.
- *
- * @param useMapRed a boolean to indicate whether map reduce should be used
- */
- public void setUseMapRed(boolean useMapRed) {
- this.useMapRed = useMapRed;
- }
-
- public void setMapRedOutputDirectory(Path mapRedOutputDirectory) {
- this.mapRedOutputDirectory = mapRedOutputDirectory; // output path handed to SplitInputJob.run in map-reduce mode
- }
-
- /**
-  * Returns the position, as a percentage of the input, at which the contiguous test split starts.
-  *
-  * @return the configured split location; the field's initial value is 100.
-  */
- public int getSplitLocation() {
-   return this.splitLocation;
- }
-
- /**
- * Set the location of the start of the test/training data split. Expressed as percentage of lines, for example
- * 0 indicates that the test data should be taken from the start of the file, 100 indicates that the test data
- * should be taken from the end of the input file, while 25 indicates that the test data should be taken from the
- * first quarter of the file.
- * <p/>
- * This option is only relevant in cases where random selection is not employed. The value is
- * stored as-is; {@link #validate()} later requires it to be between 0 and 100, or -1.
- *
- * @param splitLocation a value between 0 and 100 inclusive.
- */
- public void setSplitLocation(int splitLocation) {
- this.splitLocation = splitLocation;
- }
-
- /**
-  * Returns the charset used to read and write text files.
-  *
-  * @return the configured charset; UTF-8 unless changed.
-  */
- public Charset getCharset() {
-   return this.charset;
- }
-
- /**
- * Set the charset used to read and write files. It is applied when counting lines and when
- * reading and writing text (non-SequenceFile) input in {@link #splitFile(Path)}.
- */
- public void setCharset(Charset charset) {
- this.charset = charset;
- }
-
- /**
-  * Returns the directory from which {@link #splitDirectory()} reads its input.
-  *
-  * @return the configured input directory, or null if none has been set.
-  */
- public Path getInputDirectory() {
-   return this.inputDirectory;
- }
-
- /**
- * Set the directory from which input data will be read when the {@link #splitDirectory()} method is invoked
- */
- public void setInputDirectory(Path inputDir) {
- this.inputDirectory = inputDir;
- }
-
- /**
-  * Returns the directory to which training data is written.
-  *
-  * @return the configured training output directory, or null if none has been set.
-  */
- public Path getTrainingOutputDirectory() {
-   return this.trainingOutputDirectory;
- }
-
- /**
- * Set the directory to which training data will be written. In sequential mode
- * {@link #validate()} requires this to be non-null and to refer to an existing directory.
- */
- public void setTrainingOutputDirectory(Path trainingOutputDir) {
- this.trainingOutputDirectory = trainingOutputDir;
- }
-
- /**
-  * Returns the directory to which test data is written.
-  *
-  * @return the configured test output directory, or null if none has been set.
-  */
- public Path getTestOutputDirectory() {
-   return this.testOutputDirectory;
- }
-
- /**
- * Set the directory to which test data will be written. In sequential mode
- * {@link #validate()} requires this to be non-null and to refer to an existing directory.
- */
- public void setTestOutputDirectory(Path testOutputDir) {
- this.testOutputDirectory = testOutputDir;
- }
-
- /**
-  * Returns the callback notified after each file has been split.
-  *
-  * @return the registered callback, or null if none has been set.
-  */
- public SplitCallback getCallback() {
-   return this.callback;
- }
-
- /**
- * Sets the callback used to inform the caller that an input file has been successfully split.
- * {@link #splitFile(Path)} skips the notification when the callback is null.
- */
- public void setCallback(SplitCallback callback) {
- this.callback = callback;
- }
-
- /**
-  * Returns the number of randomly selected items that will be written to the test set.
-  *
-  * @return the configured size; the field's initial value is -1.
-  */
- public int getTestRandomSelectionSize() {
-   return this.testRandomSelectionSize;
- }
-
- /**
- * Sets number of random input samples that will be saved to the test set. The value is stored
- * as-is; {@link #validate()} only counts it as a selected sizing method when it is greater than 0.
- */
- public void setTestRandomSelectionSize(int testRandomSelectionSize) {
- this.testRandomSelectionSize = testRandomSelectionSize;
- }
-
- /**
-  * Returns the percentage of the input that will be randomly selected for the test set.
-  *
-  * @return the configured percentage; the field's initial value is -1.
-  */
- public int getTestRandomSelectionPct() {
-   return this.testRandomSelectionPct;
- }
-
- /**
- * Sets number of random input samples that will be saved to the test set as a percentage of the size of the
- * input set. The value is stored as-is; {@link #validate()} later requires it to be between
- * 0 and 100, or -1.
- *
- * @param randomSelectionPct a value between 0 and 100 inclusive.
- */
- public void setTestRandomSelectionPct(int randomSelectionPct) {
- this.testRandomSelectionPct = randomSelectionPct;
- }
-
- /**
-  * Validates that the current instance is in a consistent state.
-  * <p/>
-  * Checks the numeric ranges of the sizing options, that exactly one of testSplitSize,
-  * testSplitPct, testRandomSelectionSize and testRandomSelectionPct is greater than 0, that
-  * keepPct lies in 1..100 when map-reduce mode is used, and, in sequential mode, that both
-  * output directories are set and exist as directories.
-  *
-  * @throws IllegalArgumentException if settings violate class invariants.
-  * @throws IOException if output directories do not exist or are not directories.
-  */
- public void validate() throws IOException {
-   Preconditions.checkArgument(testSplitSize >= 1 || testSplitSize == -1,
-       "Invalid testSplitSize: " + testSplitSize + ". Must be: testSplitSize >= 1 or testSplitSize = -1");
-   Preconditions.checkArgument(splitLocation >= 0 && splitLocation <= 100 || splitLocation == -1,
-       "Invalid splitLocation percentage: " + splitLocation + ". Must be: 0 <= splitLocation <= 100 or splitLocation = -1");
-   Preconditions.checkArgument(testSplitPct >= 0 && testSplitPct <= 100 || testSplitPct == -1,
-       "Invalid testSplitPct percentage: " + testSplitPct + ". Must be: 0 <= testSplitPct <= 100 or testSplitPct = -1");
-   Preconditions.checkArgument(testRandomSelectionPct >= 0 && testRandomSelectionPct <= 100
-       || testRandomSelectionPct == -1, "Invalid testRandomSelectionPct percentage: " + testRandomSelectionPct
-       + ". Must be: 0 <= testRandomSelectionPct <= 100 or testRandomSelectionPct = -1");
-   // keepPct is turned into the divisor 100 / keepPct by the map-reduce job, so it must stay in 1..100 there.
-   Preconditions.checkArgument(!useMapRed || keepPct >= 1 && keepPct <= 100,
-       "Invalid keepPct percentage: " + keepPct + ". Must be: 1 <= keepPct <= 100");
-
-   Preconditions.checkArgument(trainingOutputDirectory != null || useMapRed,
-       "No training output directory was specified");
-   Preconditions.checkArgument(testOutputDirectory != null || useMapRed, "No test output directory was specified");
-
-   // only one of the following may be set, one must be set.
-   int count = 0;
-   if (testSplitSize > 0) {
-     count++;
-   }
-   if (testSplitPct > 0) {
-     count++;
-   }
-   if (testRandomSelectionSize > 0) {
-     count++;
-   }
-   if (testRandomSelectionPct > 0) {
-     count++;
-   }
-
-   Preconditions.checkArgument(count == 1, "Exactly one of testSplitSize, testSplitPct, testRandomSelectionSize, "
-       + "testRandomSelectionPct should be set");
-
-   if (!useMapRed) {
-     Configuration conf = getConf();
-     // Each directory is checked on its own file system; the two need not share one.
-     FileStatus trainingOutputDirStatus =
-         trainingOutputDirectory.getFileSystem(conf).getFileStatus(trainingOutputDirectory);
-     Preconditions.checkArgument(trainingOutputDirStatus != null && trainingOutputDirStatus.isDir(),
-         "%s is not a directory", trainingOutputDirectory);
-     FileStatus testOutputDirStatus =
-         testOutputDirectory.getFileSystem(conf).getFileStatus(testOutputDirectory);
-     Preconditions.checkArgument(testOutputDirStatus != null && testOutputDirStatus.isDir(),
-         "%s is not a directory", testOutputDirectory);
-   }
- }
-
- /**
-  * Counts how many lines {@code BufferedReader.readLine()} yields for the given file.
-  *
-  * @param fs the file system used to open {@code inputFile}
-  * @param inputFile the file whose lines will be counted
-  * @param charset the charset of the file to read
-  * @return the number of lines in the input file.
-  * @throws IOException if there is a problem opening or reading the file.
-  */
- public static int countLines(FileSystem fs, Path inputFile, Charset charset) throws IOException {
-   try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(inputFile), charset))) {
-     int total = 0;
-     for (String row = in.readLine(); row != null; row = in.readLine()) {
-       total++;
-     }
-     return total;
-   }
- }
-
- /**
- * Used to pass information back to a caller once a file has been split without the need for a data object.
- * {@code splitComplete} is invoked at the end of {@code splitFile(Path)} with the file that was split,
- * the number of input items counted, the number of items written to the training and test outputs,
- * and the computed start position of the test split.
- */
- public interface SplitCallback {
- void splitComplete(Path inputFile, int lineCount, int trainCount, int testCount, int testSplitStart);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java b/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java
deleted file mode 100644
index 4a1ff86..0000000
--- a/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.utils;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.RandomUtils;
-import org.apache.mahout.common.iterator.sequencefile.PathFilters;
-import org.apache.mahout.common.iterator.sequencefile.PathType;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator;
-
-/**
- * Class which implements a map reduce version of SplitInput.
- * This class takes a SequenceFile input, e.g. a set of training data
- * for a learning algorithm, downsamples it, applies a random
- * permutation and splits it into test and training sets
- */
-public final class SplitInputJob {
-
- private static final String DOWNSAMPLING_FACTOR = "SplitInputJob.downsamplingFactor";
- private static final String RANDOM_SELECTION_PCT = "SplitInputJob.randomSelectionPct";
- private static final String TRAINING_TAG = "training";
- private static final String TEST_TAG = "test";
-
- private SplitInputJob() {} // private and empty: the class is only used through its static run() and nested classes
-
- /**
-  * Run job to downsample, randomly permute and split data into test and
-  * training sets. This job takes a SequenceFile as input and outputs two
-  * SequenceFiles test-r-00000 and training-r-00000 which contain the test and
-  * training sets respectively.
-  * <p/>
-  * The key and value classes of the job are taken from the first pair found in the input.
-  * Note that the downsampling factor and selection percentage are written into
-  * {@code initialConf} itself before it is copied for the job.
-  *
-  * @param initialConf
-  *          Initial configuration
-  * @param inputPath
-  *          path to input data SequenceFile
-  * @param outputPath
-  *          path for output data SequenceFiles
-  * @param keepPct
-  *          percentage of key value pairs in input to keep, between 1 and 100
-  *          inclusive. The rest are discarded
-  * @param randomSelectionPercent
-  *          percentage of key value pairs to allocate to test set. Remainder
-  *          are allocated to training set
-  * @throws IllegalArgumentException if {@code keepPct} is outside 1..100
-  * @throws IllegalStateException if the input contains no pairs or the job fails
-  */
- @SuppressWarnings("rawtypes")
- public static void run(Configuration initialConf, Path inputPath,
-                        Path outputPath, int keepPct, float randomSelectionPercent)
-   throws IOException, ClassNotFoundException, InterruptedException {
-
-   // 100 / keepPct is used as a modulus by the mapper: values above 100 would truncate to 0,
-   // and values of 0 or less do not yield a meaningful factor.
-   if (keepPct < 1 || keepPct > 100) {
-     throw new IllegalArgumentException("keepPct must be between 1 and 100 inclusive, but was " + keepPct);
-   }
-   int downsamplingFactor = (int) (100.0 / keepPct);
-   initialConf.setInt(DOWNSAMPLING_FACTOR, downsamplingFactor);
-   initialConf.setFloat(RANDOM_SELECTION_PCT, randomSelectionPercent);
-
-   // Determine class of keys and values
-   FileSystem fs = FileSystem.get(initialConf);
-
-   Class<? extends WritableComparable> keyClass;
-   Class<? extends Writable> valueClass;
-   // The iterator holds an open reader on the input, so close it once the first pair has been inspected.
-   try (SequenceFileDirIterator<? extends WritableComparable, Writable> iterator =
-            new SequenceFileDirIterator<>(inputPath,
-                PathType.LIST, PathFilters.partFilter(), null, false, fs.getConf())) {
-     if (!iterator.hasNext()) {
-       throw new IllegalStateException("Couldn't determine class of the input values");
-     }
-     Pair<? extends WritableComparable, Writable> pair = iterator.next();
-     keyClass = pair.getFirst().getClass();
-     valueClass = pair.getSecond().getClass();
-   }
-
-   Job job = new Job(new Configuration(initialConf));
-
-   MultipleOutputs.addNamedOutput(job, TRAINING_TAG, SequenceFileOutputFormat.class, keyClass, valueClass);
-   MultipleOutputs.addNamedOutput(job, TEST_TAG, SequenceFileOutputFormat.class, keyClass, valueClass);
-   job.setJarByClass(SplitInputJob.class);
-   FileInputFormat.addInputPath(job, inputPath);
-   FileOutputFormat.setOutputPath(job, outputPath);
-   job.setNumReduceTasks(1);
-   job.setInputFormatClass(SequenceFileInputFormat.class);
-   job.setOutputFormatClass(SequenceFileOutputFormat.class);
-   job.setMapperClass(SplitInputMapper.class);
-   job.setReducerClass(SplitInputReducer.class);
-   job.setSortComparatorClass(SplitInputComparator.class);
-   job.setOutputKeyClass(keyClass);
-   job.setOutputValueClass(valueClass);
-   job.submit();
-   boolean succeeded = job.waitForCompletion(true);
-   if (!succeeded) {
-     throw new IllegalStateException("Job failed!");
-   }
- }
-
- /**
-  * Mapper which downsamples the input by downsamplingFactor: only the first of every
-  * downsamplingFactor input records is passed to {@code map()}; the others are skipped.
-  */
- public static class SplitInputMapper extends
-   Mapper<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
-
-   private int downsamplingFactor;
-
-   /** Reads the downsampling factor from the job configuration (default 1). */
-   @Override
-   public void setup(Context ctx) {
-     // Used as a modulus in run(); clamp so a configured value below 1 cannot cause a division by zero.
-     downsamplingFactor = Math.max(1, ctx.getConfiguration().getInt(DOWNSAMPLING_FACTOR, 1));
-   }
-
-   /** Only run map() for one out of every downsampleFactor inputs */
-   @Override
-   public void run(Context context) throws IOException, InterruptedException {
-     setup(context);
-     // long, so the counter cannot wrap on very large inputs.
-     long seen = 0;
-     while (context.nextKeyValue()) {
-       if (seen % downsamplingFactor == 0) {
-         map(context.getCurrentKey(), context.getCurrentValue(), context);
-       }
-       seen++;
-     }
-     cleanup(context);
-   }
-
- }
-
- /** Reducer which uses MultipleOutputs to randomly allocate key value pairs between test and training outputs */
- public static class SplitInputReducer extends
-   Reducer<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
-
-   private MultipleOutputs<WritableComparable<?>, Writable> multipleOutputs;
-   private final Random rnd = RandomUtils.getRandom();
-   private float randomSelectionPercent;
-
-   /** Reads the test-set percentage from the configuration (default 0) and opens the named outputs. */
-   @Override
-   protected void setup(Context ctx) throws IOException {
-     randomSelectionPercent = ctx.getConfiguration().getFloat(RANDOM_SELECTION_PCT, 0);
-     multipleOutputs = new MultipleOutputs<WritableComparable<?>, Writable>(ctx);
-   }
-
-   /**
-    * Randomly allocate key value pairs between test and training sets.
-    * randomSelectionPercent of the pairs will go to the test set.
-    */
-   @Override
-   protected void reduce(WritableComparable<?> key, Iterable<Writable> values,
-                         Context context) throws IOException, InterruptedException {
-     for (Writable value : values) {
-       // Each value is routed independently: a draw in [0, 100) below the percentage selects the test set.
-       String tag = rnd.nextInt(100) < randomSelectionPercent ? TEST_TAG : TRAINING_TAG;
-       multipleOutputs.write(tag, key, value);
-     }
-
-   }
-
-   /** Closes the named outputs; an interruption is reported as an IOException. */
-   @Override
-   protected void cleanup(Context context) throws IOException {
-     try {
-       multipleOutputs.close();
-     } catch (InterruptedException e) {
-       // Restore the interrupt status so it is not lost when the exception is translated.
-       Thread.currentThread().interrupt();
-       throw new IOException(e);
-     }
-   }
-
- }
-
- /**
-  * Randomly permute key value pairs: used as the job's sort comparator, it ignores the
-  * serialized keys and answers each comparison with a coin flip.
-  * <p/>
-  * NOTE(review): the result is never 0 and is not consistent between calls, so this is not a
-  * total ordering; confirm that the sort and grouping steps of the targeted Hadoop version
-  * tolerate that.
-  */
- public static class SplitInputComparator extends WritableComparator implements Serializable {
-
-   private final Random rnd = RandomUtils.getRandom();
-
-   protected SplitInputComparator() {
-     super(WritableComparable.class);
-   }
-
-   /** Returns 1 or -1 at random, regardless of the bytes being compared. */
-   @Override
-   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-     return rnd.nextBoolean() ? 1 : -1;
-   }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java
deleted file mode 100644
index ac884d0..0000000
--- a/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.utils.clustering;
-
-import java.io.IOException;
-import java.io.Writer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
-import org.apache.mahout.clustering.iterator.ClusterWritable;
-import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.distance.DistanceMeasure;
-import org.apache.mahout.math.Vector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Base class for implementing ClusterWriter
- */
-public abstract class AbstractClusterWriter implements ClusterWriter {
-
- private static final Logger log = LoggerFactory.getLogger(AbstractClusterWriter.class);
-
- protected final Writer writer;
- protected final Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints;
- protected final DistanceMeasure measure;
-
- /**
- *
- * @param writer The underlying {@link java.io.Writer} to use
- * @param clusterIdToPoints The map between cluster ids {@link org.apache.mahout.clustering.Cluster#getId()} and the
- * points in the cluster
- * @param measure The {@link org.apache.mahout.common.distance.DistanceMeasure} used to calculate the distance.
- * Some writers may wish to use it for calculating weights for display. May be null.
- */
- protected AbstractClusterWriter(Writer writer, Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
- DistanceMeasure measure) {
- this.writer = writer;
- this.clusterIdToPoints = clusterIdToPoints;
- this.measure = measure;
- }
-
- protected Writer getWriter() {
- return writer;
- }
-
- protected Map<Integer, List<WeightedPropertyVectorWritable>> getClusterIdToPoints() {
- return clusterIdToPoints;
- }
-
- public static String getTopFeatures(Vector vector, String[] dictionary, int numTerms) {
-
- StringBuilder sb = new StringBuilder(100);
-
- for (Pair<String, Double> item : getTopPairs(vector, dictionary, numTerms)) {
- String term = item.getFirst();
- sb.append("\n\t\t");
- sb.append(StringUtils.rightPad(term, 40));
- sb.append("=>");
- sb.append(StringUtils.leftPad(item.getSecond().toString(), 20));
- }
- return sb.toString();
- }
-
- public static String getTopTerms(Vector vector, String[] dictionary, int numTerms) {
-
- StringBuilder sb = new StringBuilder(100);
-
- for (Pair<String, Double> item : getTopPairs(vector, dictionary, numTerms)) {
- String term = item.getFirst();
- sb.append(term).append('_');
- }
- sb.deleteCharAt(sb.length() - 1);
- return sb.toString();
- }
-
- @Override
- public long write(Iterable<ClusterWritable> iterable) throws IOException {
- return write(iterable, Long.MAX_VALUE);
- }
-
- @Override
- public void close() throws IOException {
- writer.close();
- }
-
- @Override
- public long write(Iterable<ClusterWritable> iterable, long maxDocs) throws IOException {
- long result = 0;
- Iterator<ClusterWritable> iterator = iterable.iterator();
- while (result < maxDocs && iterator.hasNext()) {
- write(iterator.next());
- result++;
- }
- return result;
- }
-
- private static Collection<Pair<String, Double>> getTopPairs(Vector vector, String[] dictionary, int numTerms) {
- List<TermIndexWeight> vectorTerms = Lists.newArrayList();
-
- for (Vector.Element elt : vector.nonZeroes()) {
- vectorTerms.add(new TermIndexWeight(elt.index(), elt.get()));
- }
-
- // Sort results in reverse order (ie weight in descending order)
- Collections.sort(vectorTerms, new Comparator<TermIndexWeight>() {
- @Override
- public int compare(TermIndexWeight one, TermIndexWeight two) {
- return Double.compare(two.weight, one.weight);
- }
- });
-
- Collection<Pair<String, Double>> topTerms = Lists.newLinkedList();
-
- for (int i = 0; i < vectorTerms.size() && i < numTerms; i++) {
- int index = vectorTerms.get(i).index;
- String dictTerm = dictionary[index];
- if (dictTerm == null) {
- log.error("Dictionary entry missing for {}", index);
- continue;
- }
- topTerms.add(new Pair<>(dictTerm, vectorTerms.get(i).weight));
- }
-
- return topTerms;
- }
-
- private static class TermIndexWeight {
- private final int index;
- private final double weight;
-
- TermIndexWeight(int index, double weight) {
- this.index = index;
- this.weight = weight;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java
deleted file mode 100644
index 7269016..0000000
--- a/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.utils.clustering;
-
-import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
-import org.apache.mahout.clustering.iterator.ClusterWritable;
-import org.apache.mahout.common.distance.DistanceMeasure;
-import org.apache.mahout.math.NamedVector;
-import org.apache.mahout.math.Vector;
-
-import java.io.IOException;
-import java.io.Writer;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-/**
- * Writes clusters as adjacency-style CSV, as described at
- * http://gephi.org/users/supported-graph-formats/csv-format/ : each row starts with
- * the cluster id and continues with one cell per point of that cluster.
- * Clusters that have no entry in the cluster-id-to-points map produce no row.
- **/
-public class CSVClusterWriter extends AbstractClusterWriter {
-
-  private static final Pattern VEC_PATTERN = Pattern.compile("\\{|\\:|\\,|\\}");
-
-  public CSVClusterWriter(Writer writer, Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
-      DistanceMeasure measure) {
-    super(writer, clusterIdToPoints, measure);
-  }
-
-  /**
-   * Emits one CSV row for the cluster. Named vectors contribute their name; any other
-   * vector contributes its format string with braces, colons and commas replaced by
-   * underscores so the cell cannot break the CSV layout.
-   */
-  @Override
-  public void write(ClusterWritable clusterWritable) throws IOException {
-    Cluster cluster = clusterWritable.getValue();
-    List<WeightedPropertyVectorWritable> members = getClusterIdToPoints().get(cluster.getId());
-    if (members == null) {
-      return;
-    }
-
-    StringBuilder row = new StringBuilder();
-    row.append(cluster.getId());
-    for (WeightedPropertyVectorWritable member : members) {
-      Vector vec = member.getVector();
-      String cell = vec instanceof NamedVector
-          ? ((NamedVector) vec).getName()
-          : VEC_PATTERN.matcher(vec.asFormatString()).replaceAll("_");
-      row.append(',').append(cell);
-    }
-    getWriter().append(row).append("\n");
-  }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java b/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
deleted file mode 100644
index 75b5ded..0000000
--- a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.utils.clustering;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import com.google.common.io.Closeables;
-import com.google.common.io.Files;
-import org.apache.commons.io.Charsets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.mahout.clustering.cdbw.CDbwEvaluator;
-import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
-import org.apache.mahout.clustering.evaluation.ClusterEvaluator;
-import org.apache.mahout.clustering.evaluation.RepresentativePointsDriver;
-import org.apache.mahout.clustering.iterator.ClusterWritable;
-import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.common.ClassUtils;
-import org.apache.mahout.common.HadoopUtil;
-import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.apache.mahout.common.distance.DistanceMeasure;
-import org.apache.mahout.common.iterator.sequencefile.PathFilters;
-import org.apache.mahout.common.iterator.sequencefile.PathType;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
-import org.apache.mahout.utils.vectors.VectorHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Command-line job that dumps clusters stored in sequence files, in TEXT, CSV, GRAPH_ML or JSON form,
- * either to an output file or to System.out. It can optionally include the points assigned to each
- * cluster and append cluster evaluation metrics.
- */
-public final class ClusterDumper extends AbstractJob {
-
- public static final String SAMPLE_POINTS = "samplePoints";
- // Distance measure handed to the cluster writers and to the evaluation step; only assigned in run().
- DistanceMeasure measure;
-
- /** Supported output formats; each maps to one ClusterWriter implementation in createClusterWriter. */
- public enum OUTPUT_FORMAT {
- TEXT,
- CSV,
- GRAPH_ML,
- JSON,
- }
-
- public static final String DICTIONARY_TYPE_OPTION = "dictionaryType";
- public static final String DICTIONARY_OPTION = "dictionary";
- public static final String POINTS_DIR_OPTION = "pointsDir";
- public static final String NUM_WORDS_OPTION = "numWords";
- public static final String SUBSTRING_OPTION = "substring";
- public static final String EVALUATE_CLUSTERS = "evaluate";
-
- public static final String OUTPUT_FORMAT_OPT = "outputFormat";
-
- private static final Logger log = LoggerFactory.getLogger(ClusterDumper.class);
- private Path seqFileDir;
- private Path pointsDir;
- private long maxPointsPerCluster = Long.MAX_VALUE;
- private String termDictionary;
- private String dictionaryFormat;
- private int subString = Integer.MAX_VALUE;
- private int numTopFeatures = 10;
- private Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints;
- private OUTPUT_FORMAT outputFormat = OUTPUT_FORMAT.TEXT;
- private boolean runEvaluation;
-
- /**
- * Creates a dumper for programmatic use and immediately loads the points (if any) via init().
- * Unlike the no-arg constructor, this one does not call setConf.
- *
- * @param seqFileDir directory holding the cluster sequence files ("part-*" files are read)
- * @param pointsDir directory holding the clustered points; may be null, in which case no points are loaded
- */
- public ClusterDumper(Path seqFileDir, Path pointsDir) {
- this.seqFileDir = seqFileDir;
- this.pointsDir = pointsDir;
- init();
- }
-
- /** Creates a dumper configured with a fresh Configuration; used by the command-line entry point. */
- public ClusterDumper() {
- setConf(new Configuration());
- }
-
- public static void main(String[] args) throws Exception {
- new ClusterDumper().run(args);
- }
-
- /**
- * Parses the command-line options, loads the points and prints the clusters.
- *
- * @return -1 if the arguments could not be parsed, 0 otherwise
- */
- @Override
- public int run(String[] args) throws Exception {
- addInputOption();
- addOutputOption();
- addOption(OUTPUT_FORMAT_OPT, "of", "The optional output format for the results. Options: TEXT, CSV, JSON or GRAPH_ML",
- "TEXT");
- addOption(SUBSTRING_OPTION, "b", "The number of chars of the asFormatString() to print");
- addOption(NUM_WORDS_OPTION, "n", "The number of top terms to print");
- addOption(POINTS_DIR_OPTION, "p",
- "The directory containing points sequence files mapping input vectors to their cluster. "
- + "If specified, then the program will output the points associated with a cluster");
- addOption(SAMPLE_POINTS, "sp", "Specifies the maximum number of points to include _per_ cluster. The default "
- + "is to include all points");
- addOption(DICTIONARY_OPTION, "d", "The dictionary file");
- addOption(DICTIONARY_TYPE_OPTION, "dt", "The dictionary file type (text|sequencefile)", "text");
- addOption(buildOption(EVALUATE_CLUSTERS, "e", "Run ClusterEvaluator and CDbwEvaluator over the input. "
- + "The output will be appended to the rest of the output at the end.", false, false, null));
- addOption(DefaultOptionCreator.distanceMeasureOption().create());
-
- // output is optional, will print to System.out per default
- if (parseArguments(args, false, true) == null) {
- return -1;
- }
-
- seqFileDir = getInputPath();
- if (hasOption(POINTS_DIR_OPTION)) {
- pointsDir = new Path(getOption(POINTS_DIR_OPTION));
- }
- // outputFile is not declared in this class; presumably inherited from AbstractJob -- confirm
- outputFile = getOutputFile();
- if (hasOption(SUBSTRING_OPTION)) {
- int sub = Integer.parseInt(getOption(SUBSTRING_OPTION));
- // negative values are ignored and the default (Integer.MAX_VALUE) stays in effect
- if (sub >= 0) {
- subString = sub;
- }
- }
- termDictionary = getOption(DICTIONARY_OPTION);
- dictionaryFormat = getOption(DICTIONARY_TYPE_OPTION);
- if (hasOption(NUM_WORDS_OPTION)) {
- numTopFeatures = Integer.parseInt(getOption(NUM_WORDS_OPTION));
- }
- if (hasOption(OUTPUT_FORMAT_OPT)) {
- outputFormat = OUTPUT_FORMAT.valueOf(getOption(OUTPUT_FORMAT_OPT));
- }
- if (hasOption(SAMPLE_POINTS)) {
- maxPointsPerCluster = Long.parseLong(getOption(SAMPLE_POINTS));
- } else {
- maxPointsPerCluster = Long.MAX_VALUE;
- }
- runEvaluation = hasOption(EVALUATE_CLUSTERS);
- String distanceMeasureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
- measure = ClassUtils.instantiateAs(distanceMeasureClass, DistanceMeasure.class);
-
- // points must be loaded after maxPointsPerCluster and pointsDir have been set above
- init();
- printClusters(null);
- return 0;
- }
-
- /**
- * Writes all clusters found under seqFileDir using the writer selected by the output format and,
- * when evaluation is enabled, appends the ClusterEvaluator and CDbwEvaluator metrics.
- *
- * @param dictionary term dictionary to use; replaced by the dictionary loaded from termDictionary
- * when one is configured
- * @throws IllegalArgumentException if a term dictionary is configured with a format other than
- * "text" or "sequencefile"
- */
- public void printClusters(String[] dictionary) throws Exception {
- Configuration conf = new Configuration();
-
- if (this.termDictionary != null) {
- if ("text".equals(dictionaryFormat)) {
- dictionary = VectorHelper.loadTermDictionary(new File(this.termDictionary));
- } else if ("sequencefile".equals(dictionaryFormat)) {
- dictionary = VectorHelper.loadTermDictionary(conf, this.termDictionary);
- } else {
- throw new IllegalArgumentException("Invalid dictionary format");
- }
- }
-
- Writer writer;
- // only writers backed by a file are closed; System.out is left open
- boolean shouldClose;
- if (this.outputFile == null) {
- shouldClose = false;
- writer = new OutputStreamWriter(System.out, Charsets.UTF_8);
- } else {
- shouldClose = true;
- // NOTE(review): getName() on a java.io.File yields only the last path component, so this
- // "s3n://" prefix test looks like it can never match -- confirm against AbstractJob.getOutputFile()
- if (outputFile.getName().startsWith("s3n://")) {
- Path p = outputPath;
- FileSystem fs = FileSystem.get(p.toUri(), conf);
- writer = new OutputStreamWriter(fs.create(p), Charsets.UTF_8);
- } else {
- Files.createParentDirs(outputFile);
- writer = Files.newWriter(this.outputFile, Charsets.UTF_8);
- }
- }
- ClusterWriter clusterWriter = createClusterWriter(writer, dictionary);
- try {
- long numWritten = clusterWriter.write(new SequenceFileDirValueIterable<ClusterWritable>(new Path(seqFileDir,
- "part-*"), PathType.GLOB, conf));
-
- writer.flush();
- if (runEvaluation) {
- // NOTE(review): pointsDir is dereferenced below, so evaluation fails with a
- // NullPointerException when no points directory was supplied
- HadoopUtil.delete(conf, new Path("tmp/representative"));
- int numIters = 5;
- RepresentativePointsDriver.main(new String[]{
- "--input", seqFileDir.toString(),
- "--output", "tmp/representative",
- "--clusteredPoints", pointsDir.toString(),
- "--distanceMeasure", measure.getClass().getName(),
- "--maxIter", String.valueOf(numIters)
- });
- conf.set(RepresentativePointsDriver.DISTANCE_MEASURE_KEY, measure.getClass().getName());
- conf.set(RepresentativePointsDriver.STATE_IN_KEY, "tmp/representative/representativePoints-" + numIters);
- ClusterEvaluator ce = new ClusterEvaluator(conf, seqFileDir);
- writer.append("\n");
- writer.append("Inter-Cluster Density: ").append(String.valueOf(ce.interClusterDensity())).append("\n");
- writer.append("Intra-Cluster Density: ").append(String.valueOf(ce.intraClusterDensity())).append("\n");
- CDbwEvaluator cdbw = new CDbwEvaluator(conf, seqFileDir);
- writer.append("CDbw Inter-Cluster Density: ").append(String.valueOf(cdbw.interClusterDensity())).append("\n");
- writer.append("CDbw Intra-Cluster Density: ").append(String.valueOf(cdbw.intraClusterDensity())).append("\n");
- writer.append("CDbw Separation: ").append(String.valueOf(cdbw.separation())).append("\n");
- writer.flush();
- }
- log.info("Wrote {} clusters", numWritten);
- } finally {
- if (shouldClose) {
- Closeables.close(clusterWriter, false);
- } else {
- // presumably the GraphML writer emits its closing markup in close(), so it is closed even
- // when writing to System.out -- confirm against GraphMLClusterWriter.close()
- if (clusterWriter instanceof GraphMLClusterWriter) {
- clusterWriter.close();
- }
- }
- }
- }
-
- /**
- * Builds the ClusterWriter matching the current output format.
- *
- * @throws IllegalStateException if the output format has no matching writer
- */
- ClusterWriter createClusterWriter(Writer writer, String[] dictionary) throws IOException {
- ClusterWriter result;
-
- switch (outputFormat) {
- case TEXT:
- result = new ClusterDumperWriter(writer, clusterIdToPoints, measure, numTopFeatures, dictionary, subString);
- break;
- case CSV:
- result = new CSVClusterWriter(writer, clusterIdToPoints, measure);
- break;
- case GRAPH_ML:
- result = new GraphMLClusterWriter(writer, clusterIdToPoints, measure, numTopFeatures, dictionary, subString);
- break;
- case JSON:
- result = new JsonClusterWriter(writer, clusterIdToPoints, measure, numTopFeatures, dictionary);
- break;
- default:
- throw new IllegalStateException("Unknown outputformat: " + outputFormat);
- }
- return result;
- }
-
- /**
- * Convenience function to set the output format during testing.
- */
- public void setOutputFormat(OUTPUT_FORMAT of) {
- outputFormat = of;
- }
-
- /**
- * Loads the cluster-id-to-points map from pointsDir, or uses an empty map when no points
- * directory is set. Uses the value of maxPointsPerCluster at the time of the call.
- */
- private void init() {
- if (this.pointsDir != null) {
- Configuration conf = new Configuration();
- // read in the points
- clusterIdToPoints = readPoints(this.pointsDir, maxPointsPerCluster, conf);
- } else {
- clusterIdToPoints = Collections.emptyMap();
- }
- }
-
-
- public int getSubString() {
- return subString;
- }
-
- public void setSubString(int subString) {
- this.subString = subString;
- }
-
- public Map<Integer, List<WeightedPropertyVectorWritable>> getClusterIdToPoints() {
- return clusterIdToPoints;
- }
-
- public String getTermDictionary() {
- return termDictionary;
- }
-
- /**
- * Sets the term dictionary location together with its format ("text" or "sequencefile",
- * as checked in printClusters).
- */
- public void setTermDictionary(String termDictionary, String dictionaryType) {
- this.termDictionary = termDictionary;
- this.dictionaryFormat = dictionaryType;
- }
-
- public void setNumTopFeatures(int num) {
- this.numTopFeatures = num;
- }
-
- public int getNumTopFeatures() {
- return this.numTopFeatures;
- }
-
- public long getMaxPointsPerCluster() {
- return maxPointsPerCluster;
- }
-
- /**
- * Sets the per-cluster point limit. The limit is only applied when points are read, so calling
- * this after init() has run does not trim points that are already loaded.
- */
- public void setMaxPointsPerCluster(long maxPointsPerCluster) {
- this.maxPointsPerCluster = maxPointsPerCluster;
- }
-
- /**
- * Reads the clustered points under the given directory and groups them by cluster id.
- *
- * @param pointsPathDir directory containing the points sequence files
- * @param maxPointsPerCluster maximum number of points kept per cluster; further points are dropped
- * @param conf configuration used to read the sequence files
- * @return a map, sorted by cluster id, from cluster id to the points kept for that cluster
- */
- public static Map<Integer, List<WeightedPropertyVectorWritable>> readPoints(Path pointsPathDir,
- long maxPointsPerCluster,
- Configuration conf) {
- Map<Integer, List<WeightedPropertyVectorWritable>> result = new TreeMap<>();
- for (Pair<IntWritable, WeightedPropertyVectorWritable> record
- : new SequenceFileDirIterable<IntWritable, WeightedPropertyVectorWritable>(pointsPathDir, PathType.LIST,
- PathFilters.logsCRCFilter(), conf)) {
- // the record key is used as the cluster id and the record value is the point itself
- int keyValue = record.getFirst().get();
- List<WeightedPropertyVectorWritable> pointList = result.get(keyValue);
- if (pointList == null) {
- pointList = new ArrayList<>();
- result.put(keyValue, pointList);
- }
- // a cluster still gets an (empty or capped) list entry once its limit is reached
- if (pointList.size() < maxPointsPerCluster) {
- pointList.add(record.getSecond());
- }
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java
deleted file mode 100644
index 31858c4..0000000
--- a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.utils.clustering;
-
-import org.apache.hadoop.io.Text;
-import org.apache.mahout.clustering.AbstractCluster;
-import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
-import org.apache.mahout.clustering.iterator.ClusterWritable;
-import org.apache.mahout.common.distance.DistanceMeasure;
-
-import java.io.IOException;
-import java.io.Writer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Implements a {@link ClusterWriter} that outputs in the format used by ClusterDumper in Mahout 0.5
- */
-public class ClusterDumperWriter extends AbstractClusterWriter {
-
- private final int subString;
- private final String[] dictionary;
- private final int numTopFeatures;
-
- public ClusterDumperWriter(Writer writer, Map<Integer,List<WeightedPropertyVectorWritable>> clusterIdToPoints,
- DistanceMeasure measure, int numTopFeatures, String[] dictionary, int subString) {
- super(writer, clusterIdToPoints, measure);
- this.numTopFeatures = numTopFeatures;
- this.dictionary = dictionary;
- this.subString = subString;
- }
-
- @Override
- public void write(ClusterWritable clusterWritable) throws IOException {
- Cluster cluster = clusterWritable.getValue();
- String fmtStr = cluster.asFormatString(dictionary);
- Writer writer = getWriter();
- if (subString > 0 && fmtStr.length() > subString) {
- writer.write(':');
- writer.write(fmtStr, 0, Math.min(subString, fmtStr.length()));
- } else {
- writer.write(fmtStr);
- }
-
- writer.write('\n');
-
- if (dictionary != null) {
- String topTerms = getTopFeatures(clusterWritable.getValue().getCenter(), dictionary, numTopFeatures);
- writer.write("\tTop Terms: ");
- writer.write(topTerms);
- writer.write('\n');
- }
-
- Map<Integer,List<WeightedPropertyVectorWritable>> clusterIdToPoints = getClusterIdToPoints();
- List<WeightedPropertyVectorWritable> points = clusterIdToPoints.get(clusterWritable.getValue().getId());
- if (points != null) {
- writer.write("\tWeight : [props - optional]: Point:\n\t");
- for (Iterator<WeightedPropertyVectorWritable> iterator = points.iterator(); iterator.hasNext();) {
- WeightedPropertyVectorWritable point = iterator.next();
- writer.write(String.valueOf(point.getWeight()));
- Map<Text,Text> map = point.getProperties();
- // map can be null since empty maps when written are returned as null
- writer.write(" : [");
- if (map != null) {
- for (Map.Entry<Text,Text> entry : map.entrySet()) {
- writer.write(entry.getKey().toString());
- writer.write("=");
- writer.write(entry.getValue().toString());
- }
- }
- writer.write("]");
-
- writer.write(": ");
-
- writer.write(AbstractCluster.formatVector(point.getVector(), dictionary));
- if (iterator.hasNext()) {
- writer.write("\n\t");
- }
- }
- writer.write('\n');
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java
deleted file mode 100644
index 70f8f6f..0000000
--- a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.utils.clustering;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.mahout.clustering.iterator.ClusterWritable;
-
-/**
- * Writes out clusters, either one at a time or in bulk from an {@link Iterable}.
- * Being a {@link Closeable}, a writer should be closed once all clusters have been written.
- */
-public interface ClusterWriter extends Closeable {
-
- /**
- * Write all values in the Iterable to the output
- *
- * @param iterable The {@link Iterable} to loop over
- * @return the number of docs written
- * @throws java.io.IOException if there was a problem writing
- */
- long write(Iterable<ClusterWritable> iterable) throws IOException;
-
- /**
- * Write out a single Cluster
- *
- * @param clusterWritable the wrapper holding the cluster to write
- * @throws IOException if there was a problem writing
- */
- void write(ClusterWritable clusterWritable) throws IOException;
-
- /**
- * Write the first {@code maxDocs} to the output.
- *
- * @param iterable The {@link Iterable} to loop over
- * @param maxDocs the maximum number of docs to write
- * @return The number of docs written
- * @throws IOException if there was a problem writing
- */
- long write(Iterable<ClusterWritable> iterable, long maxDocs) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java
deleted file mode 100644
index 25e8f3b..0000000
--- a/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.utils.clustering;
-
-import java.io.IOException;
-import java.io.Writer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.regex.Pattern;
-
-import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
-import org.apache.mahout.clustering.iterator.ClusterWritable;
-import org.apache.mahout.common.RandomUtils;
-import org.apache.mahout.common.StringUtils;
-import org.apache.mahout.common.distance.DistanceMeasure;
-import org.apache.mahout.math.NamedVector;
-import org.apache.mahout.math.Vector;
-
-/**
- * Writes clusters as a GraphML document -- see
- * http://gephi.org/users/supported-graph-formats/graphml-format/
- * <p>
- * The XML prolog, the attribute keys and the opening {@code <graph>} element are written when the
- * writer is constructed; {@link #close()} writes the closing tags. Every cluster passed to
- * {@link #write(ClusterWritable)} becomes a node, and each of its points becomes another node that
- * is connected to the cluster node by a weighted edge.
- */
-public class GraphMLClusterWriter extends AbstractClusterWriter {
-
-  private static final Pattern VEC_PATTERN = Pattern.compile("\\{|\\:|\\,|\\}");
-
-  /** Exclusive upper bound of a colour channel value. */
-  private static final int COLOR_RANGE = 256;
-  /** Amount added to a single channel while that channel still has head room. */
-  private static final int COLOR_STEP = 20;
-  /** Amount added to every channel once the single-channel stepping no longer applies. */
-  private static final int COLOR_FALLBACK_STEP = 3;
-
-  /** Colour already handed out per cluster id, so a cluster keeps its colour. */
-  private final Map<Integer, Color> colors = new HashMap<>();
-  /** Colour of the cluster written most recently; new colours are derived from it. */
-  private Color lastClusterColor;
-  /** Position of the cluster node written most recently. */
-  private float lastX;
-  private float lastY;
-  /** Source of the random angle at which each point is placed around its cluster. */
-  private Random random;
-  /** Added to 1000 to form the x threshold after which cluster nodes wrap to a new row. */
-  private int posStep;
-  private final String[] dictionary;
-  private final int numTopFeatures;
-  private final int subString;
-
-  /**
-   * Creates the writer and immediately emits the GraphML header to {@code writer}.
-   *
-   * @param writer            destination of the GraphML output
-   * @param clusterIdToPoints points belonging to each cluster, keyed by cluster id
-   * @param measure           distance measure used to weight edges; when {@code null} every edge
-   *                          gets the weight 1
-   * @param numTopFeatures    number of top terms requested for the cluster label
-   * @param dictionary        term dictionary; when {@code null} no top terms are added to the label
-   * @param subString         when greater than 0, point labels are cut to at most this many
-   *                          characters
-   * @throws IOException if the header cannot be written
-   */
-  public GraphMLClusterWriter(Writer writer, Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
-                              DistanceMeasure measure, int numTopFeatures, String[] dictionary, int subString)
-    throws IOException {
-    super(writer, clusterIdToPoints, measure);
-    this.dictionary = dictionary;
-    this.numTopFeatures = numTopFeatures;
-    this.subString = subString;
-    init(writer);
-  }
-
-  /**
-   * Writes the XML prolog, the {@code graphml} root element, the attribute key declarations and
-   * the opening {@code graph} element, then initialises the colour and layout state.
-   */
-  private void init(Writer writer) throws IOException {
-    writer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
-    writer.append("<graphml xmlns=\"http://graphml.graphdrawing.org/xmlns\"\n"
-                + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
-                + "xsi:schemaLocation=\"http://graphml.graphdrawing.org/xmlns\n"
-                + "http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd\">");
-    //support rgb
-    writer.append("<key attr.name=\"r\" attr.type=\"int\" for=\"node\" id=\"r\"/>\n"
-                + "<key attr.name=\"g\" attr.type=\"int\" for=\"node\" id=\"g\"/>\n"
-                + "<key attr.name=\"b\" attr.type=\"int\" for=\"node\" id=\"b\"/>"
-                + "<key attr.name=\"size\" attr.type=\"int\" for=\"node\" id=\"size\"/>"
-                + "<key attr.name=\"weight\" attr.type=\"float\" for=\"edge\" id=\"weight\"/>"
-                + "<key attr.name=\"x\" attr.type=\"float\" for=\"node\" id=\"x\"/>"
-                + "<key attr.name=\"y\" attr.type=\"float\" for=\"node\" id=\"y\"/>");
-    writer.append("<graph edgedefault=\"undirected\">");
-    lastClusterColor = new Color();
-    posStep = (int) (0.1 * clusterIdToPoints.size()) + 100;
-    random = RandomUtils.getRandom();
-  }
-
-  /*
-    Minimal example of the document structure:
-
-    <?xml version="1.0" encoding="UTF-8"?>
-    <graphml xmlns="http://graphml.graphdrawing.org/xmlns"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns
-    http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd">
-    <graph id="G" edgedefault="undirected">
-    <node id="n0"/>
-    <node id="n1"/>
-    <edge id="e1" source="n0" target="n1"/>
-    </graph>
-    </graphml>
-   */
-
-  /**
-   * Writes one cluster node followed by a node and an edge for each of the cluster's points.
-   * <p>
-   * The cluster node is labelled {@code <id>_<topTerms>}. Cluster nodes are laid out 1000 units
-   * apart along x and wrap to a new row (y + 1000) once x exceeds {@code 1000 + posStep}. Points
-   * are placed at a random angle around the cluster node, at a radius equal to the edge weight.
-   *
-   * @param clusterWritable the cluster to write
-   * @throws IOException if the output cannot be written
-   */
-  @Override
-  public void write(ClusterWritable clusterWritable) throws IOException {
-    StringBuilder line = new StringBuilder();
-    Cluster cluster = clusterWritable.getValue();
-    Color rgb = getColor(cluster.getId());
-
-    String topTerms = "";
-    if (dictionary != null) {
-      topTerms = getTopTerms(cluster.getCenter(), dictionary, numTopFeatures);
-    }
-    String clusterLabel = String.valueOf(cluster.getId()) + '_' + topTerms;
-    //do some positioning so that items are visible and grouped together
-    //TODO: put in a real layout algorithm
-    float x = lastX + 1000;
-    float y = lastY;
-    if (x > (1000 + posStep)) {
-      y = lastY + 1000;
-      x = 0;
-    }
-
-    line.append(createNode(clusterLabel, rgb, x, y));
-    List<WeightedPropertyVectorWritable> points = clusterIdToPoints.get(cluster.getId());
-    if (points != null) {
-      for (WeightedVectorWritable point : points) {
-        Vector theVec = point.getVector();
-        double distance = 1;
-        if (measure != null) {
-          //scale the distance
-          distance = measure.distance(cluster.getCenter().getLengthSquared(), cluster.getCenter(), theVec) * 500;
-        }
-        int angle = random.nextInt(360); //pick an angle at random and then scale along that angle
-        double angleRads = Math.toRadians(angle);
-
-        float targetX = x + (float) (distance * Math.cos(angleRads));
-        float targetY = y + (float) (distance * Math.sin(angleRads));
-        String vecStr = pointLabel(theVec);
-        line.append(createNode(vecStr, rgb, targetX, targetY));
-        line.append(createEdge(clusterLabel, vecStr, distance));
-      }
-    }
-    lastClusterColor = rgb;
-    lastX = x;
-    lastY = y;
-    getWriter().append(line).append("\n");
-  }
-
-  /**
-   * Builds the node label of a point: the vector's name for a {@link NamedVector}, otherwise its
-   * format string with the characters <code>{ : , }</code> replaced by underscores. The label
-   * is cut to {@code subString} characters when {@code subString} is greater than 0.
-   */
-  private String pointLabel(Vector theVec) {
-    String vecStr;
-    if (theVec instanceof NamedVector) {
-      vecStr = ((NamedVector) theVec).getName();
-    } else {
-      //do some basic manipulations for display
-      vecStr = VEC_PATTERN.matcher(theVec.asFormatString()).replaceAll("_");
-    }
-    if (subString > 0 && vecStr.length() > subString) {
-      vecStr = vecStr.substring(0, subString);
-    }
-    return vecStr;
-  }
-
-  /**
-   * Returns the colour of the given cluster, deriving a new one from the colour of the last
-   * written cluster when the cluster has none yet.
-   * <p>
-   * Red is stepped while all channels have head room, then green once red is exhausted, then blue
-   * once red and green are exhausted. In every other situation all three channels advance by a
-   * small step. Channel values wrap around at 256.
-   */
-  private Color getColor(int clusterId) {
-    Color result = colors.get(clusterId);
-    if (result == null) {
-      result = new Color();
-      //there is probably some better way to color a graph
-      boolean redFits = lastClusterColor.r + COLOR_STEP < COLOR_RANGE;
-      boolean greenFits = lastClusterColor.g + COLOR_STEP < COLOR_RANGE;
-      boolean blueFits = lastClusterColor.b + COLOR_STEP < COLOR_RANGE;
-      int incR = 0;
-      int incG = 0;
-      int incB = 0;
-      if (redFits && greenFits && blueFits) {
-        incR = COLOR_STEP;
-      } else if (!redFits && greenFits && blueFits) {
-        incG = COLOR_STEP;
-      } else if (!redFits && !greenFits && blueFits) {
-        incB = COLOR_STEP;
-      } else {
-        // Advance every channel; blue must move too, otherwise it stays frozen in this branch.
-        incR = COLOR_FALLBACK_STEP;
-        incG = COLOR_FALLBACK_STEP;
-        incB = COLOR_FALLBACK_STEP;
-      }
-      result.r = (lastClusterColor.r + incR) % COLOR_RANGE;
-      result.g = (lastClusterColor.g + incG) % COLOR_RANGE;
-      result.b = (lastClusterColor.b + incB) % COLOR_RANGE;
-      colors.put(clusterId, result);
-    }
-    return result;
-  }
-
-  /**
-   * Renders an {@code edge} element between two node ids (XML-escaped) carrying {@code distance}
-   * as its {@code weight} data.
-   */
-  private static String createEdge(String left, String right, double distance) {
-    left = StringUtils.escapeXML(left);
-    right = StringUtils.escapeXML(right);
-    return "<edge id=\"" + left + '_' + right + "\" source=\"" + left + "\" target=\"" + right + "\">"
-            + "<data key=\"weight\">" + distance + "</data></edge>";
-  }
-
-  /**
-   * Renders a {@code node} element whose id is the XML-escaped label and whose data carries the
-   * colour channels and the position.
-   */
-  private static String createNode(String s, Color rgb, float x, float y) {
-    return "<node id=\"" + StringUtils.escapeXML(s) + "\"><data key=\"r\">" + rgb.r
-            + "</data>"
-            + "<data key=\"g\">" + rgb.g
-            + "</data>"
-            + "<data key=\"b\">" + rgb.b
-            + "</data>"
-            + "<data key=\"x\">" + x
-            + "</data>"
-            + "<data key=\"y\">" + y
-            + "</data>"
-            + "</node>";
-  }
-
-  /**
-   * Writes the closing {@code graph} and {@code graphml} tags and then closes the writer via the
-   * superclass.
-   */
-  @Override
-  public void close() throws IOException {
-    getWriter().append("</graph>").append("</graphml>");
-    super.close();
-  }
-
-  /** Mutable RGB triple; each channel is kept in the range 0-255 by {@code getColor}. */
-  private static class Color {
-    int r;
-    int g;
-    int b;
-  }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
deleted file mode 100644
index d564a73..0000000
--- a/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mahout.utils.clustering;
-
-import java.io.IOException;
-import java.io.Writer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-import org.apache.mahout.clustering.AbstractCluster;
-import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
-import org.apache.mahout.clustering.iterator.ClusterWritable;
-import org.apache.mahout.common.distance.DistanceMeasure;
-import org.apache.mahout.math.NamedVector;
-import org.apache.mahout.math.Vector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Dump cluster info to JSON formatted lines. Heavily inspired by
- * ClusterDumperWriter.java and CSVClusterWriter.java
- *
- */
-public class JsonClusterWriter extends AbstractClusterWriter {
- private final String[] dictionary;
- private final int numTopFeatures;
- private final ObjectMapper jxn;
-
- private static final Logger log = LoggerFactory.getLogger(JsonClusterWriter.class);
- private static final Pattern VEC_PATTERN = Pattern.compile("\\{|\\:|\\,|\\}");
-
- public JsonClusterWriter(Writer writer,
- Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints,
- DistanceMeasure measure, int numTopFeatures, String[] dictionary) {
- super(writer, clusterIdToPoints, measure);
- this.numTopFeatures = numTopFeatures;
- this.dictionary = dictionary;
- jxn = new ObjectMapper();
- }
-
- /**
- * Generate HashMap with cluster info and write as a single JSON formatted
- * line
- */
- @Override
- public void write(ClusterWritable clusterWritable) throws IOException {
- Map<String, Object> res = new HashMap<>();
-
- // get top terms
- if (dictionary != null) {
- List<Object> topTerms = getTopFeaturesList(clusterWritable.getValue()
- .getCenter(), dictionary, numTopFeatures);
- res.put("top_terms", topTerms);
- } else {
- res.put("top_terms", new ArrayList<>());
- }
-
- // get human-readable cluster representation
- Cluster cluster = clusterWritable.getValue();
- res.put("cluster_id", cluster.getId());
-
- if (dictionary != null) {
- Map<String,Object> fmtStr = cluster.asJson(dictionary);
- res.put("cluster", fmtStr);
-
- // get points
- List<Object> points = getPoints(cluster, dictionary);
- res.put("points", points);
- } else {
- res.put("cluster", new HashMap<>());
- res.put("points", new ArrayList<>());
- }
-
- // write JSON
- Writer writer = getWriter();
- writer.write(jxn.writeValueAsString(res) + "\n");
- }
-
- /**
- * Create a List of HashMaps containing top terms information
- *
- * @return List<Object>
- */
- public List<Object> getTopFeaturesList(Vector vector, String[] dictionary,
- int numTerms) {
-
- List<TermIndexWeight> vectorTerms = new ArrayList<>();
-
- for (Vector.Element elt : vector.nonZeroes()) {
- vectorTerms.add(new TermIndexWeight(elt.index(), elt.get()));
- }
-
- // Sort results in reverse order (i.e. weight in descending order)
- Collections.sort(vectorTerms, new Comparator<TermIndexWeight>() {
- @Override
- public int compare(TermIndexWeight one, TermIndexWeight two) {
- return Double.compare(two.weight, one.weight);
- }
- });
-
- List<Object> topTerms = new ArrayList<>();
-
- for (int i = 0; i < vectorTerms.size() && i < numTerms; i++) {
- int index = vectorTerms.get(i).index;
- String dictTerm = dictionary[index];
- if (dictTerm == null) {
- log.error("Dictionary entry missing for {}", index);
- continue;
- }
- Map<String, Object> term_entry = new HashMap<>();
- term_entry.put(dictTerm, vectorTerms.get(i).weight);
- topTerms.add(term_entry);
- }
-
- return topTerms;
- }
-
- /**
- * Create a List of HashMaps containing Vector point information
- *
- * @return List<Object>
- */
- public List<Object> getPoints(Cluster cluster, String[] dictionary) {
- List<Object> vectorObjs = new ArrayList<>();
- List<WeightedPropertyVectorWritable> points = getClusterIdToPoints().get(
- cluster.getId());
-
- if (points != null) {
- for (WeightedPropertyVectorWritable point : points) {
- Map<String, Object> entry = new HashMap<>();
- Vector theVec = point.getVector();
- if (theVec instanceof NamedVector) {
- entry.put("vector_name", ((NamedVector) theVec).getName());
- } else {
- String vecStr = theVec.asFormatString();
- // do some basic manipulations for display
- vecStr = VEC_PATTERN.matcher(vecStr).replaceAll("_");
- entry.put("vector_name", vecStr);
- }
- entry.put("weight", String.valueOf(point.getWeight()));
- try {
- entry.put("point",
- AbstractCluster.formatVectorAsJson(point.getVector(), dictionary));
- } catch (IOException e) {
- log.error("IOException: ", e);
- }
- vectorObjs.add(entry);
- }
- }
- return vectorObjs;
- }
-
- /**
- * Convenience class for sorting terms
- *
- */
- private static class TermIndexWeight {
- private final int index;
- private final double weight;
-
- TermIndexWeight(int index, double weight) {
- this.index = index;
- this.weight = weight;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java b/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java
deleted file mode 100644
index 54ad43f..0000000
--- a/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.utils.email;
-
-import java.io.File;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-/**
- * Configuration options to be used by {@link MailProcessor}. Includes options controlling the exact output format
- * and which mail fields are included (body, to, from, subject, etc.)
- */
-public class MailOptions {
-
-  public static final String FROM = "FROM";
-  public static final String TO = "TO";
-  public static final String REFS = "REFS";
-  public static final String SUBJECT = "SUBJECT";
-  /** Default quoted-text pattern: a {@code |} or {@code >} at the start of the input. */
-  public static final Pattern DEFAULT_QUOTED_TEXT = Pattern.compile("^(\\||>)");
-
-  // ---- input / output location ----
-
-  private File input;
-  private String outputDir;
-  private String prefix;
-  private int chunkSize;
-  private Charset charset;
-
-  public File getInput() {
-    return this.input;
-  }
-
-  public void setInput(File input) {
-    this.input = input;
-  }
-
-  public String getOutputDir() {
-    return this.outputDir;
-  }
-
-  /**
-   * Sets the output directory where sequence files will be written.
-   */
-  public void setOutputDir(String outputDir) {
-    this.outputDir = outputDir;
-  }
-
-  public String getPrefix() {
-    return this.prefix;
-  }
-
-  /**
-   * Sets the prefix that is combined with the archive name and with message ids to create {@code SequenceFile} keys.
-   * @param prefix The name of the directory containing the mail archive is commonly used.
-   */
-  public void setPrefix(String prefix) {
-    this.prefix = prefix;
-  }
-
-  public int getChunkSize() {
-    return this.chunkSize;
-  }
-
-  /**
-   * Sets the size of each generated sequence file, in Megabytes.
-   */
-  public void setChunkSize(int chunkSize) {
-    this.chunkSize = chunkSize;
-  }
-
-  public Charset getCharset() {
-    return this.charset;
-  }
-
-  /**
-   * Sets the encoding of the input.
-   */
-  public void setCharset(Charset charset) {
-    this.charset = charset;
-  }
-
-  // ---- output format ----
-
-  private String separator;
-  private String bodySeparator = "\n";
-  private boolean includeBody;
-
-  public String getSeparator() {
-    return this.separator;
-  }
-
-  /**
-   * Sets the separator to use in the output between metadata items (to, from, etc.).
-   */
-  public void setSeparator(String separator) {
-    this.separator = separator;
-  }
-
-  public String getBodySeparator() {
-    return this.bodySeparator;
-  }
-
-  /**
-   * Sets the separator to use in the output between lines in the body, the default is "\n".
-   */
-  public void setBodySeparator(String bodySeparator) {
-    this.bodySeparator = bodySeparator;
-  }
-
-  public boolean isIncludeBody() {
-    return this.includeBody;
-  }
-
-  /**
-   * Sets whether mail bodies are included in the output.
-   */
-  public void setIncludeBody(boolean includeBody) {
-    this.includeBody = includeBody;
-  }
-
-  // ---- metadata extraction ----
-
-  private Pattern[] patternsToMatch;
-  //maps FROM, TO, REFS, SUBJECT, etc. to the order they appear in patternsToMatch. See MailToRecMapper
-  private Map<String, Integer> patternOrder;
-
-  public Pattern[] getPatternsToMatch() {
-    return this.patternsToMatch;
-  }
-
-  /**
-   * Sets the list of patterns to be applied in the given order to extract metadata fields (to, from, subject, etc.)
-   * from the input.
-   */
-  public void setPatternsToMatch(Pattern[] patternsToMatch) {
-    this.patternsToMatch = patternsToMatch;
-  }
-
-  public Map<String, Integer> getPatternOrder() {
-    return this.patternOrder;
-  }
-
-  public void setPatternOrder(Map<String, Integer> patternOrder) {
-    this.patternOrder = patternOrder;
-  }
-
-  // ---- quoted text handling ----
-
-  private boolean stripQuotedText;
-  //the regular expression to use for identifying quoted text.
-  private Pattern quotedTextPattern = DEFAULT_QUOTED_TEXT;
-
-  /**
-   * @return true if we should strip out quoted email text
-   */
-  public boolean isStripQuotedText() {
-    return this.stripQuotedText;
-  }
-
-  /**
-   * Sets whether quoted text, such as lines starting with | or >, is stripped off.
-   */
-  public void setStripQuotedText(boolean stripQuotedText) {
-    this.stripQuotedText = stripQuotedText;
-  }
-
-  public Pattern getQuotedTextPattern() {
-    return this.quotedTextPattern;
-  }
-
-  /**
-   * Sets the {@link java.util.regex.Pattern} to use to identify lines that are quoted text. Default is | and >
-   * @see #setStripQuotedText(boolean)
-   */
-  public void setQuotedTextPattern(Pattern quotedTextPattern) {
-    this.quotedTextPattern = quotedTextPattern;
-  }
-}