You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pr...@apache.org on 2012/03/31 19:30:10 UTC

svn commit: r1307859 - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/classify/ core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/ core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/ integration/src/test/java/org/apache/mahout/clustering/ integration/src/test/java/org/apache/mahout/clustering/cdbw/

Author: pranjan
Date: Sat Mar 31 17:30:10 2012
New Revision: 1307859

URL: http://svn.apache.org/viewvc?rev=1307859&view=rev
Log:
MAHOUT-984. Refactored clustering out of FuzzyKMeansDriver using ClusterClassificationDriver.
All JUnit tests pass.

Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java?rev=1307859&r1=1307858&r2=1307859&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java Sat Mar 31 17:30:10 2012
@@ -141,7 +141,7 @@ public class ClusterClassificationDriver
   private static void classifyClusterSeq(Configuration conf, Path input, Path clusters, Path output,
       Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException {
     List<Cluster> clusterModels = populateClusterModels(clusters, conf);
-    ClusteringPolicy policy = ClusterClassifier.readPolicy(finalClustersPath(clusters));
+    ClusteringPolicy policy = ClusterClassifier.readPolicy(finalClustersPath(conf, clusters));
     ClusterClassifier clusterClassifier = new ClusterClassifier(clusterModels, policy);
     selectCluster(input, clusterModels, clusterClassifier, output, clusterClassificationThreshold, emitMostLikely);
     
@@ -160,7 +160,7 @@ public class ClusterClassificationDriver
   private static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf) throws IOException {
     List<Cluster> clusterModels = new ArrayList<Cluster>();
     Cluster cluster = null;
-    Path finalClustersPath = finalClustersPath(clusterOutputPath);
+    Path finalClustersPath = finalClustersPath(conf, clusterOutputPath);
     Iterator<?> it = new SequenceFileDirValueIterator<Writable>(finalClustersPath, PathType.LIST,
         PathFilters.partFilter(), null, false, conf);
     while (it.hasNext()) {
@@ -174,8 +174,8 @@ public class ClusterClassificationDriver
     return clusterModels;
   }
   
-  private static Path finalClustersPath(Path clusterOutputPath) throws IOException {
-    FileSystem fileSystem = clusterOutputPath.getFileSystem(new Configuration());
+  private static Path finalClustersPath(Configuration conf, Path clusterOutputPath) throws IOException {
+    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
     FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
     Path finalClustersPath = clusterFiles[0].getPath();
     return finalClustersPath;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=1307859&r1=1307858&r2=1307859&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Sat Mar 31 17:30:10 2012
@@ -17,18 +17,17 @@
 
 package org.apache.mahout.clustering.fuzzykmeans;
 
+import static org.apache.mahout.clustering.topdown.PathDirectory.CLUSTERED_POINTS_DIRECTORY;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -40,8 +39,10 @@ import org.apache.hadoop.util.ToolRunner
 import org.apache.mahout.clustering.AbstractCluster;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.ClusterObservations;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
 import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.iterator.FuzzyKMeansClusteringPolicy;
 import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.ClassUtils;
@@ -52,12 +53,14 @@ import org.apache.mahout.common.distance
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
 import org.apache.mahout.math.VectorWritable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
 public class FuzzyKMeansDriver extends AbstractJob {
 
   public static final String M_OPTION = "m";
@@ -187,7 +190,7 @@ public class FuzzyKMeansDriver extends A
       log.info("Clustering ");
       clusterData(input,
                   clustersOut,
-                  new Path(output, Cluster.CLUSTERED_POINTS_DIR),
+                  output,
                   measure,
                   convergenceDelta,
                   m,
@@ -299,7 +302,7 @@ public class FuzzyKMeansDriver extends A
       log.info("Clustering");
       clusterData(input,
                   clustersOut,
-                  new Path(output, Cluster.CLUSTERED_POINTS_DIR),
+                  output,
                   measure,
                   convergenceDelta,
                   m,
@@ -467,82 +470,10 @@ public class FuzzyKMeansDriver extends A
                                  double threshold,
                                  boolean runSequential)
     throws IOException, ClassNotFoundException, InterruptedException {
-    if (runSequential) {
-      clusterDataSeq(input, clustersIn, output, measure, convergenceDelta, m);
-    } else {
-      clusterDataMR(input, clustersIn, output, measure, convergenceDelta, m, emitMostLikely, threshold);
-    }
-  }
-
-  private static void clusterDataSeq(Path input,
-                                     Path clustersIn,
-                                     Path output,
-                                     DistanceMeasure measure,
-                                     double convergenceDelta,
-                                     float m) throws IOException {
-    FuzzyKMeansClusterer clusterer = new FuzzyKMeansClusterer(measure, convergenceDelta, m);
-    List<SoftCluster> clusters = Lists.newArrayList();
-    FuzzyKMeansUtil.configureWithClusterInfo(clustersIn, clusters);
-    if (clusters.isEmpty()) {
-      throw new IllegalStateException("Clusters is empty!");
-    }
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(input.toUri(), conf);
-    FileStatus[] status = fs.listStatus(input, PathFilters.logsCRCFilter());
-    int part = 0;
-    for (FileStatus s : status) {
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
-                                                           conf,
-                                                           new Path(output, "part-m-" + part),
-                                                           IntWritable.class,
-                                                           WeightedVectorWritable.class);
-      try {
-        for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(s.getPath(), conf)) {
-          clusterer.emitPointToClusters(value, clusters, writer);
-        }
-      } finally {
-        Closeables.closeQuietly(writer);
-      }
-    }
-
-  }
-
-  private static void clusterDataMR(Path input,
-                                    Path clustersIn,
-                                    Path output,
-                                    DistanceMeasure measure,
-                                    double convergenceDelta,
-                                    float m,
-                                    boolean emitMostLikely,
-                                    double threshold) throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration conf = new Configuration();
-    conf.set(FuzzyKMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
-    conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
-    conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, String.valueOf(convergenceDelta));
-    conf.set(FuzzyKMeansConfigKeys.M_KEY, String.valueOf(m));
-    conf.set(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY, Boolean.toString(emitMostLikely));
-    conf.set(FuzzyKMeansConfigKeys.THRESHOLD_KEY, Double.toString(threshold));
-
-    // Clear output
-    output.getFileSystem(conf).delete(output, true);
-
-    Job job = new Job(conf, "FuzzyKMeans Driver running clusterData over input: " + input);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(WeightedVectorWritable.class);
-
-    FileInputFormat.setInputPaths(job, input);
-    FileOutputFormat.setOutputPath(job, output);
-
-    job.setMapperClass(FuzzyKMeansClusterMapper.class);
-
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setNumReduceTasks(0);
-    job.setJarByClass(FuzzyKMeansDriver.class);
-
-    if (!job.waitForCompletion(true)) {
-      throw new InterruptedException("Fuzzy K-Means Clustering failed processing " + clustersIn);
-    }
+    
+    ClusterClassifier.writePolicy(new FuzzyKMeansClusteringPolicy(m, convergenceDelta), clustersIn);
+    ClusterClassificationDriver.run(input, output, new Path(output, CLUSTERED_POINTS_DIRECTORY), threshold, true,
+        runSequential);
   }
 
   /**

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java?rev=1307859&r1=1307858&r2=1307859&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java Sat Mar 31 17:30:10 2012
@@ -21,15 +21,11 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
@@ -54,6 +50,10 @@ import org.apache.mahout.math.VectorWrit
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+
 public final class TestFuzzyKmeansClustering extends MahoutTestCase {
 
   private FileSystem fs;
@@ -531,129 +531,6 @@ public final class TestFuzzyKmeansCluste
   }
 
   @Test
-  public void testFuzzyKMeansClusterMapper() throws Exception {
-    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
-
-    for (int k = 0; k < points.size(); k++) {
-      System.out.println("testKFuzzyKMeansMRJob k= " + k);
-      // pick k initial cluster centers at random
-      Collection<SoftCluster> clusterList = Lists.newArrayList();
-
-      for (int i = 0; i < k + 1; i++) {
-        Vector vec = tweakValue(points.get(i).get());
-
-        SoftCluster cluster = new SoftCluster(vec, i, measure);
-        cluster.observe(cluster.getCenter(), 1);
-        clusterList.add(cluster);
-      }
-      for (SoftCluster softCluster : clusterList) {
-        softCluster.computeParameters();
-      }
-
-      // run mapper
-      FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
-      mapper.config(clusterList);
-      DistanceMeasure measure = new EuclideanDistanceMeasure();
-
-      Configuration conf = new Configuration();
-      conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
-      conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.001");
-      conf.set(FuzzyKMeansConfigKeys.M_KEY, "2");
-      conf.set(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY, "true");
-      conf.set(FuzzyKMeansConfigKeys.THRESHOLD_KEY, "0");
-
-      DummyRecordWriter<Text, ClusterObservations> mapWriter = new DummyRecordWriter<Text, ClusterObservations>();
-      Mapper<WritableComparable<?>, VectorWritable, Text, ClusterObservations>.Context mapContext =
-          DummyRecordWriter.build(mapper, conf, mapWriter);
-      mapper.setup(mapContext);
-      for (VectorWritable point : points) {
-        mapper.map(new Text(), point, mapContext);
-      }
-
-      // run combiner
-      FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
-      DummyRecordWriter<Text, ClusterObservations> combinerWriter = new DummyRecordWriter<Text, ClusterObservations>();
-      Reducer<Text, ClusterObservations, Text, ClusterObservations>.Context combinerContext =
-          DummyRecordWriter.build(combiner, conf, combinerWriter, Text.class, ClusterObservations.class);
-      combiner.setup(combinerContext);
-      for (Text key : mapWriter.getKeys()) {
-        List<ClusterObservations> values = mapWriter.getValue(key);
-        combiner.reduce(new Text(key), values, combinerContext);
-      }
-
-      // run reducer
-      FuzzyKMeansReducer reducer = new FuzzyKMeansReducer();
-      DummyRecordWriter<Text, ClusterWritable> reducerWriter = new DummyRecordWriter<Text, ClusterWritable>();
-      Reducer<Text, ClusterObservations, Text, ClusterWritable>.Context reducerContext =
-          DummyRecordWriter.build(reducer, conf, reducerWriter, Text.class, ClusterObservations.class);
-      reducer.setup(clusterList, conf);
-
-      for (Text key : combinerWriter.getKeys()) {
-        List<ClusterObservations> values = combinerWriter.getValue(key);
-        reducer.reduce(new Text(key), values, reducerContext);
-      }
-
-      // run clusterMapper
-      Collection<SoftCluster> reducerClusters = Lists.newArrayList();
-      for (Text key : reducerWriter.getKeys()) {
-        List<ClusterWritable> values = reducerWriter.getValue(key);
-        reducerClusters.add((SoftCluster) values.get(0).getValue());
-      }
-      for (SoftCluster softCluster : reducerClusters) {
-        softCluster.computeParameters();
-      }
-
-      FuzzyKMeansClusterMapper clusterMapper = new FuzzyKMeansClusterMapper();
-      DummyRecordWriter<IntWritable, WeightedVectorWritable> clusterWriter =
-          new DummyRecordWriter<IntWritable, WeightedVectorWritable>();
-      Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context clusterContext =
-          DummyRecordWriter.build(clusterMapper, conf, clusterWriter);
-      clusterMapper.setup(reducerClusters, conf);
-
-      for (VectorWritable point : points) {
-        clusterMapper.map(new Text(), point, clusterContext);
-      }
-
-      // compute the reference result after one iteration and compare
-      List<SoftCluster> reference = Lists.newArrayList();
-      for (int i = 0; i < k + 1; i++) {
-        Vector vec = tweakValue(points.get(i).get());
-        reference.add(new SoftCluster(vec, i, measure));
-      }
-      Map<Integer, List<WeightedVectorWritable>> refClusters = Maps.newHashMap();
-      Collection<Vector> pointsVectors = Lists.newArrayList();
-      for (VectorWritable point : points) {
-        pointsVectors.add(point.get());
-      }
-
-      List<List<SoftCluster>> clusters = FuzzyKMeansClusterer.clusterPoints(pointsVectors,
-                                                                            reference,
-                                                                            new EuclideanDistanceMeasure(),
-                                                                            0.001,
-                                                                            2,
-                                                                            1);
-
-      computeCluster(pointsVectors, clusters.get(clusters.size() - 1),
-                     new FuzzyKMeansClusterer(new EuclideanDistanceMeasure(), 0.001, 2), refClusters);
-
-      // Now compare the clustermapper results with reference implementation
-      assertEquals("mapper and reference sizes", refClusters.size(), clusterWriter.getKeys().size());
-      for (Map.Entry<Integer, List<WeightedVectorWritable>> entry : refClusters.entrySet()) {
-        int key = entry.getKey();
-        List<WeightedVectorWritable> value = entry.getValue();
-        System.out.println("refClusters=" + value + " mapClusters=" + clusterWriter.getValue(new IntWritable(key)));
-        assertEquals("cluster " + key + " sizes", value.size(), clusterWriter.getValue(new IntWritable(key)).size());
-      }
-      // make sure all points are allocated to a cluster
-      int size = 0;
-      for (List<WeightedVectorWritable> pts : refClusters.values()) {
-        size += pts.size();
-      }
-      assertEquals("total size", size, points.size());
-    }
-  }
-
-  @Test
   public void testClusterObservationsSerialization() throws Exception {
     double[] data = { 1.1, 2.2, 3.3 };
     Vector vector = new DenseVector(data);

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java?rev=1307859&r1=1307858&r2=1307859&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java Sat Mar 31 17:30:10 2012
@@ -366,13 +366,14 @@ public final class TestClusterEvaluator 
     // now run the Canopy job to prime kMeans canopies
     Configuration conf = new Configuration();
     CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, false, 0.0, true);
-    // now run the KMeans job
-    FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), output,
+    Path fuzzyKMeansOutput = new Path(output, "fuzzyk");
+	// now run the KMeans job
+    FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), fuzzyKMeansOutput ,
         measure, 0.001, 10, 2, true, true, 0, true);
     int numIterations = 10;
     Path clustersIn = new Path(output, "clusters-4");
     RepresentativePointsDriver.run(conf, clustersIn, new Path(output,
-        "clusteredPoints"), output, measure, numIterations, true);
+        "clusteredPoints"), fuzzyKMeansOutput, measure, numIterations, true);
     ClusterEvaluator evaluator = new ClusterEvaluator(conf, clustersIn);
     // now print out the Results
     System.out.println("Intra-cluster density = "

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java?rev=1307859&r1=1307858&r2=1307859&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java Sat Mar 31 17:30:10 2012
@@ -378,13 +378,14 @@ public final class TestCDbwEvaluator ext
     // now run the Canopy job to prime kMeans canopies
     CanopyDriver.run(new Configuration(), testdata, output, measure, 3.1, 2.1,
         false, 0.0, true);
-    // now run the KMeans job
-    FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), output,
+    Path fuzzyKMeansOutput = new Path(output, "fuzzyk");
+	// now run the KMeans job
+    FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), fuzzyKMeansOutput ,
         measure, 0.001, 10, 2, true, true, 0, true);
     int numIterations = 10;
     Path clustersIn = new Path(output, "clusters-4");
     RepresentativePointsDriver.run(conf, clustersIn, new Path(output,
-        "clusteredPoints"), output, measure, numIterations, true);
+        "clusteredPoints"), fuzzyKMeansOutput, measure, numIterations, true);
     CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn);
     // printRepPoints(numIterations);
     // now print out the Results