You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by je...@apache.org on 2012/05/10 00:02:52 UTC
svn commit: r1336424 [1/2] - in /mahout/trunk:
core/src/main/java/org/apache/mahout/clustering/classify/
core/src/main/java/org/apache/mahout/clustering/dirichlet/
core/src/main/java/org/apache/mahout/clustering/dirichlet/models/
core/src/main/java/org...
Author: jeastman
Date: Wed May 9 22:02:50 2012
New Revision: 1336424
URL: http://svn.apache.org/viewvc?rev=1336424&view=rev
Log:
MAHOUT-990: fixed problems with patch and all tests and displays run
Removed:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletState.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/models/DistributionDescription.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayDirichlet.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayKMeans.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/dirichlet/TestL1ModelClustering.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java Wed May 9 22:02:50 2012
@@ -40,7 +40,6 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.dirichlet.DirichletCluster;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.clustering.iterator.ClusteringPolicy;
import org.apache.mahout.common.AbstractJob;
@@ -166,9 +165,7 @@ public class ClusterClassificationDriver
while (it.hasNext()) {
ClusterWritable next = (ClusterWritable) it.next();
cluster = (Cluster) next.getValue();
- if (cluster instanceof DirichletCluster) {
- ((DirichletCluster) cluster).getModel().configure(conf);
- }
+ cluster.configure(conf);
clusterModels.add(cluster);
}
return clusterModels;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java Wed May 9 22:02:50 2012
@@ -35,7 +35,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.dirichlet.DirichletCluster;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.clustering.iterator.ClusteringPolicy;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
@@ -133,9 +132,7 @@ public class ClusterClassificationMapper
while (it.hasNext()) {
ClusterWritable next = (ClusterWritable) it.next();
cluster = next.getValue();
- if(cluster instanceof DirichletCluster){
- ((DirichletCluster) cluster).getModel().configure(conf);
- }
+ cluster.configure(conf);
clusters.add(cluster);
}
return clusters;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java Wed May 9 22:02:50 2012
@@ -202,12 +202,14 @@ public class ClusterClassifier extends A
}
}
- public void readFromSeqFiles(Path path) throws IOException {
+ public void readFromSeqFiles(Configuration conf, Path path) throws IOException {
Configuration config = new Configuration();
List<Cluster> clusters = Lists.newArrayList();
for (ClusterWritable cw : new SequenceFileDirValueIterable<ClusterWritable>(path, PathType.LIST,
PathFilters.logsCRCFilter(), config)) {
- clusters.add(cw.getValue());
+ Cluster cluster = cw.getValue();
+ cluster.configure(conf);
+ clusters.add(cluster);
}
this.models = clusters;
modelClass = models.get(0).getClass().getName();
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java Wed May 9 22:02:50 2012
@@ -26,37 +26,28 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.Model;
+import org.apache.mahout.clustering.ModelDistribution;
import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
import org.apache.mahout.clustering.classify.ClusterClassifier;
import org.apache.mahout.clustering.dirichlet.models.DistributionDescription;
import org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution;
-import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.iterator.ClusterIterator;
import org.apache.mahout.clustering.iterator.DirichletClusteringPolicy;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
-import org.apache.mahout.common.iterator.sequencefile.PathType;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VectorWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.io.Closeables;
+import com.google.common.collect.Lists;
public class DirichletDriver extends AbstractJob {
-
+
public static final String STATE_IN_KEY = "org.apache.mahout.clustering.dirichlet.stateIn";
public static final String MODEL_DISTRIBUTION_KEY = "org.apache.mahout.clustering.dirichlet.modelFactory";
public static final String NUM_CLUSTERS_KEY = "org.apache.mahout.clustering.dirichlet.numClusters";
@@ -66,13 +57,11 @@ public class DirichletDriver extends Abs
public static final String MODEL_PROTOTYPE_CLASS_OPTION = "modelPrototype";
public static final String MODEL_DISTRIBUTION_CLASS_OPTION = "modelDist";
public static final String ALPHA_OPTION = "alpha";
-
- private static final Logger log = LoggerFactory.getLogger(DirichletDriver.class);
-
+
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new DirichletDriver(), args);
}
-
+
@Override
public int run(String[] args) throws Exception {
addInputOption();
@@ -82,23 +71,21 @@ public class DirichletDriver extends Abs
addOption(DefaultOptionCreator.overwriteOption().create());
addOption(DefaultOptionCreator.clusteringOption().create());
addOption(ALPHA_OPTION, "a0", "The alpha0 value for the DirichletDistribution. Defaults to 1.0", "1.0");
- addOption(MODEL_DISTRIBUTION_CLASS_OPTION,
- "md",
- "The ModelDistribution class name. Defaults to GaussianClusterDistribution",
- GaussianClusterDistribution.class.getName());
- addOption(MODEL_PROTOTYPE_CLASS_OPTION,
- "mp",
- "The ModelDistribution prototype Vector class name. Defaults to RandomAccessSparseVector",
- RandomAccessSparseVector.class.getName());
+ addOption(MODEL_DISTRIBUTION_CLASS_OPTION, "md",
+ "The ModelDistribution class name. Defaults to GaussianClusterDistribution",
+ GaussianClusterDistribution.class.getName());
+ addOption(MODEL_PROTOTYPE_CLASS_OPTION, "mp",
+ "The ModelDistribution prototype Vector class name. Defaults to RandomAccessSparseVector",
+ RandomAccessSparseVector.class.getName());
addOption(DefaultOptionCreator.distanceMeasureOption().withRequired(false).create());
addOption(DefaultOptionCreator.emitMostLikelyOption().create());
addOption(DefaultOptionCreator.thresholdOption().create());
addOption(DefaultOptionCreator.methodOption().create());
-
+
if (parseArguments(args) == null) {
return -1;
}
-
+
Path input = getInputPath();
Path output = getOutputPath();
if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
@@ -113,30 +100,21 @@ public class DirichletDriver extends Abs
double threshold = Double.parseDouble(getOption(DefaultOptionCreator.THRESHOLD_OPTION));
double alpha0 = Double.parseDouble(getOption(ALPHA_OPTION));
boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
- boolean runSequential =
- getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+ boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+ DefaultOptionCreator.SEQUENTIAL_METHOD);
int prototypeSize = readPrototypeSize(input);
-
- DistributionDescription description =
- new DistributionDescription(modelFactory, modelPrototype, distanceMeasure, prototypeSize);
-
- run(getConf(),
- input,
- output,
- description,
- numModels,
- maxIterations,
- alpha0,
- runClustering,
- emitMostLikely,
- threshold,
- runSequential);
+
+ DistributionDescription description = new DistributionDescription(modelFactory, modelPrototype, distanceMeasure,
+ prototypeSize);
+
+ run(getConf(), input, output, description, numModels, maxIterations, alpha0, runClustering, emitMostLikely,
+ threshold, runSequential);
return 0;
}
-
+
/**
- * Iterate over the input vectors to produce clusters and, if requested, use the
- * results of the final iteration to cluster the input vectors.
+ * Iterate over the input vectors to produce clusters and, if requested, use the results of the final iteration to
+ * cluster the input vectors.
*
* @param conf
* the Configuration to use
@@ -144,107 +122,30 @@ public class DirichletDriver extends Abs
* the directory Path for input points
* @param output
* the directory Path for output points
- * @param description model distribution parameters
+ * @param description
+ * model distribution parameters
* @param maxIterations
* the maximum number of iterations
* @param alpha0
* the alpha_0 value for the DirichletDistribution
- * @param runClustering
+ * @param runClustering
* true if clustering of points to be done after iterations
* @param emitMostLikely
* a boolean if true emit only most likely cluster for each point
- * @param threshold
+ * @param threshold
* a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
- * @param runSequential execute sequentially if true
+ * @param runSequential
+ * execute sequentially if true
*/
- public static void run(Configuration conf,
- Path input,
- Path output,
- DistributionDescription description,
- int numModels,
- int maxIterations,
- double alpha0,
- boolean runClustering,
- boolean emitMostLikely,
- double threshold,
- boolean runSequential)
- throws IOException, ClassNotFoundException, InterruptedException {
- Path clustersOut =
- buildClusters(conf, input, output, description, numModels, maxIterations, alpha0, runSequential);
+ public static void run(Configuration conf, Path input, Path output, DistributionDescription description,
+ int numModels, int maxIterations, double alpha0, boolean runClustering, boolean emitMostLikely, double threshold,
+ boolean runSequential) throws IOException, ClassNotFoundException, InterruptedException {
+ Path clustersOut = buildClusters(conf, input, output, description, numModels, maxIterations, alpha0, runSequential);
if (runClustering) {
- clusterData(conf,
- input,
- clustersOut,
- output,
- alpha0,
- numModels,
- emitMostLikely,
- threshold,
- runSequential);
+ clusterData(conf, input, clustersOut, output, alpha0, numModels, emitMostLikely, threshold, runSequential);
}
}
-
- /**
- * Convenience method provides default Configuration
- * Iterate over the input vectors to produce clusters and, if requested, use the
- * results of the final iteration to cluster the input vectors.
- *
- * @param input
- * the directory Path for input points
- * @param output
- * the directory Path for output points
- * @param description model distribution parameters
- * @param numClusters
- * the number of models to iterate over
- * @param maxIterations
- * the maximum number of iterations
- * @param alpha0
- * the alpha_0 value for the DirichletDistribution
- * @param runClustering
- * true if clustering of points to be done after iterations
- * @param emitMostLikely
- * a boolean if true emit only most likely cluster for each point
- * @param threshold
- * a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
- * @param runSequential execute sequentially if true
- */
- public static void run(Path input,
- Path output,
- DistributionDescription description,
- int numClusters,
- int maxIterations,
- double alpha0,
- boolean runClustering,
- boolean emitMostLikely,
- double threshold,
- boolean runSequential)
- throws IOException, ClassNotFoundException, InterruptedException {
- run(new Configuration(),
- input,
- output,
- description,
- numClusters,
- maxIterations,
- alpha0,
- runClustering,
- emitMostLikely,
- threshold,
- runSequential);
- }
-
- /**
- * Creates a DirichletState object from the given arguments. Note that the modelFactory is presumed to be a
- * subclass of VectorModelDistribution that can be initialized with a concrete Vector prototype.
- *
- * @param description model distribution parameters
- * @param numModels an int number of models to be created
- * @param alpha0 the double alpha_0 argument to the algorithm
- * @return an initialized DirichletState
- */
- static DirichletState createState(DistributionDescription description, int numModels, double alpha0) {
- return new DirichletState(description, numModels, alpha0);
- }
-
+
/**
* Read the first input vector to determine the prototype size for the modelPrototype
*/
@@ -261,223 +162,79 @@ public class DirichletDriver extends Abs
}
return protoSize;
}
-
- /**
- * Write initial state (prior distribution) to the output path directory
- * @param output the output Path
- * @param stateOut the state output Path
- * @param description model distribution parameters
- * @param numModels the int number of models to generate
- * @param alpha0 the double alpha_0 argument to the DirichletDistribution
- */
- private static void writeInitialState(Path output,
- Path stateOut,
- DistributionDescription description,
- int numModels,
- double alpha0) throws IOException {
-
- DirichletState state = createState(description, numModels, alpha0);
- writeState(output, stateOut, numModels, state);
- }
-
- private static void writeState(Path output, Path stateOut, int numModels, DirichletState state) throws IOException {
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(output.toUri(), conf);
- ClusterWritable clusterWritable = new ClusterWritable();
- for (int i = 0; i < numModels; i++) {
- Path path = new Path(stateOut, "part-" + i);
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, ClusterWritable.class);
- try {
- DirichletCluster dirichletCluster = state.getClusters().get(i);
- clusterWritable.setValue(dirichletCluster);
- writer.append(new Text(Integer.toString(i)), clusterWritable);
- } finally {
- Closeables.closeQuietly(writer);
- }
- }
- }
-
- /**
- * Run an iteration using supplied arguments
- * @param conf
- * @param input the directory pathname for input points
- * @param stateIn the directory pathname for input state
- * @param stateOut the directory pathname for output state
- * @param description model distribution parameters
- * @param numClusters the number of clusters
- * @param alpha0 alpha_0
- */
- private static void runIteration(Configuration conf,
- Path input,
- Path stateIn,
- Path stateOut,
- DistributionDescription description,
- int numClusters,
- double alpha0) throws IOException, InterruptedException, ClassNotFoundException {
- conf.set(STATE_IN_KEY, stateIn.toString());
- conf.set(MODEL_DISTRIBUTION_KEY, description.toString());
- conf.set(NUM_CLUSTERS_KEY, Integer.toString(numClusters));
- conf.set(ALPHA_0_KEY, Double.toString(alpha0));
-
- Job job = new Job(conf, "Dirichlet Driver running runIteration over stateIn: " + stateIn);
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(ClusterWritable.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(VectorWritable.class);
- job.setMapperClass(DirichletMapper.class);
- job.setReducerClass(DirichletReducer.class);
- job.setJarByClass(DirichletDriver.class);
-
- FileInputFormat.addInputPath(job, input);
- FileOutputFormat.setOutputPath(job, stateOut);
-
- if (!job.waitForCompletion(true)) {
- throw new InterruptedException("Dirichlet Iteration failed processing " + stateIn);
- }
- }
-
+
/**
* Iterate over the input vectors to produce cluster directories for each iteration
- * @param conf
+ *
+ * @param conf
+ * the hadoop configuration
* @param input
* the directory Path for input points
* @param output
* the directory Path for output points
- * @param description model distribution parameters
+ * @param description
+ * model distribution parameters
* @param numClusters
* the number of models to iterate over
* @param maxIterations
* the maximum number of iterations
* @param alpha0
* the alpha_0 value for the DirichletDistribution
- * @param runSequential execute sequentially if true
+ * @param runSequential
+ * execute sequentially if true
*
* @return the Path of the final clusters directory
*/
- public static Path buildClusters(Configuration conf,
- Path input,
- Path output,
- DistributionDescription description,
- int numClusters,
- int maxIterations,
- double alpha0,
- boolean runSequential)
- throws IOException, ClassNotFoundException, InterruptedException {
+ public static Path buildClusters(Configuration conf, Path input, Path output, DistributionDescription description,
+ int numClusters, int maxIterations, double alpha0, boolean runSequential) throws IOException,
+ ClassNotFoundException, InterruptedException {
Path clustersIn = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
- writeInitialState(output, clustersIn, description, numClusters, alpha0);
-
+ ModelDistribution<VectorWritable> modelDist = description.createModelDistribution(conf);
+
+ List<Cluster> models = Lists.newArrayList();
+ for (Model<VectorWritable> cluster : modelDist.sampleFromPrior(numClusters)) {
+ models.add((Cluster) cluster);
+ }
+
+ ClusterClassifier prior = new ClusterClassifier(models, new DirichletClusteringPolicy(numClusters, alpha0));
+ prior.writeToSeqFiles(clustersIn);
+
if (runSequential) {
- clustersIn = buildClustersSeq(conf, input, output, description, numClusters, maxIterations, alpha0, clustersIn);
+ new ClusterIterator().iterateSeq(conf, input, clustersIn, output, maxIterations);
} else {
- clustersIn = buildClustersMR(conf, input, output, description, numClusters, maxIterations, alpha0, clustersIn);
+ new ClusterIterator().iterateMR(conf, input, clustersIn, output, maxIterations);
}
- return clustersIn;
+ return output;
+
}
-
- private static Path buildClustersSeq(Configuration conf,
- Path input,
- Path output,
- DistributionDescription description,
- int numClusters,
- int maxIterations,
- double alpha0,
- Path clustersIn) throws IOException {
- int iteration = 1;
- while (iteration <= maxIterations) {
- log.info("Iteration {}", iteration);
- // point the output to a new directory per iteration
- Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
- DirichletState state = DirichletMapper.loadState(conf,
- clustersIn.toString(),
- description,
- alpha0,
- numClusters);
-
- List<DirichletCluster> oldModels = state.getClusters();
- for (DirichletCluster oldModel : oldModels) {
- oldModel.getModel().configure(conf);
- }
- Cluster[] newModels = (Cluster[]) state.getModelFactory().sampleFromPosterior(state.getModels());
- for (Cluster newModel : newModels) {
- newModel.configure(conf);
- }
- DirichletClusterer clusterer = new DirichletClusterer(state);
- for (VectorWritable value
- : new SequenceFileDirValueIterable<VectorWritable>(input,
- PathType.LIST,
- PathFilters.logsCRCFilter(),
- conf)) {
- clusterer.observe(newModels, value);
- }
- clusterer.updateModels(newModels);
- writeState(output, clustersOut, numClusters, state);
-
- // now point the input to the old output directory
- clustersIn = clustersOut;
- iteration++;
- }
- Path fromPath = new Path(output, Cluster.CLUSTERS_DIR + (iteration-1));
- Path finalClustersIn = new Path(output, Cluster.CLUSTERS_DIR + (iteration-1) + Cluster.FINAL_ITERATION_SUFFIX);
- FileSystem.get(fromPath.toUri(), conf).rename(fromPath, finalClustersIn);
- return finalClustersIn;
- }
-
- private static Path buildClustersMR(Configuration conf,
- Path input,
- Path output,
- DistributionDescription description,
- int numClusters,
- int maxIterations,
- double alpha0,
- Path clustersIn)
- throws IOException, InterruptedException, ClassNotFoundException {
- int iteration = 1;
- while (iteration <= maxIterations) {
- log.info("Iteration {}", iteration);
- // point the output to a new directory per iteration
- Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
- runIteration(conf, input, clustersIn, clustersOut, description, numClusters, alpha0);
- // now point the input to the old output directory
- clustersIn = clustersOut;
- iteration++;
- }
- Path fromPath = new Path(output, Cluster.CLUSTERS_DIR + (iteration-1));
- Path finalClustersIn = new Path(output, Cluster.CLUSTERS_DIR + (iteration-1) + Cluster.FINAL_ITERATION_SUFFIX);
- FileSystem.get(fromPath.toUri(), conf).rename(fromPath, finalClustersIn);
- return finalClustersIn;
- }
-
+
/**
* Run the job using supplied arguments
- * @param conf
- * @param input
+ *
+ * @param conf
+ * @param input
* the directory pathname for input points
- * @param stateIn
+ * @param stateIn
* the directory pathname for input state
- * @param output
+ * @param output
* the directory pathname for output points
- * @param alpha0 TODO
- * @param numModels TODO
- * @param emitMostLikely
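+ * @param alpha0
+ * the alpha_0 value passed to the DirichletClusteringPolicy
+ * @param numModels
+ * the number of models passed to the DirichletClusteringPolicy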
+ * @param emitMostLikely
* a boolean if true emit only most likely cluster for each point
- * @param threshold
+ * @param threshold
* a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
- * @param runSequential execute sequentially if true
+ * @param runSequential
+ * execute sequentially if true
*/
- public static void clusterData(Configuration conf,
- Path input,
- Path stateIn,
- Path output,
- double alpha0,
- int numModels,
- boolean emitMostLikely,
- double threshold,
- boolean runSequential)
- throws IOException, InterruptedException, ClassNotFoundException {
- ClusterClassifier.writePolicy(new DirichletClusteringPolicy(numModels, alpha0), stateIn);
- ClusterClassificationDriver.run(conf, input, output, new Path(output, CLUSTERED_POINTS_DIRECTORY), threshold, emitMostLikely, runSequential);
- }
+ public static void clusterData(Configuration conf, Path input, Path stateIn, Path output, double alpha0,
+ int numModels, boolean emitMostLikely, double threshold, boolean runSequential) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ ClusterClassifier.writePolicy(new DirichletClusteringPolicy(numModels, alpha0), stateIn);
+ ClusterClassificationDriver.run(conf, input, output, new Path(output, CLUSTERED_POINTS_DIRECTORY), threshold,
+ emitMostLikely, runSequential);
+ }
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/models/DistributionDescription.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/models/DistributionDescription.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/models/DistributionDescription.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/models/DistributionDescription.java Wed May 9 22:02:50 2012
@@ -21,64 +21,67 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
-import com.google.common.base.Splitter;
+import org.apache.hadoop.conf.Configuration;
import org.apache.mahout.clustering.ModelDistribution;
import org.apache.mahout.common.ClassUtils;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
+import com.google.common.base.Splitter;
+
/**
* Simply describes the parameters needed to create a {@link org.apache.mahout.clustering.ModelDistribution}.
*/
public final class DistributionDescription {
-
+
private final String modelFactory;
private final String modelPrototype;
private final String distanceMeasure;
private final int prototypeSize;
-
- public DistributionDescription(String modelFactory,
- String modelPrototype,
- String distanceMeasure,
- int prototypeSize) {
+
+ public DistributionDescription(String modelFactory, String modelPrototype, String distanceMeasure, int prototypeSize) {
this.modelFactory = modelFactory;
this.modelPrototype = modelPrototype;
this.distanceMeasure = distanceMeasure;
this.prototypeSize = prototypeSize;
}
-
+
public String getModelFactory() {
return modelFactory;
}
-
+
public String getModelPrototype() {
return modelPrototype;
}
-
+
public String getDistanceMeasure() {
return distanceMeasure;
}
-
+
public int getPrototypeSize() {
return prototypeSize;
}
-
+
/**
* Create an instance of AbstractVectorModelDistribution from the given command line arguments
+ *
+ * @param conf
+ * the Configuration
*/
- public ModelDistribution<VectorWritable> createModelDistribution() {
+ public ModelDistribution<VectorWritable> createModelDistribution(Configuration conf) {
ClassLoader ccl = Thread.currentThread().getContextClassLoader();
AbstractVectorModelDistribution modelDistribution;
try {
modelDistribution = ClassUtils.instantiateAs(modelFactory, AbstractVectorModelDistribution.class);
-
+
Class<? extends Vector> vcl = ccl.loadClass(modelPrototype).asSubclass(Vector.class);
Constructor<? extends Vector> v = vcl.getConstructor(int.class);
modelDistribution.setModelPrototype(new VectorWritable(v.newInstance(prototypeSize)));
-
+
if (modelDistribution instanceof DistanceMeasureClusterDistribution) {
DistanceMeasure measure = ClassUtils.instantiateAs(distanceMeasure, DistanceMeasure.class);
+ measure.configure(conf);
((DistanceMeasureClusterDistribution) modelDistribution).setMeasure(measure);
}
} catch (ClassNotFoundException cnfe) {
@@ -94,12 +97,12 @@ public final class DistributionDescripti
}
return modelDistribution;
}
-
+
@Override
public String toString() {
return modelFactory + ',' + modelPrototype + ',' + distanceMeasure + ',' + prototypeSize;
}
-
+
public static DistributionDescription fromString(CharSequence s) {
Iterator<String> tokens = Splitter.on(',').split(s).iterator();
String modelFactory = tokens.next();
@@ -108,5 +111,5 @@ public final class DistributionDescripti
int prototypeSize = Integer.parseInt(tokens.next());
return new DistributionDescription(modelFactory, modelPrototype, distanceMeasure, prototypeSize);
}
-
+
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Wed May 9 22:02:50 2012
@@ -24,13 +24,13 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
import org.apache.mahout.clustering.classify.ClusterClassifier;
import org.apache.mahout.clustering.iterator.ClusterIterator;
+import org.apache.mahout.clustering.iterator.ClusteringPolicy;
import org.apache.mahout.clustering.iterator.FuzzyKMeansClusteringPolicy;
import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
import org.apache.mahout.common.AbstractJob;
@@ -239,7 +239,7 @@ public class FuzzyKMeansDriver extends A
* @param input
* the directory pathname for input points
* @param clustersIn
- * the directory pathname for initial & computed clusters
+ * the file pathname for initial cluster centers
* @param output
* the directory pathname for output points
* @param measure
@@ -274,26 +274,18 @@ public class FuzzyKMeansDriver extends A
}
if (clusters.isEmpty()) {
- throw new IllegalStateException("Clusters is empty!");
- }
-
- Path priorClustersPath = new Path(clustersIn, "clusters-0");
-
- FileSystem fileSystem = clustersIn.getFileSystem(conf);
-
- if(fileSystem.isFile(clustersIn)){
- priorClustersPath = new Path(clustersIn.getParent(), "prior");
- fileSystem.mkdirs(priorClustersPath);
+ throw new IllegalStateException("No input clusters found. Check your -c argument.");
}
- FuzzyKMeansClusteringPolicy policy = new FuzzyKMeansClusteringPolicy(m, convergenceDelta);
+ Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
+ ClusteringPolicy policy = new FuzzyKMeansClusteringPolicy(m, convergenceDelta);
ClusterClassifier prior = new ClusterClassifier(clusters, policy);
prior.writeToSeqFiles(priorClustersPath);
if (runSequential) {
- new ClusterIterator().iterateSeq(input, priorClustersPath, output, maxIterations);
+ new ClusterIterator().iterateSeq(conf, input, priorClustersPath, output, maxIterations);
} else {
- new ClusterIterator().iterateMR(input, priorClustersPath, output, maxIterations);
+ new ClusterIterator().iterateMR(conf, input, priorClustersPath, output, maxIterations);
}
return output;
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java Wed May 9 22:02:50 2012
@@ -4,6 +4,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
@@ -29,9 +30,10 @@ public class CIMapper extends Mapper<Wri
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
- String priorClustersPath = context.getConfiguration().get(ClusterIterator.PRIOR_PATH_KEY);
+ Configuration conf = context.getConfiguration();
+ String priorClustersPath = conf.get(ClusterIterator.PRIOR_PATH_KEY);
classifier = new ClusterClassifier();
- classifier.readFromSeqFiles(new Path(priorClustersPath));
+ classifier.readFromSeqFiles(conf, new Path(priorClustersPath));
policy = classifier.getPolicy();
policy.update(classifier);
super.setup(context);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java Wed May 9 22:02:50 2012
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
@@ -62,9 +63,10 @@ public class CIReducer extends Reducer<I
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
- String priorClustersPath = context.getConfiguration().get(ClusterIterator.PRIOR_PATH_KEY);
+ Configuration conf = context.getConfiguration();
+ String priorClustersPath = conf.get(ClusterIterator.PRIOR_PATH_KEY);
classifier = new ClusterClassifier();
- classifier.readFromSeqFiles(new Path(priorClustersPath));
+ classifier.readFromSeqFiles(conf, new Path(priorClustersPath));
policy = classifier.getPolicy();
policy.update(classifier);
super.setup(context);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java Wed May 9 22:02:50 2012
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.classify.ClusterClassifier;
-import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
@@ -91,6 +90,8 @@ public class ClusterIterator {
* Iterate over data using a prior-trained ClusterClassifier, for a number of iterations using a sequential
* implementation
*
+ * @param conf
+ * the Configuration
* @param inPath
* a Path to input VectorWritables
* @param priorPath
@@ -102,11 +103,10 @@ public class ClusterIterator {
*
* @throws IOException
*/
- public void iterateSeq(Path inPath, Path priorPath, Path outPath, int numIterations) throws IOException {
+ public void iterateSeq(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
+ throws IOException {
ClusterClassifier classifier = new ClusterClassifier();
- classifier.readFromSeqFiles(priorPath);
- Configuration conf = new Configuration();
- HadoopUtil.delete(conf, outPath);
+ classifier.readFromSeqFiles(conf, priorPath);
Path clustersOut = null;
int iteration = 1;
while (iteration <= numIterations) {
@@ -144,6 +144,8 @@ public class ClusterIterator {
* Iterate over data using a prior-trained ClusterClassifier, for a number of iterations using a mapreduce
* implementation
*
+ * @param conf
+ * the Configuration
* @param inPath
* a Path to input VectorWritables
* @param priorPath
@@ -153,10 +155,8 @@ public class ClusterIterator {
* @param numIterations
* the int number of iterations to perform
*/
- public void iterateMR(Path inPath, Path priorPath, Path outPath, int numIterations) throws IOException,
- InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
- HadoopUtil.delete(conf, outPath);
+ public void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
+ throws IOException, InterruptedException, ClassNotFoundException {
ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
Path clustersOut = null;
int iteration = 1;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Wed May 9 22:02:50 2012
@@ -23,13 +23,13 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
import org.apache.mahout.clustering.classify.ClusterClassifier;
import org.apache.mahout.clustering.iterator.ClusterIterator;
+import org.apache.mahout.clustering.iterator.ClusteringPolicy;
import org.apache.mahout.clustering.iterator.KMeansClusteringPolicy;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.ClassUtils;
@@ -42,38 +42,41 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KMeansDriver extends AbstractJob {
-
+
private static final Logger log = LoggerFactory.getLogger(KMeansDriver.class);
-
+
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new KMeansDriver(), args);
}
-
+
@Override
public int run(String[] args) throws Exception {
-
+
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.distanceMeasureOption().create());
- addOption(DefaultOptionCreator.clustersInOption()
- .withDescription("The input centroids, as Vectors. Must be a SequenceFile of Writable, Cluster/Canopy. "
- + "If k is also specified, then a random set of vectors will be selected"
- + " and written out to this path first")
- .create());
- addOption(DefaultOptionCreator.numClustersOption()
- .withDescription("The k in k-Means. If specified, then a random selection of k Vectors will be chosen"
- + " as the Centroid and written to the clusters input path.").create());
+ addOption(DefaultOptionCreator
+ .clustersInOption()
+ .withDescription(
+ "The input centroids, as Vectors. Must be a SequenceFile of Writable, Cluster/Canopy. "
+ + "If k is also specified, then a random set of vectors will be selected"
+ + " and written out to this path first").create());
+ addOption(DefaultOptionCreator
+ .numClustersOption()
+ .withDescription(
+ "The k in k-Means. If specified, then a random selection of k Vectors will be chosen"
+ + " as the Centroid and written to the clusters input path.").create());
addOption(DefaultOptionCreator.convergenceOption().create());
addOption(DefaultOptionCreator.maxIterationsOption().create());
addOption(DefaultOptionCreator.overwriteOption().create());
addOption(DefaultOptionCreator.clusteringOption().create());
addOption(DefaultOptionCreator.methodOption().create());
addOption(DefaultOptionCreator.outlierThresholdOption().create());
-
+
if (parseArguments(args) == null) {
return -1;
}
-
+
Path input = getInputPath();
Path clusters = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
Path output = getOutputPath();
@@ -87,10 +90,10 @@ public class KMeansDriver extends Abstra
HadoopUtil.delete(getConf(), output);
}
DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
-
+
if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
- clusters = RandomSeedGenerator.buildRandom(getConf(), input, clusters, Integer
- .parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)), measure);
+ clusters = RandomSeedGenerator.buildRandom(getConf(), input, clusters,
+ Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)), measure);
}
boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
@@ -106,10 +109,10 @@ public class KMeansDriver extends Abstra
clusterClassificationThreshold, runSequential);
return 0;
}
-
- /**
- * Iterate over the input vectors to produce clusters and, if requested, use
- * the results of the final iteration to cluster the input vectors.
+
+ /**
+ * Iterate over the input vectors to produce clusters and, if requested, use the results of the final iteration to
+ * cluster the input vectors.
*
* @param input
* the directory pathname for input points
@@ -126,48 +129,33 @@ public class KMeansDriver extends Abstra
* @param runClustering
* true if points are to be clustered after iterations are completed
* @param clusterClassificationThreshold
- * Is a clustering strictness / outlier removal parameter. Its value
- * should be between 0 and 1. Vectors having pdf below this value
- * will not be clustered.
+ * Is a clustering strictness / outlier removal parameter. Its value should be between 0 and 1. Vectors
+ * having pdf below this value will not be clustered.
* @param runSequential
* if true execute sequential algorithm
*/
- public static void run(Configuration conf,
- Path input,
- Path clustersIn,
- Path output,
- DistanceMeasure measure,
- double convergenceDelta,
- int maxIterations,
- boolean runClustering,
- double clusterClassificationThreshold,
- boolean runSequential)
- throws IOException, InterruptedException, ClassNotFoundException {
-
+ public static void run(Configuration conf, Path input, Path clustersIn, Path output, DistanceMeasure measure,
+ double convergenceDelta, int maxIterations, boolean runClustering, double clusterClassificationThreshold,
+ boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException {
+
// iterate until the clusters converge
String delta = Double.toString(convergenceDelta);
if (log.isInfoEnabled()) {
- log.info("Input: {} Clusters In: {} Out: {} Distance: {}",
- new Object[] {input, clustersIn, output,measure.getClass().getName()});
- log.info("convergence: {} max Iterations: {} num Reduce Tasks: {} Input Vectors: {}",
- new Object[] {convergenceDelta, maxIterations, VectorWritable.class.getName()});
+ log.info("Input: {} Clusters In: {} Out: {} Distance: {}", new Object[] {input, clustersIn, output,
+ measure.getClass().getName()});
+ log.info("convergence: {} max Iterations: {} num Reduce Tasks: {} Input Vectors: {}", new Object[] {
+ convergenceDelta, maxIterations, VectorWritable.class.getName()});
}
Path clustersOut = buildClusters(conf, input, clustersIn, output, measure, maxIterations, delta, runSequential);
if (runClustering) {
log.info("Clustering data");
- clusterData(conf,
- input,
- clustersOut,
- output,
- measure,
- clusterClassificationThreshold,
- runSequential);
+ clusterData(conf, input, clustersOut, output, measure, clusterClassificationThreshold, runSequential);
}
}
-
+
/**
- * Iterate over the input vectors to produce clusters and, if requested, use
- * the results of the final iteration to cluster the input vectors.
+ * Iterate over the input vectors to produce clusters and, if requested, use the results of the final iteration to
+ * cluster the input vectors.
*
* @param input
* the directory pathname for input points
@@ -184,37 +172,20 @@ public class KMeansDriver extends Abstra
* @param runClustering
* true if points are to be clustered after iterations are completed
* @param clusterClassificationThreshold
- * Is a clustering strictness / outlier removal parrameter. Its value
- * should be between 0 and 1. Vectors having pdf below this value
- * will not be clustered.
+ * Is a clustering strictness / outlier removal parrameter. Its value should be between 0 and 1. Vectors
+ * having pdf below this value will not be clustered.
* @param runSequential
* if true execute sequential algorithm
*/
- public static void run(Path input,
- Path clustersIn,
- Path output,
- DistanceMeasure measure,
- double convergenceDelta,
- int maxIterations,
- boolean runClustering,
- double clusterClassificationThreshold,
- boolean runSequential)
- throws IOException, InterruptedException, ClassNotFoundException {
- run(new Configuration(),
- input,
- clustersIn,
- output,
- measure,
- convergenceDelta,
- maxIterations,
- runClustering,
- clusterClassificationThreshold,
- runSequential);
+ public static void run(Path input, Path clustersIn, Path output, DistanceMeasure measure, double convergenceDelta,
+ int maxIterations, boolean runClustering, double clusterClassificationThreshold, boolean runSequential)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ run(new Configuration(), input, clustersIn, output, measure, convergenceDelta, maxIterations, runClustering,
+ clusterClassificationThreshold, runSequential);
}
-
+
/**
- * Iterate over the input vectors to produce cluster directories for each
- * iteration
+ * Iterate over the input vectors to produce cluster directories for each iteration
*
* @param conf
* the Configuration to use
@@ -235,9 +206,8 @@ public class KMeansDriver extends Abstra
*
* @return the Path of the final clusters directory
*/
- public static Path buildClusters(Configuration conf, Path input,
- Path clustersIn, Path output, DistanceMeasure measure, int maxIterations,
- String delta, boolean runSequential) throws IOException,
+ public static Path buildClusters(Configuration conf, Path input, Path clustersIn, Path output,
+ DistanceMeasure measure, int maxIterations, String delta, boolean runSequential) throws IOException,
InterruptedException, ClassNotFoundException {
double convergenceDelta = Double.parseDouble(delta);
@@ -245,32 +215,22 @@ public class KMeansDriver extends Abstra
KMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters);
if (clusters.isEmpty()) {
- throw new IllegalStateException("Clusters is empty!");
- }
-
- Path priorClustersPath = new Path(clustersIn, "clusters-0");
-
- FileSystem fileSystem = clustersIn.getFileSystem(conf);
- if(fileSystem.isFile(clustersIn)){
- priorClustersPath = new Path(clustersIn.getParent(), "prior");
- fileSystem.mkdirs(priorClustersPath);
+ throw new IllegalStateException("No input clusters found. Check your -c argument.");
}
- KMeansClusteringPolicy policy = new KMeansClusteringPolicy(convergenceDelta);
-
+ Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
+ ClusteringPolicy policy = new KMeansClusteringPolicy(convergenceDelta);
ClusterClassifier prior = new ClusterClassifier(clusters, policy);
prior.writeToSeqFiles(priorClustersPath);
if (runSequential) {
- new ClusterIterator().iterateSeq(input, priorClustersPath, output,
- maxIterations);
+ new ClusterIterator().iterateSeq(conf, input, priorClustersPath, output, maxIterations);
} else {
- new ClusterIterator().iterateMR(input, priorClustersPath, output,
- maxIterations);
+ new ClusterIterator().iterateMR(conf, input, priorClustersPath, output, maxIterations);
}
return output;
}
-
+
/**
* Run the job using supplied arguments
*
@@ -283,21 +243,15 @@ public class KMeansDriver extends Abstra
* @param measure
* the classname of the DistanceMeasure
* @param clusterClassificationThreshold
- * Is a clustering strictness / outlier removal parrameter. Its value
- * should be between 0 and 1. Vectors having pdf below this value
- * will not be clustered.
+ * Is a clustering strictness / outlier removal parrameter. Its value should be between 0 and 1. Vectors
+ * having pdf below this value will not be clustered.
* @param runSequential
* if true execute sequential algorithm
*/
- public static void clusterData(Configuration conf,
- Path input,
- Path clustersIn,
- Path output,
- DistanceMeasure measure,
- double clusterClassificationThreshold,
- boolean runSequential)
- throws IOException, InterruptedException, ClassNotFoundException {
-
+ public static void clusterData(Configuration conf, Path input, Path clustersIn, Path output, DistanceMeasure measure,
+ double clusterClassificationThreshold, boolean runSequential) throws IOException, InterruptedException,
+ ClassNotFoundException {
+
if (log.isInfoEnabled()) {
log.info("Running Clustering");
log.info("Input: {} Clusters In: {} Out: {} Distance: {}", new Object[] {input, clustersIn, output, measure});
@@ -306,5 +260,5 @@ public class KMeansDriver extends Abstra
ClusterClassificationDriver.run(input, output, new Path(output, CLUSTERED_POINTS_DIRECTORY),
clusterClassificationThreshold, true, runSequential);
}
-
+
}
\ No newline at end of file
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java Wed May 9 22:02:50 2012
@@ -147,6 +147,8 @@ public class MahalanobisDistanceMeasure
if (v1.size() != v2.size()) {
throw new CardinalityException(v1.size(), v2.size());
}
+ if (inverseCovarianceMatrix== null)
+ System.out.println();
return Math.sqrt(v1.minus(v2).dot(Algebra.mult(inverseCovarianceMatrix, v1.minus(v2))));
}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java Wed May 9 22:02:50 2012
@@ -22,11 +22,9 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.ClusteringTestUtils;
import org.apache.mahout.clustering.dirichlet.models.DistanceMeasureClusterDistribution;
import org.apache.mahout.clustering.dirichlet.models.DistributionDescription;
-import org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
@@ -80,74 +78,6 @@ public final class TestDirichletClusteri
generateSamples(num, mx, my, sd, 2);
}
- private static void printResults(Iterable<Cluster[]> result, int significant) {
- int row = 0;
- for (Cluster[] r : result) {
- System.out.print("sample[" + row++ + "]= ");
- for (Cluster model : r) {
- if (model.getNumObservations() > significant) {
- System.out.print(model.asFormatString(null) + ", ");
- }
- }
- System.out.println();
- }
- System.out.println();
- }
-
- @Test
- public void testDirichletCluster100() {
- System.out.println("testDirichletCluster100");
- generateSamples(40, 1, 1, 3);
- generateSamples(30, 1, 0, 0.1);
- generateSamples(30, 0, 1, 0.1);
-
- DirichletClusterer dc = new DirichletClusterer(sampleData,
- new GaussianClusterDistribution(new VectorWritable(new DenseVector(2))),
- 1.0,
- 10,
- 1,
- 0);
- List<Cluster[]> result = dc.cluster(30);
- printResults(result, 2);
- assertNotNull(result);
- }
-
- @Test
- public void testDirichletGaussianCluster100() {
- System.out.println("testDirichletGaussianCluster100");
- generateSamples(40, 1, 1, 3);
- generateSamples(30, 1, 0, 0.1);
- generateSamples(30, 0, 1, 0.1);
-
- DirichletClusterer dc = new DirichletClusterer(sampleData,
- new GaussianClusterDistribution(new VectorWritable(new DenseVector(2))),
- 1.0,
- 10,
- 1,
- 0);
- List<Cluster[]> result = dc.cluster(30);
- printResults(result, 2);
- assertNotNull(result);
- }
-
- @Test
- public void testDirichletDMCluster100() {
- System.out.println("testDirichletDMCluster100");
- generateSamples(40, 1, 1, 3);
- generateSamples(30, 1, 0, 0.1);
- generateSamples(30, 0, 1, 0.1);
-
- DirichletClusterer dc = new DirichletClusterer(sampleData,
- new DistanceMeasureClusterDistribution(new VectorWritable(new DenseVector(2))),
- 1.0,
- 10,
- 1,
- 0);
- List<Cluster[]> result = dc.cluster(30);
- printResults(result, 2);
- assertNotNull(result);
- }
-
@Test
public void testDirichletClusteringSeq() throws Exception {
Path output = getTestTempDirPath("output");
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java Wed May 9 22:02:50 2012
@@ -18,29 +18,21 @@ package org.apache.mahout.clustering.dir
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.ClusteringTestUtils;
-import org.apache.mahout.clustering.Model;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
import org.apache.mahout.clustering.dirichlet.models.DistanceMeasureClusterDistribution;
import org.apache.mahout.clustering.dirichlet.models.DistributionDescription;
import org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution;
-import org.apache.mahout.clustering.iterator.ClusterWritable;
-import org.apache.mahout.common.DummyRecordWriter;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.distance.MahalanobisDistanceMeasure;
@@ -53,6 +45,9 @@ import org.apache.mahout.math.VectorWrit
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
public final class TestMapReduce extends MahoutTestCase {
private Collection<VectorWritable> sampleData = Lists.newArrayList();
@@ -89,8 +84,7 @@ public final class TestMapReduce extends
}
/**
- * Generate random samples with asymmetric standard deviations and add them to
- * the sampleData
+ * Generate random samples with asymmetric standard deviations and add them to the sampleData
*
* @param num
* int number of samples to generate
@@ -118,127 +112,6 @@ public final class TestMapReduce extends
fs = FileSystem.get(conf);
}
- /** Test the basic Mapper */
- @Test
- public void testMapper() throws Exception {
- generateSamples(10, 0, 0, 1);
- DirichletState state = new DirichletState(new GaussianClusterDistribution(new VectorWritable(new DenseVector(2))),
- 5, 1);
- DirichletMapper mapper = new DirichletMapper();
- mapper.setup(state);
-
- RecordWriter<Text,VectorWritable> writer = new DummyRecordWriter<Text,VectorWritable>();
- Mapper<WritableComparable<?>,VectorWritable,Text,VectorWritable>.Context context = DummyRecordWriter.build(mapper,
- conf, writer);
- for (VectorWritable v : sampleData) {
- mapper.map(null, v, context);
- }
- // Map<String, List<VectorWritable>> data = collector.getData();
- // this seed happens to produce two partitions, but they work
- // assertEquals("output size", 3, data.size());
- }
-
- /** Test the basic Reducer */
- @Test
- public void testReducer() throws Exception {
- generateSamples(100, 0, 0, 1);
- generateSamples(100, 2, 0, 1);
- generateSamples(100, 0, 2, 1);
- generateSamples(100, 2, 2, 1);
- DirichletState state = new DirichletState(new GaussianClusterDistribution(new VectorWritable(new DenseVector(2))),
- 20, 1);
- DirichletMapper mapper = new DirichletMapper();
- mapper.setup(state);
-
- DummyRecordWriter<Text,VectorWritable> mapWriter = new DummyRecordWriter<Text,VectorWritable>();
- Mapper<WritableComparable<?>,VectorWritable,Text,VectorWritable>.Context mapContext = DummyRecordWriter.build(
- mapper, conf, mapWriter);
- for (VectorWritable v : sampleData) {
- mapper.map(null, v, mapContext);
- }
-
- DirichletReducer reducer = new DirichletReducer();
- reducer.setup(state);
- RecordWriter<Text,ClusterWritable> reduceWriter = new DummyRecordWriter<Text,ClusterWritable>();
- Reducer<Text,VectorWritable,Text,ClusterWritable>.Context reduceContext = DummyRecordWriter.build(reducer, conf,
- reduceWriter, Text.class, VectorWritable.class);
- for (Text key : mapWriter.getKeys()) {
- reducer.reduce(new Text(key), mapWriter.getValue(key), reduceContext);
- }
-
- Cluster[] newModels = reducer.getNewModels();
- state.update(newModels);
- }
-
- /** Test the Mapper and Reducer in an iteration loop */
- @Test
- public void testMRIterations() throws Exception {
- generateSamples(100, 0, 0, 1);
- generateSamples(100, 2, 0, 1);
- generateSamples(100, 0, 2, 1);
- generateSamples(100, 2, 2, 1);
- DirichletState state = new DirichletState(new GaussianClusterDistribution(new VectorWritable(new DenseVector(2))),
- 20, 1.0);
-
- Collection<Model<VectorWritable>[]> models = Lists.newArrayList();
-
- for (int iteration = 0; iteration < 10; iteration++) {
- DirichletMapper mapper = new DirichletMapper();
- mapper.setup(state);
- DummyRecordWriter<Text,VectorWritable> mapWriter = new DummyRecordWriter<Text,VectorWritable>();
- Mapper<WritableComparable<?>,VectorWritable,Text,VectorWritable>.Context mapContext = DummyRecordWriter.build(
- mapper, conf, mapWriter);
- for (VectorWritable v : sampleData) {
- mapper.map(null, v, mapContext);
- }
-
- DirichletReducer reducer = new DirichletReducer();
- reducer.setup(state);
- RecordWriter<Text,ClusterWritable> reduceWriter = new DummyRecordWriter<Text,ClusterWritable>();
- Reducer<Text,VectorWritable,Text,ClusterWritable>.Context reduceContext = DummyRecordWriter.build(reducer, conf,
- reduceWriter, Text.class, VectorWritable.class);
- for (Text key : mapWriter.getKeys()) {
- reducer.reduce(new Text(key), mapWriter.getValue(key), reduceContext);
- }
-
- Cluster[] newModels = reducer.getNewModels();
- state.update(newModels);
- models.add(newModels);
- }
- printModels(models, 0);
- }
-
- private static void printModels(Iterable<Model<VectorWritable>[]> results, int significant) {
- int row = 0;
- for (Model<VectorWritable>[] r : results) {
- System.out.print("sample[" + row++ + "]= ");
- for (int k = 0; k < r.length; k++) {
- Model<VectorWritable> model = r[k];
- if (model.getNumObservations() > significant) {
- System.out.print("m" + k + model.toString() + ", ");
- }
- }
- System.out.println();
- }
- System.out.println();
- }
-
- private static void printResults(Iterable<List<DirichletCluster>> clusters, int significant) {
- int row = 0;
- for (List<DirichletCluster> r : clusters) {
- System.out.print("sample[" + row++ + "]= ");
- for (int k = 0; k < r.size(); k++) {
- Model<VectorWritable> model = r.get(k).getModel();
- if (model.getNumObservations() > significant) {
- int total = (int) r.get(k).getTotalObservations();
- System.out.print("m" + k + '(' + total + ')' + model.toString() + ", ");
- }
- }
- System.out.println();
- }
- System.out.println();
- }
-
/** Test the Mapper and Reducer using the Driver in sequential execution mode */
@Test
public void testDriverIterationsSeq() throws Exception {
@@ -252,28 +125,18 @@ public final class TestMapReduce extends
Integer maxIterations = 5;
DistributionDescription description = new DistributionDescription(GaussianClusterDistribution.class.getName(),
DenseVector.class.getName(), null, 2);
+ Path outputPath = getTestTempDirPath("output");
String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), getTestTempDirPath("input").toString(),
- optKey(DefaultOptionCreator.OUTPUT_OPTION), getTestTempDirPath("output").toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
optKey(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION), description.getModelFactory(),
optKey(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION), description.getModelPrototype(),
optKey(DefaultOptionCreator.NUM_CLUSTERS_OPTION), "20", optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
maxIterations.toString(), optKey(DirichletDriver.ALPHA_OPTION), "1.0",
optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.CLUSTERING_OPTION),
optKey(DefaultOptionCreator.METHOD_OPTION), DefaultOptionCreator.SEQUENTIAL_METHOD};
- DirichletDriver dirichletDriver = new DirichletDriver();
- dirichletDriver.setConf(conf);
- dirichletDriver.run(args);
+ ToolRunner.run(conf, new DirichletDriver(), args);
// and inspect results
- Collection<List<DirichletCluster>> clusters = Lists.newArrayList();
- Configuration conf = new Configuration();
- conf.set(DirichletDriver.MODEL_DISTRIBUTION_KEY, description.toString());
- conf.set(DirichletDriver.NUM_CLUSTERS_KEY, "20");
- conf.set(DirichletDriver.ALPHA_0_KEY, "1.0");
- for (int i = 0; i <= maxIterations; i++) {
- conf.set(DirichletDriver.STATE_IN_KEY, new Path(getTestTempDirPath("output"), "clusters-" + i).toString());
- clusters.add(DirichletMapper.getDirichletState(conf).getClusters());
- }
- printResults(clusters, 0);
+ printModels(getClusters(outputPath, maxIterations));
}
/** Test the Mapper and Reducer using the Driver in mapreduce mode */
@@ -289,53 +152,21 @@ public final class TestMapReduce extends
Integer maxIterations = 5;
DistributionDescription description = new DistributionDescription(GaussianClusterDistribution.class.getName(),
DenseVector.class.getName(), null, 2);
+ Path outputPath = getTestTempDirPath("output");
String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), getTestTempDirPath("input").toString(),
- optKey(DefaultOptionCreator.OUTPUT_OPTION), getTestTempDirPath("output").toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
optKey(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION), description.getModelFactory(),
optKey(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION), description.getModelPrototype(),
optKey(DefaultOptionCreator.NUM_CLUSTERS_OPTION), "20", optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
maxIterations.toString(), optKey(DirichletDriver.ALPHA_OPTION), "1.0",
optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.CLUSTERING_OPTION)};
- ToolRunner.run(new Configuration(), new DirichletDriver(), args);
+ ToolRunner.run(conf, new DirichletDriver(), args);
// and inspect results
- Collection<List<DirichletCluster>> clusters = Lists.newArrayList();
- Configuration conf = new Configuration();
- conf.set(DirichletDriver.MODEL_DISTRIBUTION_KEY, description.toString());
- conf.set(DirichletDriver.NUM_CLUSTERS_KEY, "20");
- conf.set(DirichletDriver.ALPHA_0_KEY, "1.0");
- for (int i = 0; i <= maxIterations; i++) {
- conf.set(DirichletDriver.STATE_IN_KEY, new Path(getTestTempDirPath("output"), "clusters-" + i).toString());
- clusters.add(DirichletMapper.getDirichletState(conf).getClusters());
- }
- printResults(clusters, 0);
- }
-
- /** Test the Mapper and Reducer using the Driver */
- @Test
- public void testDriverMnRIterations() throws Exception {
- generate4Datasets();
- // Now run the driver
- int maxIterations = 3;
- DistributionDescription description = new DistributionDescription(GaussianClusterDistribution.class.getName(),
- DenseVector.class.getName(), null, 2);
- Configuration conf = new Configuration();
- DirichletDriver.run(conf, getTestTempDirPath("input"), getTestTempDirPath("output"), description, 20,
- maxIterations, 1.0, false, true, 0, false);
- // and inspect results
- Collection<List<DirichletCluster>> clusters = Lists.newArrayList();
- conf.set(DirichletDriver.MODEL_DISTRIBUTION_KEY, description.toString());
- conf.set(DirichletDriver.NUM_CLUSTERS_KEY, "20");
- conf.set(DirichletDriver.ALPHA_0_KEY, "1.0");
- for (int i = 0; i <= maxIterations; i++) {
- conf.set(DirichletDriver.STATE_IN_KEY, new Path(getTestTempDirPath("output"), "clusters-" + i).toString());
- clusters.add(DirichletMapper.getDirichletState(conf).getClusters());
- }
- printResults(clusters, 0);
+ printModels(getClusters(outputPath, maxIterations));
}
/**
- * Test the Driver in sequential execution mode using
- * MahalanobisDistanceMeasure
+ * Test the Driver in sequential execution mode using MahalanobisDistanceMeasure
*/
@Test
public void testDriverIterationsMahalanobisSeq() throws Exception {
@@ -381,8 +212,9 @@ public final class TestMapReduce extends
conf.set("MahalanobisDistanceMeasure.vectorClass", VectorWritable.class.getName());
Integer maxIterations = 5;
+ Path outputPath = getTestTempDirPath("output");
String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), getTestTempDirPath("input").toString(),
- optKey(DefaultOptionCreator.OUTPUT_OPTION), getTestTempDirPath("output").toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
optKey(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION), description.getModelFactory(),
optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), description.getDistanceMeasure(),
optKey(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION), description.getModelPrototype(),
@@ -394,16 +226,7 @@ public final class TestMapReduce extends
dirichletDriver.setConf(conf);
dirichletDriver.run(args);
// and inspect results
- Collection<List<DirichletCluster>> clusters = Lists.newArrayList();
- Configuration conf = new Configuration();
- conf.set(DirichletDriver.MODEL_DISTRIBUTION_KEY, description.toString());
- conf.set(DirichletDriver.NUM_CLUSTERS_KEY, "20");
- conf.set(DirichletDriver.ALPHA_0_KEY, "1.0");
- for (int i = 0; i <= maxIterations; i++) {
- conf.set(DirichletDriver.STATE_IN_KEY, new Path(getTestTempDirPath("output"), "clusters-" + i).toString());
- clusters.add(DirichletMapper.getDirichletState(conf).getClusters());
- }
- printResults(clusters, 0);
+ printModels(getClusters(outputPath, maxIterations));
}
/** Test the Mapper and Reducer using the Driver in mapreduce mode */
@@ -411,7 +234,7 @@ public final class TestMapReduce extends
public void testDriverIterationsMahalanobisMR() throws Exception {
generateAsymmetricSamples(100, 0, 0, 0.5, 3.0);
generateAsymmetricSamples(100, 0, 3, 0.3, 4.0);
- ClusteringTestUtils.writePointsToFile(sampleData,true, getTestTempFilePath("input/data.txt"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(sampleData, true, getTestTempFilePath("input/data.txt"), fs, conf);
// Now run the driver using the run() method. Others can use runJob() as
// before
@@ -452,8 +275,9 @@ public final class TestMapReduce extends
conf.set("MahalanobisDistanceMeasure.vectorClass", VectorWritable.class.getName());
Integer maxIterations = 5;
+ Path outputPath = getTestTempDirPath("output");
String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), getTestTempDirPath("input").toString(),
- optKey(DefaultOptionCreator.OUTPUT_OPTION), getTestTempDirPath("output").toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
optKey(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION), description.getModelFactory(),
optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), description.getDistanceMeasure(),
optKey(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION), description.getModelPrototype(),
@@ -464,30 +288,37 @@ public final class TestMapReduce extends
dirichletDriver.setConf(conf);
ToolRunner.run(conf, dirichletDriver, args);
// and inspect results
- Collection<List<DirichletCluster>> clusters = Lists.newArrayList();
- Configuration conf = new Configuration();
- conf.set(DirichletDriver.MODEL_DISTRIBUTION_KEY, description.toString());
- conf.set(DirichletDriver.NUM_CLUSTERS_KEY, "20");
- conf.set(DirichletDriver.ALPHA_0_KEY, "1.0");
- for (int i = 0; i <= maxIterations; i++) {
- conf.set(DirichletDriver.STATE_IN_KEY, new Path(getTestTempDirPath("output"), "clusters-" + i).toString());
- clusters.add(DirichletMapper.getDirichletState(conf).getClusters());
+ printModels(getClusters(outputPath, maxIterations));
+ }
+
+ private void printModels(List<List<Cluster>> result) {
+ int row = 0;
+ StringBuilder models = new StringBuilder(100);
+ for (List<Cluster> r : result) {
+ models.append("sample[").append(row++).append("]= ");
+ for (int k = 0; k < r.size(); k++) {
+ Cluster model = r.get(k);
+ models.append('m').append(k).append(model.asFormatString(null)).append(", ");
+ }
+ models.append('\n');
}
- printResults(clusters, 0);
+ models.append('\n');
+ System.out.println(models.toString());
}
- private void generate4Datasets() throws IOException {
- generateSamples(500, 0, 0, 0.5);
- ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("input/data1.txt"), fs, conf);
- sampleData = Lists.newArrayList();
- generateSamples(500, 2, 0, 0.2);
- ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("input/data2.txt"), fs, conf);
- sampleData = Lists.newArrayList();
- generateSamples(500, 0, 2, 0.3);
- ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("input/data3.txt"), fs, conf);
- sampleData = Lists.newArrayList();
- generateSamples(500, 2, 2, 1);
- ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("input/data4.txt"), fs, conf);
+ private List<List<Cluster>> getClusters(Path output, int numIterations) throws IOException {
+ List<List<Cluster>> result = new ArrayList<List<Cluster>>();
+ for (int i = 1; i <= numIterations; i++) {
+ ClusterClassifier posterior = new ClusterClassifier();
+ String name = i == numIterations ? "clusters-" + i + "-final" : "clusters-" + i;
+ posterior.readFromSeqFiles(conf, new Path(output, name));
+ List<Cluster> clusters = Lists.newArrayList();
+ for (Cluster cluster : posterior.getModels()) {
+ clusters.add(cluster);
+ }
+ result.add(clusters);
+ }
+ return result;
}
}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java Wed May 9 22:02:50 2012
@@ -91,7 +91,7 @@ public final class TestClusterClassifier
Path path = new Path(getTestTempDirPath(), "output");
classifier.writeToSeqFiles(path);
ClusterClassifier newClassifier = new ClusterClassifier();
- newClassifier.readFromSeqFiles(path);
+ newClassifier.readFromSeqFiles(new Configuration(), path);
return newClassifier;
}
@@ -233,13 +233,13 @@ public final class TestClusterClassifier
for (Cluster cluster : prior.getModels()) {
System.out.println(cluster.asFormatString(null));
}
- new ClusterIterator().iterateSeq(pointsPath, path, outPath, 5);
+ new ClusterIterator().iterateSeq(conf, pointsPath, path, outPath, 5);
for (int i = 1; i <= 4; i++) {
System.out.println("Classifier-" + i);
ClusterClassifier posterior = new ClusterClassifier();
String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
- posterior.readFromSeqFiles(new Path(outPath, name));
+ posterior.readFromSeqFiles(conf, new Path(outPath, name));
assertEquals(3, posterior.getModels().size());
for (Cluster cluster : posterior.getModels()) {
System.out.println(cluster.asFormatString(null));
@@ -267,13 +267,13 @@ public final class TestClusterClassifier
for (Cluster cluster : prior.getModels()) {
System.out.println(cluster.asFormatString(null));
}
- new ClusterIterator().iterateMR(pointsPath, path, outPath, 5);
+ new ClusterIterator().iterateMR(conf, pointsPath, path, outPath, 5);
for (int i = 1; i <= 4; i++) {
System.out.println("Classifier-" + i);
ClusterClassifier posterior = new ClusterClassifier();
String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
- posterior.readFromSeqFiles(new Path(outPath, name));
+ posterior.readFromSeqFiles(conf, new Path(outPath, name));
assertEquals(3, posterior.getModels().size());
for (Cluster cluster : posterior.getModels()) {
System.out.println(cluster.asFormatString(null));
Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java Wed May 9 22:02:50 2012
@@ -17,9 +17,14 @@
package org.apache.mahout.clustering.display;
+import java.awt.BasicStroke;
+import java.awt.Color;
+import java.awt.Graphics;
+import java.awt.Graphics2D;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.common.HadoopUtil;
@@ -27,12 +32,6 @@ import org.apache.mahout.common.RandomUt
import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
-import java.awt.BasicStroke;
-import java.awt.Color;
-import java.awt.Graphics;
-import java.awt.Graphics2D;
-import java.util.List;
-
/**
* Java desktop graphics class that runs canopy clustering and displays the results.
* This class generates random data and clusters it.
@@ -79,29 +78,8 @@ public class DisplayCanopy extends Displ
RandomUtils.useTestSeed();
generateSamples();
writeSampleData(samples);
- //boolean b = true;
- //if (b) {
CanopyDriver.buildClusters(conf, samples, output, new ManhattanDistanceMeasure(), T1, T2, 0, true);
- loadClusters(output, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- String pathString = path.toString();
- return pathString.contains("/clusters-");
- }
- });
- //} else {
- // List<Vector> points = new ArrayList<Vector>();
- // for (VectorWritable sample : SAMPLE_DATA) {
- // points.add(sample.get());
- // }
- // List<Canopy> canopies = CanopyClusterer.createCanopies(points, new ManhattanDistanceMeasure(), T1, T2);
- // CanopyClusterer.updateCentroids(canopies);
- // List<Cluster> clusters = new ArrayList<Cluster>();
- // for (Canopy canopy : canopies) {
- // clusters.add(canopy);
- // }
- // CLUSTERS.add(clusters);
- //}
+ loadClustersWritable(output);
new DisplayCanopy();
}