You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by je...@apache.org on 2012/06/11 23:13:13 UTC
svn commit: r1349015 - in /mahout/trunk:
core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/
examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/
examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/diri...
Author: jeastman
Date: Mon Jun 11 21:13:13 2012
New Revision: 1349015
URL: http://svn.apache.org/viewvc?rev=1349015&view=rev
Log:
MAHOUT-1017:
- improved the exception message in FuzzyKMeansDriver to include the clusters input path
- converted meanshift InputDriver and InputMapper to emit ClusterWritables used by other jobs
- fixed synthetic control jobs to properly handle the -final suffix on the final clusters-i directory
- all synthetic control jobs run
- all unit tests pass
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java
mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputDriver.java
mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputMapper.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Mon Jun 11 21:13:13 2012
@@ -274,7 +274,7 @@ public class FuzzyKMeansDriver extends A
}
if (clusters.isEmpty()) {
- throw new IllegalStateException("No input clusters found. Check your -c argument.");
+ throw new IllegalStateException("No input clusters found in " + clustersIn + ". Check your -c argument.");
}
Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java Mon Jun 11 21:13:13 2012
@@ -88,7 +88,7 @@ public final class Job extends AbstractJ
output, measure, t1, t2, true, 0.0, false);
// run ClusterDumper
ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
- "clusters-0"), new Path(output, "clusteredPoints"));
+ "clusters-0-final"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(null);
}
Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java Mon Jun 11 21:13:13 2012
@@ -38,14 +38,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class Job extends AbstractJob {
-
+
private static final Logger log = LoggerFactory.getLogger(Job.class);
-
+
private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
-
- private Job() {
- }
-
+
+ private Job() {}
+
public static void main(String[] args) throws Exception {
if (args.length > 0) {
log.info("Running with only user-supplied arguments");
@@ -54,45 +53,53 @@ public final class Job extends AbstractJ
log.info("Running with default arguments");
Path output = new Path("output");
HadoopUtil.delete(new Configuration(), output);
- DistributionDescription description =
- new DistributionDescription(GaussianClusterDistribution.class.getName(),
- RandomAccessSparseVector.class.getName(),
- null,
- 60);
+ DistributionDescription description = new DistributionDescription(GaussianClusterDistribution.class.getName(),
+ RandomAccessSparseVector.class.getName(), null, 60);
run(new Path("testdata"), output, description, 10, 5, 1.0, true, 0);
}
}
-
+
@Override
- public int run(String[] args) throws Exception{
+ public int run(String[] args) throws Exception {
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.maxIterationsOption().create());
addOption(DefaultOptionCreator.numClustersOption().withRequired(true).create());
addOption(DefaultOptionCreator.overwriteOption().create());
- addOption(new DefaultOptionBuilder().withLongName(DirichletDriver.ALPHA_OPTION).withRequired(false)
- .withShortName("m").withArgument(new ArgumentBuilder().withName(DirichletDriver.ALPHA_OPTION).withDefault("1.0")
- .withMinimum(1).withMaximum(1).create())
+ addOption(new DefaultOptionBuilder()
+ .withLongName(DirichletDriver.ALPHA_OPTION)
+ .withRequired(false)
+ .withShortName("m")
+ .withArgument(
+ new ArgumentBuilder().withName(DirichletDriver.ALPHA_OPTION).withDefault("1.0").withMinimum(1)
+ .withMaximum(1).create())
.withDescription("The alpha0 value for the DirichletDistribution. Defaults to 1.0").create());
- addOption(new DefaultOptionBuilder().withLongName(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION)
- .withRequired(false).withShortName("md").withArgument(new ArgumentBuilder()
- .withName(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION)
- .withDefault(GaussianClusterDistribution.class.getName()).withMinimum(1).withMaximum(1).create())
+ addOption(new DefaultOptionBuilder()
+ .withLongName(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION)
+ .withRequired(false)
+ .withShortName("md")
+ .withArgument(
+ new ArgumentBuilder().withName(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION)
+ .withDefault(GaussianClusterDistribution.class.getName()).withMinimum(1).withMaximum(1).create())
.withDescription("The ModelDistribution class name. Defaults to GaussianClusterDistribution").create());
- addOption(new DefaultOptionBuilder().withLongName(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION).withRequired(false)
- .withShortName("mp").withArgument(new ArgumentBuilder().withName("prototypeClass")
- .withDefault(RandomAccessSparseVector.class.getName()).withMinimum(1).withMaximum(1).create())
+ addOption(new DefaultOptionBuilder()
+ .withLongName(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION)
+ .withRequired(false)
+ .withShortName("mp")
+ .withArgument(
+ new ArgumentBuilder().withName("prototypeClass").withDefault(RandomAccessSparseVector.class.getName())
+ .withMinimum(1).withMaximum(1).create())
.withDescription("The ModelDistribution prototype Vector class name. Defaults to RandomAccessSparseVector")
.create());
addOption(DefaultOptionCreator.distanceMeasureOption().withRequired(false).create());
addOption(DefaultOptionCreator.emitMostLikelyOption().create());
addOption(DefaultOptionCreator.thresholdOption().create());
-
- Map<String, List<String>> argMap = parseArguments(args);
+
+ Map<String,List<String>> argMap = parseArguments(args);
if (argMap == null) {
return -1;
}
-
+
Path input = getInputPath();
Path output = getOutputPath();
if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
@@ -106,13 +113,12 @@ public final class Job extends AbstractJ
boolean emitMostLikely = Boolean.parseBoolean(getOption(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION));
double threshold = Double.parseDouble(getOption(DefaultOptionCreator.THRESHOLD_OPTION));
double alpha0 = Double.parseDouble(getOption(DirichletDriver.ALPHA_OPTION));
- DistributionDescription description =
- new DistributionDescription(modelFactory, modelPrototype, distanceMeasure, 60);
-
+ DistributionDescription description = new DistributionDescription(modelFactory, modelPrototype, distanceMeasure, 60);
+
run(input, output, description, numModels, maxIterations, alpha0, emitMostLikely, threshold);
return 0;
}
-
+
/**
* Run the job using supplied arguments, deleting the output directory if it exists beforehand
*
@@ -120,7 +126,8 @@ public final class Job extends AbstractJ
* the directory pathname for input points
* @param output
* the directory pathname for output points
- * @param description the model distribution description
+ * @param description
+ * the model distribution description
* @param numModels
* the number of Models
* @param maxIterations
@@ -128,25 +135,18 @@ public final class Job extends AbstractJ
* @param alpha0
* the alpha0 value for the DirichletDistribution
*/
- public static void run(Path input,
- Path output,
- DistributionDescription description,
- int numModels,
- int maxIterations,
- double alpha0,
- boolean emitMostLikely,
- double threshold)
- throws Exception{
+ public static void run(Path input, Path output, DistributionDescription description, int numModels,
+ int maxIterations, double alpha0, boolean emitMostLikely, double threshold) throws Exception {
Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
- DirichletDriver.run(new Configuration(), directoryContainingConvertedInput, output, description, numModels, maxIterations, alpha0, true,
- emitMostLikely, threshold, false);
+ DirichletDriver.run(new Configuration(), directoryContainingConvertedInput, output, description, numModels,
+ maxIterations, alpha0, true, emitMostLikely, threshold, false);
// run ClusterDumper
- ClusterDumper clusterDumper =
- new ClusterDumper(new Path(output, "clusters-" + maxIterations), new Path(output, "clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+ "clusteredPoints"));
clusterDumper.printClusters(null);
}
-
+
/**
* Actually prints out the clusters
*
@@ -156,22 +156,11 @@ public final class Job extends AbstractJ
* the minimum number of samples to enable printing a model
*/
/*
- private static void printClusters(Iterable<List<DirichletCluster>> clusters, int significant) {
- int row = 0;
- StringBuilder result = new StringBuilder(100);
- for (List<DirichletCluster> r : clusters) {
- result.append("sample=").append(row++).append("]= ");
- for (int k = 0; k < r.size(); k++) {
- Model<VectorWritable> model = r.get(k).getModel();
- if (model.count() > significant) {
- int total = (int) r.get(k).getTotalCount();
- result.append('m').append(k).append('(').append(total).append(')').append(model).append(", ");
- }
- }
- result.append('\n');
- }
- result.append('\n');
- log.info(result.toString());
- }
+ * private static void printClusters(Iterable<List<DirichletCluster>> clusters, int significant) { int row = 0;
+ * StringBuilder result = new StringBuilder(100); for (List<DirichletCluster> r : clusters) {
+ * result.append("sample=").append(row++).append("]= "); for (int k = 0; k < r.size(); k++) { Model<VectorWritable>
+ * model = r.get(k).getModel(); if (model.count() > significant) { int total = (int) r.get(k).getTotalCount();
+ * result.append('m').append(k).append('(').append(total).append(')').append(model).append(", "); } }
+ * result.append('\n'); } result.append('\n'); log.info(result.toString()); }
*/
}
Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java Mon Jun 11 21:13:13 2012
@@ -17,17 +17,14 @@
package org.apache.mahout.clustering.syntheticcontrol.fuzzykmeans;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.conversion.InputDriver;
import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
@@ -43,16 +40,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class Job extends AbstractJob {
-
+
private static final Logger log = LoggerFactory.getLogger(Job.class);
-
+
private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
-
+
private static final String M_OPTION = FuzzyKMeansDriver.M_OPTION;
-
- private Job() {
- }
-
+
+ private Job() {}
+
public static void main(String[] args) throws Exception {
if (args.length > 0) {
log.info("Running with only user-supplied arguments");
@@ -62,13 +58,12 @@ public final class Job extends AbstractJ
Path output = new Path("output");
Configuration conf = new Configuration();
HadoopUtil.delete(conf, output);
- run(conf, new Path("testdata"), output,
- new EuclideanDistanceMeasure(), 80, 55, 10, (float) 2, 0.5);
+ run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 80, 55, 10, (float) 2, 0.5);
}
}
-
+
@Override
- public int run(String[] args) throws Exception{
+ public int run(String[] args) throws Exception {
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.distanceMeasureOption().create());
@@ -77,68 +72,43 @@ public final class Job extends AbstractJ
addOption(DefaultOptionCreator.overwriteOption().create());
addOption(DefaultOptionCreator.t1Option().create());
addOption(DefaultOptionCreator.t2Option().create());
- addOption(M_OPTION, M_OPTION,
- "coefficient normalization factor, must be greater than 1", true);
-
- Map<String, List<String>> argMap = parseArguments(args);
+ addOption(M_OPTION, M_OPTION, "coefficient normalization factor, must be greater than 1", true);
+
+ Map<String,List<String>> argMap = parseArguments(args);
if (argMap == null) {
return -1;
}
-
+
Path input = getInputPath();
Path output = getOutputPath();
String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
if (measureClass == null) {
measureClass = SquaredEuclideanDistanceMeasure.class.getName();
}
- double convergenceDelta = Double
- .parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
- int maxIterations = Integer
- .parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+ double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+ int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
float fuzziness = Float.parseFloat(getOption(M_OPTION));
-
- addOption(new DefaultOptionBuilder().withLongName(M_OPTION).withRequired(
- true).withArgument(
- new ArgumentBuilder().withName(M_OPTION).withMinimum(1).withMaximum(1)
- .create()).withDescription(
- "coefficient normalization factor, must be greater than 1")
- .withShortName(M_OPTION).create());
+
+ addOption(new DefaultOptionBuilder().withLongName(M_OPTION).withRequired(true)
+ .withArgument(new ArgumentBuilder().withName(M_OPTION).withMinimum(1).withMaximum(1).create())
+ .withDescription("coefficient normalization factor, must be greater than 1").withShortName(M_OPTION).create());
if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
HadoopUtil.delete(getConf(), output);
}
DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
- run(getConf(), input, output, measure, t1, t2, maxIterations, fuzziness,
- convergenceDelta);
+ run(getConf(), input, output, measure, t1, t2, maxIterations, fuzziness, convergenceDelta);
return 0;
}
-
- /**
- * Return the path to the final iteration's clusters
- */
- private static Path finalClusterPath(Configuration conf, Path output,
- int maxIterations) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- for (int i = maxIterations; i >= 0; i--) {
- Path clusters = new Path(output, "clusters-" + i);
- if (fs.exists(clusters)) {
- return clusters;
- }
- }
- return null;
- }
-
+
/**
- * Run the kmeans clustering job on an input dataset using the given distance
- * measure, t1, t2 and iteration parameters. All output data will be written
- * to the output directory, which will be initially deleted if it exists. The
- * clustered points will reside in the path <output>/clustered-points. By
- * default, the job expects the a file containing synthetic_control.data as
- * obtained from
- * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series
- * resides in a directory named "testdata", and writes output to a directory
- * named "output".
+ * Run the kmeans clustering job on an input dataset using the given distance measure, t1, t2 and iteration
+ * parameters. All output data will be written to the output directory, which will be initially deleted if it exists.
+ * The clustered points will reside in the path <output>/clustered-points. By default, the job expects the a file
+ * containing synthetic_control.data as obtained from
+ * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series resides in a directory named "testdata",
+ * and writes output to a directory named "output".
*
* @param input
* the String denoting the input directory path
@@ -155,31 +125,21 @@ public final class Job extends AbstractJ
* @param convergenceDelta
* the double convergence criteria for iterations
*/
- public static void run(Configuration conf,
- Path input,
- Path output,
- DistanceMeasure measure,
- double t1,
- double t2,
- int maxIterations,
- float fuzziness,
- double convergenceDelta)
- throws Exception{
- Path directoryContainingConvertedInput = new Path(output,
- DIRECTORY_CONTAINING_CONVERTED_INPUT);
+ public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
+ int maxIterations, float fuzziness, double convergenceDelta) throws Exception {
+ Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
log.info("Preparing Input");
- InputDriver.runJob(input, directoryContainingConvertedInput,
- "org.apache.mahout.math.RandomAccessSparseVector");
+ InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
log.info("Running Canopy to get initial clusters");
- CanopyDriver.run(new Configuration(), directoryContainingConvertedInput,
- output, measure, t1, t2, false, 0.0, false);
+ Path canopyOutput = new Path(output, "canopies");
+ CanopyDriver
+ .run(new Configuration(), directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0, false);
log.info("Running FuzzyKMeans");
- FuzzyKMeansDriver.run(directoryContainingConvertedInput, new Path(output,
- Cluster.INITIAL_CLUSTERS_DIR), output, measure, convergenceDelta,
- maxIterations, fuzziness, true, true, 0.0, false);
+ FuzzyKMeansDriver.run(directoryContainingConvertedInput, new Path(canopyOutput, "clusters-0-final"), output, measure,
+ convergenceDelta, maxIterations, fuzziness, true, true, 0.0, false);
// run ClusterDumper
- ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
- output, maxIterations), new Path(output, "clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+ "clusteredPoints"));
clusterDumper.printClusters(null);
}
}
Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java Mon Jun 11 21:13:13 2012
@@ -17,12 +17,10 @@
package org.apache.mahout.clustering.syntheticcontrol.kmeans;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.Cluster;
@@ -42,14 +40,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class Job extends AbstractJob {
-
+
private static final Logger log = LoggerFactory.getLogger(Job.class);
-
+
private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
-
- private Job() {
- }
-
+
+ private Job() {}
+
public static void main(String[] args) throws Exception {
if (args.length > 0) {
log.info("Running with only user-supplied arguments");
@@ -62,9 +59,9 @@ public final class Job extends AbstractJ
run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 6, 0.5, 10);
}
}
-
+
@Override
- public int run(String[] args) throws Exception{
+ public int run(String[] args) throws Exception {
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.distanceMeasureOption().create());
@@ -74,47 +71,41 @@ public final class Job extends AbstractJ
addOption(DefaultOptionCreator.convergenceOption().create());
addOption(DefaultOptionCreator.maxIterationsOption().create());
addOption(DefaultOptionCreator.overwriteOption().create());
-
- Map<String, List<String>> argMap = parseArguments(args);
+
+ Map<String,List<String>> argMap = parseArguments(args);
if (argMap == null) {
return -1;
}
-
+
Path input = getInputPath();
Path output = getOutputPath();
String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
if (measureClass == null) {
measureClass = SquaredEuclideanDistanceMeasure.class.getName();
}
- double convergenceDelta = Double
- .parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
- int maxIterations = Integer
- .parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+ double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+ int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
HadoopUtil.delete(getConf(), output);
}
DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
- int k = Integer
- .parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
+ int k = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
run(getConf(), input, output, measure, k, convergenceDelta, maxIterations);
} else {
double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
- run(getConf(), input, output, measure, t1, t2, convergenceDelta,
- maxIterations);
+ run(getConf(), input, output, measure, t1, t2, convergenceDelta, maxIterations);
}
return 0;
}
-
+
/**
- * Run the kmeans clustering job on an input dataset using the given the
- * number of clusters k and iteration parameters. All output data will be
- * written to the output directory, which will be initially deleted if it
- * exists. The clustered points will reside in the path
- * <output>/clustered-points. By default, the job expects a file containing
- * equal length space delimited data that resides in a directory named
- * "testdata", and writes output to a directory named "output".
+ * Run the kmeans clustering job on an input dataset using the given the number of clusters k and iteration
+ * parameters. All output data will be written to the output directory, which will be initially deleted if it exists.
+ * The clustered points will reside in the path <output>/clustered-points. By default, the job expects a file
+ * containing equal length space delimited data that resides in a directory named "testdata", and writes output to a
+ * directory named "output".
*
* @param conf
* the Configuration to use
@@ -131,37 +122,30 @@ public final class Job extends AbstractJ
* @param maxIterations
* the int maximum number of iterations
*/
- public static void run(Configuration conf, Path input, Path output,
- DistanceMeasure measure, int k, double convergenceDelta, int maxIterations)
- throws Exception{
- Path directoryContainingConvertedInput = new Path(output,
- DIRECTORY_CONTAINING_CONVERTED_INPUT);
+ public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, int k,
+ double convergenceDelta, int maxIterations) throws Exception {
+ Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
log.info("Preparing Input");
- InputDriver.runJob(input, directoryContainingConvertedInput,
- "org.apache.mahout.math.RandomAccessSparseVector");
+ InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
log.info("Running random seed to get initial clusters");
Path clusters = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
- clusters = RandomSeedGenerator.buildRandom(conf,
- directoryContainingConvertedInput, clusters, k, measure);
+ clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
log.info("Running KMeans");
- KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output,
- measure, convergenceDelta, maxIterations, true, 0.0, false);
+ KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, measure, convergenceDelta,
+ maxIterations, true, 0.0, false);
// run ClusterDumper
- ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
- output, maxIterations), new Path(output, "clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+ "clusteredPoints"));
clusterDumper.printClusters(null);
}
-
+
/**
- * Run the kmeans clustering job on an input dataset using the given distance
- * measure, t1, t2 and iteration parameters. All output data will be written
- * to the output directory, which will be initially deleted if it exists. The
- * clustered points will reside in the path <output>/clustered-points. By
- * default, the job expects the a file containing synthetic_control.data as
- * obtained from
- * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series
- * resides in a directory named "testdata", and writes output to a directory
- * named "output".
+ * Run the kmeans clustering job on an input dataset using the given distance measure, t1, t2 and iteration
+ * parameters. All output data will be written to the output directory, which will be initially deleted if it exists.
+ * The clustered points will reside in the path <output>/clustered-points. By default, the job expects the a file
+ * containing synthetic_control.data as obtained from
+ * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series resides in a directory named "testdata",
+ * and writes output to a directory named "output".
*
* @param conf
* the Configuration to use
@@ -180,40 +164,21 @@ public final class Job extends AbstractJ
* @param maxIterations
* the int maximum number of iterations
*/
- public static void run(Configuration conf, Path input, Path output,
- DistanceMeasure measure, double t1, double t2, double convergenceDelta,
- int maxIterations)
- throws Exception{
- Path directoryContainingConvertedInput = new Path(output,
- DIRECTORY_CONTAINING_CONVERTED_INPUT);
+ public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
+ double convergenceDelta, int maxIterations) throws Exception {
+ Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
log.info("Preparing Input");
- InputDriver.runJob(input, directoryContainingConvertedInput,
- "org.apache.mahout.math.RandomAccessSparseVector");
+ InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
log.info("Running Canopy to get initial clusters");
- CanopyDriver.run(conf, directoryContainingConvertedInput, output, measure,
- t1, t2, false, 0.0, false);
+ Path canopyOutput = new Path(output, "canopies");
+ CanopyDriver.run(new Configuration(), directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0,
+ false);
log.info("Running KMeans");
- KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(output,
- Cluster.INITIAL_CLUSTERS_DIR), output, measure, convergenceDelta,
- maxIterations, true, 0.0, false);
+ KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(canopyOutput, Cluster.INITIAL_CLUSTERS_DIR
+ + "-final"), output, measure, convergenceDelta, maxIterations, true, 0.0, false);
// run ClusterDumper
- ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
- output, maxIterations), new Path(output, "clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+ "clusteredPoints"));
clusterDumper.printClusters(null);
}
-
- /**
- * Return the path to the final iteration's clusters
- */
- private static Path finalClusterPath(Configuration conf, Path output,
- int maxIterations) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- for (int i = maxIterations; i >= 0; i--) {
- Path clusters = new Path(output, "clusters-" + i);
- if (fs.exists(clusters)) {
- return clusters;
- }
- }
- return null;
- }
}
Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java Mon Jun 11 21:13:13 2012
@@ -142,7 +142,7 @@ public final class Job extends AbstractJ
true, false);
// run ClusterDumper
ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
- "clusters-" + maxIterations), new Path(output, "clusteredPoints"));
+ "clusters-*-final"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(null);
}
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputDriver.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputDriver.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputDriver.java Mon Jun 11 21:13:13 2012
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.Reduc
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.mahout.clustering.meanshift.MeanShiftCanopy;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.slf4j.Logger;
@@ -82,7 +82,7 @@ public final class InputDriver {
Job job = new Job(conf, "Mean Shift Input Driver running over input: " + input);
job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(MeanShiftCanopy.class);
+ job.setOutputValueClass(ClusterWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(org.apache.mahout.clustering.conversion.meanshift.InputMapper.class);
job.setReducerClass(Reducer.class);
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputMapper.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputMapper.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputMapper.java Mon Jun 11 21:13:13 2012
@@ -21,21 +21,25 @@ import java.io.IOException;
import java.util.Collection;
import java.util.regex.Pattern;
-import com.google.common.collect.Lists;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.clustering.meanshift.MeanShiftCanopy;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
-public class InputMapper extends Mapper<LongWritable, Text, Text, MeanShiftCanopy> {
+import com.google.common.collect.Lists;
+public class InputMapper extends Mapper<LongWritable,Text,Text,ClusterWritable> {
+
private static final Pattern SPACE = Pattern.compile(" ");
-
+
private int nextCanopyId;
-
+
+ private ClusterWritable cw = new ClusterWritable();
+
@Override
protected void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] numbers = SPACE.split(values.toString());
@@ -53,8 +57,8 @@ public class InputMapper extends Mapper<
for (Double d : doubles) {
point.set(index++, d);
}
- MeanShiftCanopy canopy = new MeanShiftCanopy(point, nextCanopyId++, new EuclideanDistanceMeasure());
- context.write(new Text(), canopy);
+ cw.setValue(new MeanShiftCanopy(point, nextCanopyId++, new EuclideanDistanceMeasure()));
+ context.write(new Text(), cw);
}
}
}