Posted to commits@mahout.apache.org by je...@apache.org on 2011/09/27 22:34:47 UTC

svn commit: r1176603 - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/canopy/ core/src/main/java/org/apache/mahout/common/commandline/ core/src/test/java/org/apache/mahout/clustering/canopy/ examples/src/main/java/org/apache/mahout/c...

Author: jeastman
Date: Tue Sep 27 20:34:47 2011
New Revision: 1176603

URL: http://svn.apache.org/viewvc?rev=1176603&view=rev
Log:
MAHOUT-818: Patch to filter canopies based upon numPoints. All tests run

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java

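For context: this patch threads a new int clusterFilter argument through CanopyDriver and exposes it on the command line as --clusterFilter (-cf). Canopies whose numPoints does not strictly exceed the threshold are suppressed by the mapper, the reducer and the sequential path. A minimal programmatic sketch of the new public entry point follows; the paths, thresholds and measure are placeholders, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.canopy.CanopyDriver;
    import org.apache.mahout.common.distance.ManhattanDistanceMeasure;

    public class CanopyFilterExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("testdata");   // directory of VectorWritable sequence files (placeholder)
        Path output = new Path("output");    // output directory (placeholder)
        // t1/t2 drive the mapper, t3/t4 the reducer; canopies with numPoints <= 5 are dropped
        CanopyDriver.run(conf, input, output, new ManhattanDistanceMeasure(),
            3.1, 2.1, 3.1, 2.1, 5, true, false);
      }
    }
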
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java?rev=1176603&r1=1176602&r2=1176603&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java Tue Sep 27 20:34:47 2011
@@ -32,4 +32,6 @@ public interface CanopyConfigKeys {
   // keys used by Driver, Mapper, Combiner & Reducer
   String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
 
+  String CF_KEY = "org.apache.mahout.clustering.canopy.canopyFilter";
+
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=1176603&r1=1176602&r2=1176603&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Tue Sep 27 20:34:47 2011
@@ -70,6 +70,7 @@ public class CanopyDriver extends Abstra
     addOption(DefaultOptionCreator.t2Option().create());
     addOption(DefaultOptionCreator.t3Option().create());
     addOption(DefaultOptionCreator.t4Option().create());
+    addOption(DefaultOptionCreator.clusterFilterOption().create());
     addOption(DefaultOptionCreator.overwriteOption().create());
     addOption(DefaultOptionCreator.clusteringOption().create());
     addOption(DefaultOptionCreator.methodOption().create());
@@ -95,13 +96,20 @@ public class CanopyDriver extends Abstra
     if (hasOption(DefaultOptionCreator.T4_OPTION)) {
       t4 = Double.parseDouble(getOption(DefaultOptionCreator.T4_OPTION));
     }
+    int clusterFilter = 0;
+    if (hasOption(DefaultOptionCreator.CLUSTER_FILTER_OPTION)) {
+      clusterFilter = Integer
+          .parseInt(getOption(DefaultOptionCreator.CLUSTER_FILTER_OPTION));
+    }
     boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
-    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
-        DefaultOptionCreator.SEQUENTIAL_METHOD);
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION)
+        .equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
     ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-    DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(DistanceMeasure.class).newInstance();
+    DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(
+        DistanceMeasure.class).newInstance();
 
-    run(conf, input, output, measure, t1, t2, t3, t4, runClustering, runSequential);
+    run(conf, input, output, measure, t1, t2, t3, t4, clusterFilter,
+        runClustering, runSequential);
     return 0;
   }
 
@@ -125,41 +133,35 @@ public class CanopyDriver extends Abstra
    *          the reducer's double T1 distance metric
    * @param t4
    *          the reducer's double T2 distance metric
+   * @param clusterFilter
+   *          the minimum canopy size output by the mappers
    * @param runClustering
    *          cluster the input vectors if true
    * @param runSequential
    *          execute sequentially if true
    */
-  public static void run(Configuration conf,
-                         Path input,
-                         Path output,
-                         DistanceMeasure measure,
-                         double t1,
-                         double t2,
-                         double t3,
-                         double t4,
-                         boolean runClustering,
-                         boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
-    Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3, t4, runSequential);
+  public static void run(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, double t3, double t4,
+      int clusterFilter, boolean runClustering, boolean runSequential)
+      throws IOException, InterruptedException, ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3,
+        t4, clusterFilter, runSequential);
     if (runClustering) {
-      clusterData(conf, input, clustersOut, output, measure, t1, t2, runSequential);
+      clusterData(conf, input, clustersOut, output, measure, t1, t2,
+          runSequential);
     }
   }
 
   /**
    * Convenience method to provide backward compatibility
    */
-  public static void run(Configuration conf,
-                         Path input,
-                         Path output,
-                         DistanceMeasure measure,
-                         double t1,
-                         double t2,
-                         boolean runClustering,
-                         boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
-    run(conf, input, output, measure, t1, t2, t1, t2, runClustering, runSequential);
+  public static void run(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, boolean runClustering,
+      boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
+    run(conf, input, output, measure, t1, t2, t1, t2, 0, runClustering,
+        runSequential);
   }
 
   /**
@@ -180,29 +182,24 @@ public class CanopyDriver extends Abstra
    * @param runSequential
    *          execute sequentially if true
    */
-  public static void run(Path input,
-                         Path output,
-                         DistanceMeasure measure,
-                         double t1,
-                         double t2,
-                         boolean runClustering,
-                         boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
-    run(new Configuration(), input, output, measure, t1, t2, runClustering, runSequential);
+  public static void run(Path input, Path output, DistanceMeasure measure,
+      double t1, double t2, boolean runClustering, boolean runSequential)
+      throws IOException, InterruptedException, ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    run(new Configuration(), input, output, measure, t1, t2, runClustering,
+        runSequential);
   }
 
   /**
    * Convenience method for backwards compatibility
+   * 
    */
-  public static Path buildClusters(Configuration conf,
-                                   Path input,
-                                   Path output,
-                                   DistanceMeasure measure,
-                                   double t1,
-                                   double t2,
-                                   boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    return buildClusters(conf, input, output, measure, t1, t2, t1, t2, runSequential);
+  public static Path buildClusters(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, int clusterFilter,
+      boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    return buildClusters(conf, input, output, measure, t1, t2, t1, t2,
+        clusterFilter, runSequential);
   }
 
   /**
@@ -225,26 +222,23 @@ public class CanopyDriver extends Abstra
    *          the reducer's double T1 distance metric
    * @param t4
    *          the reducer's double T2 distance metric
+   * @param clusterFilter
+   *          the int minimum size of canopies produced
    * @param runSequential
    *          a boolean indicates to run the sequential (reference) algorithm
    * @return the canopy output directory Path
    */
-  public static Path buildClusters(Configuration conf,
-                                   Path input,
-                                   Path output,
-                                   DistanceMeasure measure,
-                                   double t1,
-                                   double t2,
-                                   double t3,
-                                   double t4,
-                                   boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException {
+  public static Path buildClusters(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, double t3, double t4,
+      int clusterFilter, boolean runSequential) throws IOException,
+      InterruptedException, ClassNotFoundException {
     log.info("Build Clusters Input: {} Out: {} Measure: {} t1: {} t2: {}",
-             new Object[] {input, output, measure, t1, t2});
+        new Object[] { input, output, measure, t1, t2 });
     if (runSequential) {
-      return buildClustersSeq(input, output, measure, t1, t2);
+      return buildClustersSeq(input, output, measure, t1, t2, clusterFilter);
     } else {
-      return buildClustersMR(conf, input, output, measure, t1, t2, t3, t4);
+      return buildClustersMR(conf, input, output, measure, t1, t2, t3, t4,
+          clusterFilter);
     }
   }
 
@@ -262,37 +256,38 @@ public class CanopyDriver extends Abstra
    *          the double T1 distance metric
    * @param t2
    *          the double T2 distance metric
+   * @param clusterFilter
+   *          the int minimum size of canopies produced
    * @return the canopy output directory Path
    */
-  private static Path buildClustersSeq(Path input,
-                                       Path output,
-                                       DistanceMeasure measure,
-                                       double t1,
-                                       double t2) throws IOException {
+  private static Path buildClustersSeq(Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, int clusterFilter)
+      throws IOException {
     CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
     Collection<Canopy> canopies = Lists.newArrayList();
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(input.toUri(), conf);
 
-    for (VectorWritable vw
-         : new SequenceFileDirValueIterable<VectorWritable>(input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
+    for (VectorWritable vw : new SequenceFileDirValueIterable<VectorWritable>(
+        input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
       clusterer.addPointToCanopies(vw.get(), canopies);
     }
 
     Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
     Path path = new Path(canopyOutputDir, "part-r-00000");
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Canopy.class);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+        Text.class, Canopy.class);
     try {
       for (Canopy canopy : canopies) {
         canopy.computeParameters();
         log.debug("Writing Canopy:{} center:{} numPoints:{} radius:{}",
-                  new Object[] {
-                      canopy.getIdentifier(),
-                      AbstractCluster.formatVector(canopy.getCenter(), null),
-                      canopy.getNumPoints(),
-                      AbstractCluster.formatVector(canopy.getRadius(), null)
-                  });
-        writer.append(new Text(canopy.getIdentifier()), canopy);
+            new Object[] { canopy.getIdentifier(),
+                AbstractCluster.formatVector(canopy.getCenter(), null),
+                canopy.getNumPoints(),
+                AbstractCluster.formatVector(canopy.getRadius(), null) });
+        if (canopy.getNumPoints() > clusterFilter) {
+          writer.append(new Text(canopy.getIdentifier()), canopy);
+        }
       }
     } finally {
       Closeables.closeQuietly(writer);
@@ -305,6 +300,7 @@ public class CanopyDriver extends Abstra
    * arguments. Run mapreduce execution
    * 
    * @param conf
+   *          the Configuration
    * @param input
    *          the Path to the directory containing input vectors
    * @param output
@@ -319,25 +315,24 @@ public class CanopyDriver extends Abstra
    *          the reducer's double T1 distance metric
    * @param t4
    *          the reducer's double T2 distance metric
-   * 
+   * @param clusterFilter
+   *          the int minimum size of canopies produced
    * @return the canopy output directory Path
    */
-  private static Path buildClustersMR(Configuration conf,
-                                      Path input,
-                                      Path output,
-                                      DistanceMeasure measure,
-                                      double t1,
-                                      double t2,
-                                      double t3,
-                                      double t4)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
+  private static Path buildClustersMR(Configuration conf, Path input,
+      Path output, DistanceMeasure measure, double t1, double t2, double t3,
+      double t4, int clusterFilter) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
+        .getName());
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
     conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(t3));
     conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(t4));
+    conf.set(CanopyConfigKeys.CF_KEY, String.valueOf(clusterFilter));
 
-    Job job = new Job(conf, "Canopy Driver running buildClusters over input: " + input);
+    Job job = new Job(conf, "Canopy Driver running buildClusters over input: "
+        + input);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setMapperClass(CanopyMapper.class);
@@ -358,15 +353,11 @@ public class CanopyDriver extends Abstra
     return canopyOutputDir;
   }
 
-  public static void clusterData(Configuration conf,
-                                 Path points,
-                                 Path canopies,
-                                 Path output,
-                                 DistanceMeasure measure,
-                                 double t1,
-                                 double t2,
-                                 boolean runSequential)
-    throws InstantiationException, IllegalAccessException, IOException, InterruptedException, ClassNotFoundException {
+  public static void clusterData(Configuration conf, Path points,
+      Path canopies, Path output, DistanceMeasure measure, double t1,
+      double t2, boolean runSequential) throws InstantiationException,
+      IllegalAccessException, IOException, InterruptedException,
+      ClassNotFoundException {
     if (runSequential) {
       clusterDataSeq(points, canopies, output, measure, t1, t2);
     } else {
@@ -374,20 +365,16 @@ public class CanopyDriver extends Abstra
     }
   }
 
-  private static void clusterDataSeq(Path points,
-                                     Path canopies,
-                                     Path output,
-                                     DistanceMeasure measure,
-                                     double t1,
-                                     double t2)
-    throws InstantiationException, IllegalAccessException, IOException {
+  private static void clusterDataSeq(Path points, Path canopies, Path output,
+      DistanceMeasure measure, double t1, double t2)
+      throws InstantiationException, IllegalAccessException, IOException {
     CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
 
     Collection<Canopy> clusters = Lists.newArrayList();
     Configuration conf = new Configuration();
 
-    for (Canopy value
-         : new SequenceFileDirValueIterable<Canopy>(canopies, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
+    for (Canopy value : new SequenceFileDirValueIterable<Canopy>(canopies,
+        PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
       clusters.add(value);
     }
 
@@ -422,20 +409,17 @@ public class CanopyDriver extends Abstra
     }
   }
 
-  private static void clusterDataMR(Configuration conf,
-                                    Path points,
-                                    Path canopies,
-                                    Path output,
-                                    DistanceMeasure measure,
-                                    double t1,
-                                    double t2)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
+  private static void clusterDataMR(Configuration conf, Path points,
+      Path canopies, Path output, DistanceMeasure measure, double t1, double t2)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
+        .getName());
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
     conf.set(CanopyConfigKeys.CANOPY_PATH_KEY, canopies.toString());
 
-    Job job = new Job(conf, "Canopy Driver running clusterData over input: " + points);
+    Job job = new Job(conf, "Canopy Driver running clusterData over input: "
+        + points);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setMapperClass(ClusterMapper.class);
@@ -450,7 +434,8 @@ public class CanopyDriver extends Abstra
     HadoopUtil.delete(conf, outPath);
 
     if (!job.waitForCompletion(true)) {
-      throw new InterruptedException("Canopy Clustering failed processing " + canopies);
+      throw new InterruptedException("Canopy Clustering failed processing "
+          + canopies);
     }
   }
 

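Note the asymmetry between the convenience overloads above: run(conf, input, output, measure, t1, t2, runClustering, runSequential) keeps its old signature and forwards clusterFilter = 0 (no filtering), while the buildClusters overload documented as "backwards compatibility" now takes clusterFilter explicitly, so existing callers such as DisplayCanopy (below) must add a 0. A minimal sketch of the changed buildClusters call, with placeholder paths and a sequential run:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.canopy.CanopyDriver;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

    public class BuildClustersExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // buildClusters(conf, input, output, measure, t1, t2, clusterFilter, runSequential);
        // clusterFilter = 0 reproduces the pre-patch behaviour (every canopy is written).
        Path clustersOut = CanopyDriver.buildClusters(conf, new Path("testdata"),
            new Path("output"), new EuclideanDistanceMeasure(), 3.1, 2.1, 0, true);
        System.out.println("Canopies written to " + clustersOut);
      }
    }
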
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java?rev=1176603&r1=1176602&r2=1176603&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java Tue Sep 27 20:34:47 2011
@@ -26,28 +26,39 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.math.VectorWritable;
 
-class CanopyMapper extends Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> {
+class CanopyMapper extends
+    Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> {
 
   private final Collection<Canopy> canopies = Lists.newArrayList();
 
   private CanopyClusterer canopyClusterer;
 
+  private Integer clusterFilter;
+
   @Override
-  protected void map(WritableComparable<?> key, VectorWritable point, Context context)
-    throws IOException, InterruptedException {
+  protected void map(WritableComparable<?> key, VectorWritable point,
+      Context context) throws IOException, InterruptedException {
     canopyClusterer.addPointToCanopies(point.get(), canopies);
   }
 
   @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
     super.setup(context);
     canopyClusterer = new CanopyClusterer(context.getConfiguration());
+    clusterFilter = Integer.valueOf(context.getConfiguration().get(
+        CanopyConfigKeys.CF_KEY));
   }
 
   @Override
-  protected void cleanup(Context context) throws IOException, InterruptedException {
+  protected void cleanup(Context context) throws IOException,
+      InterruptedException {
     for (Canopy canopy : canopies) {
-      context.write(new Text("centroid"), new VectorWritable(canopy.computeCentroid()));
+      canopy.computeParameters();
+      if (canopy.getNumPoints() > clusterFilter) {
+        context.write(new Text("centroid"), new VectorWritable(canopy
+            .getCenter()));
+      }
     }
     super.cleanup(context);
   }

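Because the mapper's setup() now reads CanopyConfigKeys.CF_KEY unconditionally, any code that drives CanopyMapper outside CanopyDriver (as the updated tests below do) must set the key; a missing key makes Integer.valueOf(null) throw a NumberFormatException. A minimal sketch of the required configuration, mirroring the test changes in this commit (the helper class and method names are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.mahout.clustering.canopy.CanopyConfigKeys;

    public final class CanopyMapperConfig {
      // Builds the minimal Configuration that CanopyMapper.setup() now expects.
      static Configuration newCanopyConf(double t1, double t2, int clusterFilter) {
        Configuration conf = new Configuration();
        conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
            "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
        conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
        conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
        // New in this patch: CF_KEY must be present; "0" emits every canopy centroid as before.
        conf.set(CanopyConfigKeys.CF_KEY, String.valueOf(clusterFilter));
        return conf;
      }
    }
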
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java?rev=1176603&r1=1176602&r2=1176603&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java Tue Sep 27 20:34:47 2011
@@ -32,6 +32,8 @@ public class CanopyReducer extends Reduc
 
   private CanopyClusterer canopyClusterer;
 
+  private Integer clusterFilter;
+
   CanopyClusterer getCanopyClusterer() {
     return canopyClusterer;
   }
@@ -45,7 +47,9 @@ public class CanopyReducer extends Reduc
     }
     for (Canopy canopy : canopies) {
       canopy.computeParameters();
-      context.write(new Text(canopy.getIdentifier()), canopy);
+      if (canopy.getNumPoints() > clusterFilter) {
+        context.write(new Text(canopy.getIdentifier()), canopy);
+      }
     }
   }
 
@@ -55,6 +59,8 @@ public class CanopyReducer extends Reduc
     super.setup(context);
     canopyClusterer = new CanopyClusterer(context.getConfiguration());
     canopyClusterer.useT3T4();
+    clusterFilter = Integer.valueOf(context.getConfiguration().get(
+        CanopyConfigKeys.CF_KEY));
   }
 
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java?rev=1176603&r1=1176602&r2=1176603&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java Tue Sep 27 20:34:47 2011
@@ -58,6 +58,8 @@ public final class DefaultOptionCreator 
   
   public static final String T4_OPTION = "t4";
   
+  public static final String CLUSTER_FILTER_OPTION = "clusterFilter";
+  
   public static final String THRESHOLD_OPTION = "threshold";
   
   public static final String SEQUENTIAL_METHOD = "sequential";
@@ -201,7 +203,7 @@ public final class DefaultOptionCreator 
   
   /**
    * Returns a default command line option for specification of T3 (Reducer T1).
-   * Used by Canopy, MeanShift
+   * Used by Canopy
    */
   public static DefaultOptionBuilder t3Option() {
     return new DefaultOptionBuilder()
@@ -216,7 +218,7 @@ public final class DefaultOptionCreator 
   
   /**
    * Returns a default command line option for specification of T4 (Reducer T2).
-   * Used by Canopy, MeanShift
+   * Used by Canopy
    */
   public static DefaultOptionBuilder t4Option() {
     return new DefaultOptionBuilder()
@@ -230,6 +232,21 @@ public final class DefaultOptionCreator 
   }
   
   /**
+ * @return a DefaultOptionBuilder for the clusterFilter option
+ */
+public static DefaultOptionBuilder clusterFilterOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(CLUSTER_FILTER_OPTION)
+        .withShortName("cf")
+        .withRequired(false)
+        .withArgument(
+            new ArgumentBuilder().withName(CLUSTER_FILTER_OPTION).withMinimum(1)
+                .withMaximum(1).create())
+        .withDescription("Cluster filter suppresses small canopies from mapper")
+        .withShortName(CLUSTER_FILTER_OPTION);
+  }
+  
+  /**
    * Returns a default command line option for specification of max number of
    * iterations. Used by Dirichlet, FuzzyKmeans, Kmeans, LDA
    */

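For reference, a minimal sketch of how an AbstractJob-based driver would register and read the new option, mirroring the CanopyDriver change earlier in this commit; FilteredCanopyJob is illustrative only, and it assumes the AbstractJob helpers CanopyDriver already uses (addOption, parseArguments, hasOption, getOption):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.common.AbstractJob;
    import org.apache.mahout.common.commandline.DefaultOptionCreator;

    public class FilteredCanopyJob extends AbstractJob {
      @Override
      public int run(String[] args) throws Exception {
        addInputOption();
        addOutputOption();
        addOption(DefaultOptionCreator.clusterFilterOption().create());
        if (parseArguments(args) == null) {
          return -1;
        }
        int clusterFilter = 0; // default: keep every canopy
        if (hasOption(DefaultOptionCreator.CLUSTER_FILTER_OPTION)) {
          clusterFilter = Integer.parseInt(getOption(DefaultOptionCreator.CLUSTER_FILTER_OPTION));
        }
        // ... pass clusterFilter on to CanopyDriver.buildClusters(...) ...
        return 0;
      }

      public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new FilteredCanopyJob(), args);
      }
    }
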
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=1176603&r1=1176602&r2=1176603&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Tue Sep 27 20:34:47 2011
@@ -189,6 +189,7 @@ public final class TestCanopyCreation ex
         .getClass().getName());
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "0");
     DummyRecordWriter<Text, VectorWritable> writer = new DummyRecordWriter<Text, VectorWritable>();
     Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context = DummyRecordWriter
         .build(mapper, conf, writer);
@@ -224,6 +225,7 @@ public final class TestCanopyCreation ex
         .getClass().getName());
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "0");
     DummyRecordWriter<Text, VectorWritable> writer = new DummyRecordWriter<Text, VectorWritable>();
     Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context = DummyRecordWriter
         .build(mapper, conf, writer);
@@ -259,6 +261,7 @@ public final class TestCanopyCreation ex
         "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "0");
     DummyRecordWriter<Text, Canopy> writer = new DummyRecordWriter<Text, Canopy>();
     Reducer<Text, VectorWritable, Text, Canopy>.Context context = DummyRecordWriter
         .build(reducer, conf, writer, Text.class, VectorWritable.class);
@@ -292,6 +295,7 @@ public final class TestCanopyCreation ex
         "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "0");
     DummyRecordWriter<Text, Canopy> writer = new DummyRecordWriter<Text, Canopy>();
     Reducer<Text, VectorWritable, Text, Canopy>.Context context = DummyRecordWriter
         .build(reducer, conf, writer, Text.class, VectorWritable.class);
@@ -404,10 +408,9 @@ public final class TestCanopyCreation ex
         "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
-    DummyRecordWriter<IntWritable, WeightedVectorWritable> writer =
-        new DummyRecordWriter<IntWritable, WeightedVectorWritable>();
-    Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context =
-        DummyRecordWriter.build(mapper, conf, writer);
+    DummyRecordWriter<IntWritable, WeightedVectorWritable> writer = new DummyRecordWriter<IntWritable, WeightedVectorWritable>();
+    Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context = DummyRecordWriter
+        .build(mapper, conf, writer);
     mapper.setup(context);
 
     Collection<Canopy> canopies = Lists.newArrayList();
@@ -653,6 +656,7 @@ public final class TestCanopyCreation ex
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
     conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(1.1));
     conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(0.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "0");
     DummyRecordWriter<Text, Canopy> writer = new DummyRecordWriter<Text, Canopy>();
     Reducer<Text, VectorWritable, Text, Canopy>.Context context = DummyRecordWriter
         .build(reducer, conf, writer, Text.class, VectorWritable.class);
@@ -660,4 +664,58 @@ public final class TestCanopyCreation ex
     assertEquals(1.1, reducer.getCanopyClusterer().getT1(), EPSILON);
     assertEquals(0.1, reducer.getCanopyClusterer().getT2(), EPSILON);
   }
+
+  /**
+   * Story: User can specify a clustering limit that prevents output of small
+   * clusters
+   */
+  @Test
+  public void testCanopyMapperClusterFilter() throws Exception {
+    CanopyMapper mapper = new CanopyMapper();
+    Configuration conf = new Configuration();
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, manhattanDistanceMeasure
+        .getClass().getName());
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "3");
+    DummyRecordWriter<Text, VectorWritable> writer = new DummyRecordWriter<Text, VectorWritable>();
+    Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context = DummyRecordWriter
+        .build(mapper, conf, writer);
+    mapper.setup(context);
+
+    List<VectorWritable> points = getPointsWritable();
+    // map the data
+    for (VectorWritable point : points) {
+      mapper.map(new Text(), point, context);
+    }
+    mapper.cleanup(context);
+    assertEquals("Number of map results", 1, writer.getData().size());
+    // now verify the output
+    List<VectorWritable> data = writer.getValue(new Text("centroid"));
+    assertEquals("Number of centroids", 2, data.size());
+  }
+
+  /**
+   * Story: User can specify a cluster filter that limits the minimum size of
+   * canopies produced by the reducer
+   */
+  @Test
+  public void testCanopyReducerClusterFilter() throws Exception {
+    CanopyReducer reducer = new CanopyReducer();
+    Configuration conf = new Configuration();
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+        "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "3");
+    DummyRecordWriter<Text, Canopy> writer = new DummyRecordWriter<Text, Canopy>();
+    Reducer<Text, VectorWritable, Text, Canopy>.Context context = DummyRecordWriter
+        .build(reducer, conf, writer, Text.class, VectorWritable.class);
+    reducer.setup(context);
+
+    List<VectorWritable> points = getPointsWritable();
+    reducer.reduce(new Text("centroid"), points, context);
+    Set<Text> keys = writer.getKeys();
+    assertEquals("Number of centroids", 2, keys.size());
+  }
 }

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java?rev=1176603&r1=1176602&r2=1176603&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java Tue Sep 27 20:34:47 2011
@@ -80,7 +80,7 @@ public class DisplayCanopy extends Displ
     writeSampleData(samples);
     //boolean b = true;
     //if (b) {
-    CanopyDriver.buildClusters(conf, samples, output, new ManhattanDistanceMeasure(), T1, T2, true);
+    CanopyDriver.buildClusters(conf, samples, output, new ManhattanDistanceMeasure(), T1, T2, 0, true);
     loadClusters(output);
     //} else {
     //  List<Vector> points = new ArrayList<Vector>();

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java?rev=1176603&r1=1176602&r2=1176603&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java Tue Sep 27 20:34:47 2011
@@ -66,7 +66,7 @@ public class DisplayFuzzyKMeans extends 
     RandomUtils.useTestSeed();
     DisplayClustering.generateSamples();
     writeSampleData(samples);
-    boolean runClusterer = false;
+    boolean runClusterer = true;
     int maxIterations = 10;
     if (runClusterer) {
       runSequentialFuzzyKClusterer(conf, samples, output, measure, maxIterations);