You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pr...@apache.org on 2012/03/31 19:30:10 UTC
svn commit: r1307859 - in /mahout/trunk:
core/src/main/java/org/apache/mahout/clustering/classify/
core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/
core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/
integration/src/test/java/org/apache/mahout/clustering/
integration/src/test/java/org/apache/mahout/clustering/cdbw/
Author: pranjan
Date: Sat Mar 31 17:30:10 2012
New Revision: 1307859
URL: http://svn.apache.org/viewvc?rev=1307859&view=rev
Log:
MAHOUT-984. Refactored the point-clustering step out of FuzzyKMeansDriver so that it now uses ClusterClassificationDriver.
All JUnit tests pass.
Removed:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java?rev=1307859&r1=1307858&r2=1307859&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java Sat Mar 31 17:30:10 2012
@@ -141,7 +141,7 @@ public class ClusterClassificationDriver
private static void classifyClusterSeq(Configuration conf, Path input, Path clusters, Path output,
Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException {
List<Cluster> clusterModels = populateClusterModels(clusters, conf);
- ClusteringPolicy policy = ClusterClassifier.readPolicy(finalClustersPath(clusters));
+ ClusteringPolicy policy = ClusterClassifier.readPolicy(finalClustersPath(conf, clusters));
ClusterClassifier clusterClassifier = new ClusterClassifier(clusterModels, policy);
selectCluster(input, clusterModels, clusterClassifier, output, clusterClassificationThreshold, emitMostLikely);
@@ -160,7 +160,7 @@ public class ClusterClassificationDriver
private static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf) throws IOException {
List<Cluster> clusterModels = new ArrayList<Cluster>();
Cluster cluster = null;
- Path finalClustersPath = finalClustersPath(clusterOutputPath);
+ Path finalClustersPath = finalClustersPath(conf, clusterOutputPath);
Iterator<?> it = new SequenceFileDirValueIterator<Writable>(finalClustersPath, PathType.LIST,
PathFilters.partFilter(), null, false, conf);
while (it.hasNext()) {
@@ -174,8 +174,8 @@ public class ClusterClassificationDriver
return clusterModels;
}
- private static Path finalClustersPath(Path clusterOutputPath) throws IOException {
- FileSystem fileSystem = clusterOutputPath.getFileSystem(new Configuration());
+ private static Path finalClustersPath(Configuration conf, Path clusterOutputPath) throws IOException {
+ FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
Path finalClustersPath = clusterFiles[0].getPath();
return finalClustersPath;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=1307859&r1=1307858&r2=1307859&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Sat Mar 31 17:30:10 2012
@@ -17,18 +17,17 @@
package org.apache.mahout.clustering.fuzzykmeans;
+import static org.apache.mahout.clustering.topdown.PathDirectory.CLUSTERED_POINTS_DIRECTORY;
+
import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -40,8 +39,10 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.mahout.clustering.AbstractCluster;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.ClusterObservations;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.iterator.FuzzyKMeansClusteringPolicy;
import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.ClassUtils;
@@ -52,12 +53,14 @@ import org.apache.mahout.common.distance
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
public class FuzzyKMeansDriver extends AbstractJob {
public static final String M_OPTION = "m";
@@ -187,7 +190,7 @@ public class FuzzyKMeansDriver extends A
log.info("Clustering ");
clusterData(input,
clustersOut,
- new Path(output, Cluster.CLUSTERED_POINTS_DIR),
+ output,
measure,
convergenceDelta,
m,
@@ -299,7 +302,7 @@ public class FuzzyKMeansDriver extends A
log.info("Clustering");
clusterData(input,
clustersOut,
- new Path(output, Cluster.CLUSTERED_POINTS_DIR),
+ output,
measure,
convergenceDelta,
m,
@@ -467,82 +470,10 @@ public class FuzzyKMeansDriver extends A
double threshold,
boolean runSequential)
throws IOException, ClassNotFoundException, InterruptedException {
- if (runSequential) {
- clusterDataSeq(input, clustersIn, output, measure, convergenceDelta, m);
- } else {
- clusterDataMR(input, clustersIn, output, measure, convergenceDelta, m, emitMostLikely, threshold);
- }
- }
-
- private static void clusterDataSeq(Path input,
- Path clustersIn,
- Path output,
- DistanceMeasure measure,
- double convergenceDelta,
- float m) throws IOException {
- FuzzyKMeansClusterer clusterer = new FuzzyKMeansClusterer(measure, convergenceDelta, m);
- List<SoftCluster> clusters = Lists.newArrayList();
- FuzzyKMeansUtil.configureWithClusterInfo(clustersIn, clusters);
- if (clusters.isEmpty()) {
- throw new IllegalStateException("Clusters is empty!");
- }
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(input.toUri(), conf);
- FileStatus[] status = fs.listStatus(input, PathFilters.logsCRCFilter());
- int part = 0;
- for (FileStatus s : status) {
- SequenceFile.Writer writer = new SequenceFile.Writer(fs,
- conf,
- new Path(output, "part-m-" + part),
- IntWritable.class,
- WeightedVectorWritable.class);
- try {
- for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(s.getPath(), conf)) {
- clusterer.emitPointToClusters(value, clusters, writer);
- }
- } finally {
- Closeables.closeQuietly(writer);
- }
- }
-
- }
-
- private static void clusterDataMR(Path input,
- Path clustersIn,
- Path output,
- DistanceMeasure measure,
- double convergenceDelta,
- float m,
- boolean emitMostLikely,
- double threshold) throws IOException, InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
- conf.set(FuzzyKMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
- conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
- conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, String.valueOf(convergenceDelta));
- conf.set(FuzzyKMeansConfigKeys.M_KEY, String.valueOf(m));
- conf.set(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY, Boolean.toString(emitMostLikely));
- conf.set(FuzzyKMeansConfigKeys.THRESHOLD_KEY, Double.toString(threshold));
-
- // Clear output
- output.getFileSystem(conf).delete(output, true);
-
- Job job = new Job(conf, "FuzzyKMeans Driver running clusterData over input: " + input);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(WeightedVectorWritable.class);
-
- FileInputFormat.setInputPaths(job, input);
- FileOutputFormat.setOutputPath(job, output);
-
- job.setMapperClass(FuzzyKMeansClusterMapper.class);
-
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setNumReduceTasks(0);
- job.setJarByClass(FuzzyKMeansDriver.class);
-
- if (!job.waitForCompletion(true)) {
- throw new InterruptedException("Fuzzy K-Means Clustering failed processing " + clustersIn);
- }
+
+ ClusterClassifier.writePolicy(new FuzzyKMeansClusteringPolicy(m, convergenceDelta), clustersIn);
+ ClusterClassificationDriver.run(input, output, new Path(output, CLUSTERED_POINTS_DIRECTORY), threshold, true,
+ runSequential);
}
/**
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java?rev=1307859&r1=1307858&r2=1307859&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java Sat Mar 31 17:30:10 2012
@@ -21,15 +21,11 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
@@ -54,6 +50,10 @@ import org.apache.mahout.math.VectorWrit
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+
public final class TestFuzzyKmeansClustering extends MahoutTestCase {
private FileSystem fs;
@@ -531,129 +531,6 @@ public final class TestFuzzyKmeansCluste
}
@Test
- public void testFuzzyKMeansClusterMapper() throws Exception {
- List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
-
- for (int k = 0; k < points.size(); k++) {
- System.out.println("testKFuzzyKMeansMRJob k= " + k);
- // pick k initial cluster centers at random
- Collection<SoftCluster> clusterList = Lists.newArrayList();
-
- for (int i = 0; i < k + 1; i++) {
- Vector vec = tweakValue(points.get(i).get());
-
- SoftCluster cluster = new SoftCluster(vec, i, measure);
- cluster.observe(cluster.getCenter(), 1);
- clusterList.add(cluster);
- }
- for (SoftCluster softCluster : clusterList) {
- softCluster.computeParameters();
- }
-
- // run mapper
- FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
- mapper.config(clusterList);
- DistanceMeasure measure = new EuclideanDistanceMeasure();
-
- Configuration conf = new Configuration();
- conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
- conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.001");
- conf.set(FuzzyKMeansConfigKeys.M_KEY, "2");
- conf.set(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY, "true");
- conf.set(FuzzyKMeansConfigKeys.THRESHOLD_KEY, "0");
-
- DummyRecordWriter<Text, ClusterObservations> mapWriter = new DummyRecordWriter<Text, ClusterObservations>();
- Mapper<WritableComparable<?>, VectorWritable, Text, ClusterObservations>.Context mapContext =
- DummyRecordWriter.build(mapper, conf, mapWriter);
- mapper.setup(mapContext);
- for (VectorWritable point : points) {
- mapper.map(new Text(), point, mapContext);
- }
-
- // run combiner
- FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
- DummyRecordWriter<Text, ClusterObservations> combinerWriter = new DummyRecordWriter<Text, ClusterObservations>();
- Reducer<Text, ClusterObservations, Text, ClusterObservations>.Context combinerContext =
- DummyRecordWriter.build(combiner, conf, combinerWriter, Text.class, ClusterObservations.class);
- combiner.setup(combinerContext);
- for (Text key : mapWriter.getKeys()) {
- List<ClusterObservations> values = mapWriter.getValue(key);
- combiner.reduce(new Text(key), values, combinerContext);
- }
-
- // run reducer
- FuzzyKMeansReducer reducer = new FuzzyKMeansReducer();
- DummyRecordWriter<Text, ClusterWritable> reducerWriter = new DummyRecordWriter<Text, ClusterWritable>();
- Reducer<Text, ClusterObservations, Text, ClusterWritable>.Context reducerContext =
- DummyRecordWriter.build(reducer, conf, reducerWriter, Text.class, ClusterObservations.class);
- reducer.setup(clusterList, conf);
-
- for (Text key : combinerWriter.getKeys()) {
- List<ClusterObservations> values = combinerWriter.getValue(key);
- reducer.reduce(new Text(key), values, reducerContext);
- }
-
- // run clusterMapper
- Collection<SoftCluster> reducerClusters = Lists.newArrayList();
- for (Text key : reducerWriter.getKeys()) {
- List<ClusterWritable> values = reducerWriter.getValue(key);
- reducerClusters.add((SoftCluster) values.get(0).getValue());
- }
- for (SoftCluster softCluster : reducerClusters) {
- softCluster.computeParameters();
- }
-
- FuzzyKMeansClusterMapper clusterMapper = new FuzzyKMeansClusterMapper();
- DummyRecordWriter<IntWritable, WeightedVectorWritable> clusterWriter =
- new DummyRecordWriter<IntWritable, WeightedVectorWritable>();
- Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context clusterContext =
- DummyRecordWriter.build(clusterMapper, conf, clusterWriter);
- clusterMapper.setup(reducerClusters, conf);
-
- for (VectorWritable point : points) {
- clusterMapper.map(new Text(), point, clusterContext);
- }
-
- // compute the reference result after one iteration and compare
- List<SoftCluster> reference = Lists.newArrayList();
- for (int i = 0; i < k + 1; i++) {
- Vector vec = tweakValue(points.get(i).get());
- reference.add(new SoftCluster(vec, i, measure));
- }
- Map<Integer, List<WeightedVectorWritable>> refClusters = Maps.newHashMap();
- Collection<Vector> pointsVectors = Lists.newArrayList();
- for (VectorWritable point : points) {
- pointsVectors.add(point.get());
- }
-
- List<List<SoftCluster>> clusters = FuzzyKMeansClusterer.clusterPoints(pointsVectors,
- reference,
- new EuclideanDistanceMeasure(),
- 0.001,
- 2,
- 1);
-
- computeCluster(pointsVectors, clusters.get(clusters.size() - 1),
- new FuzzyKMeansClusterer(new EuclideanDistanceMeasure(), 0.001, 2), refClusters);
-
- // Now compare the clustermapper results with reference implementation
- assertEquals("mapper and reference sizes", refClusters.size(), clusterWriter.getKeys().size());
- for (Map.Entry<Integer, List<WeightedVectorWritable>> entry : refClusters.entrySet()) {
- int key = entry.getKey();
- List<WeightedVectorWritable> value = entry.getValue();
- System.out.println("refClusters=" + value + " mapClusters=" + clusterWriter.getValue(new IntWritable(key)));
- assertEquals("cluster " + key + " sizes", value.size(), clusterWriter.getValue(new IntWritable(key)).size());
- }
- // make sure all points are allocated to a cluster
- int size = 0;
- for (List<WeightedVectorWritable> pts : refClusters.values()) {
- size += pts.size();
- }
- assertEquals("total size", size, points.size());
- }
- }
-
- @Test
public void testClusterObservationsSerialization() throws Exception {
double[] data = { 1.1, 2.2, 3.3 };
Vector vector = new DenseVector(data);
Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java?rev=1307859&r1=1307858&r2=1307859&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java Sat Mar 31 17:30:10 2012
@@ -366,13 +366,14 @@ public final class TestClusterEvaluator
// now run the Canopy job to prime kMeans canopies
Configuration conf = new Configuration();
CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, false, 0.0, true);
- // now run the KMeans job
- FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), output,
+ Path fuzzyKMeansOutput = new Path(output, "fuzzyk");
+ // now run the KMeans job
+ FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), fuzzyKMeansOutput ,
measure, 0.001, 10, 2, true, true, 0, true);
int numIterations = 10;
Path clustersIn = new Path(output, "clusters-4");
RepresentativePointsDriver.run(conf, clustersIn, new Path(output,
- "clusteredPoints"), output, measure, numIterations, true);
+ "clusteredPoints"), fuzzyKMeansOutput, measure, numIterations, true);
ClusterEvaluator evaluator = new ClusterEvaluator(conf, clustersIn);
// now print out the Results
System.out.println("Intra-cluster density = "
Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java?rev=1307859&r1=1307858&r2=1307859&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java Sat Mar 31 17:30:10 2012
@@ -378,13 +378,14 @@ public final class TestCDbwEvaluator ext
// now run the Canopy job to prime kMeans canopies
CanopyDriver.run(new Configuration(), testdata, output, measure, 3.1, 2.1,
false, 0.0, true);
- // now run the KMeans job
- FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), output,
+ Path fuzzyKMeansOutput = new Path(output, "fuzzyk");
+ // now run the KMeans job
+ FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), fuzzyKMeansOutput ,
measure, 0.001, 10, 2, true, true, 0, true);
int numIterations = 10;
Path clustersIn = new Path(output, "clusters-4");
RepresentativePointsDriver.run(conf, clustersIn, new Path(output,
- "clusteredPoints"), output, measure, numIterations, true);
+ "clusteredPoints"), fuzzyKMeansOutput, measure, numIterations, true);
CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn);
// printRepPoints(numIterations);
// now print out the Results