You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by je...@apache.org on 2012/06/11 23:13:13 UTC

svn commit: r1349015 - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/ examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/ examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/diri...

Author: jeastman
Date: Mon Jun 11 21:13:13 2012
New Revision: 1349015

URL: http://svn.apache.org/viewvc?rev=1349015&view=rev
Log:
MAHOUT-1017:
- improved logging message in FuzzyKMeansDriver
- converted meanshift InputDriver and InputMapper to emit ClusterWritables used by other jobs
- fixed synthetic control jobs to properly handle the -final suffix on final clusters-i directory
- all synthetic control jobs run
- all unit tests pass

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputDriver.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputMapper.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Mon Jun 11 21:13:13 2012
@@ -274,7 +274,7 @@ public class FuzzyKMeansDriver extends A
     }
     
     if (clusters.isEmpty()) {
-      throw new IllegalStateException("No input clusters found. Check your -c argument.");
+      throw new IllegalStateException("No input clusters found in " + clustersIn + ". Check your -c argument.");
     }
     
     Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);   

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java Mon Jun 11 21:13:13 2012
@@ -88,7 +88,7 @@ public final class Job extends AbstractJ
         output, measure, t1, t2, true, 0.0, false);
     // run ClusterDumper
     ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
-        "clusters-0"), new Path(output, "clusteredPoints"));
+        "clusters-0-final"), new Path(output, "clusteredPoints"));
     clusterDumper.printClusters(null);
   }
 

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java Mon Jun 11 21:13:13 2012
@@ -38,14 +38,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public final class Job extends AbstractJob {
-
+  
   private static final Logger log = LoggerFactory.getLogger(Job.class);
-
+  
   private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
-
-  private Job() {
-  }
-
+  
+  private Job() {}
+  
   public static void main(String[] args) throws Exception {
     if (args.length > 0) {
       log.info("Running with only user-supplied arguments");
@@ -54,45 +53,53 @@ public final class Job extends AbstractJ
       log.info("Running with default arguments");
       Path output = new Path("output");
       HadoopUtil.delete(new Configuration(), output);
-      DistributionDescription description =
-          new DistributionDescription(GaussianClusterDistribution.class.getName(),
-                                      RandomAccessSparseVector.class.getName(),
-                                      null,
-                                      60);
+      DistributionDescription description = new DistributionDescription(GaussianClusterDistribution.class.getName(),
+          RandomAccessSparseVector.class.getName(), null, 60);
       run(new Path("testdata"), output, description, 10, 5, 1.0, true, 0);
     }
   }
-
+  
   @Override
-  public int run(String[] args) throws Exception{
+  public int run(String[] args) throws Exception {
     addInputOption();
     addOutputOption();
     addOption(DefaultOptionCreator.maxIterationsOption().create());
     addOption(DefaultOptionCreator.numClustersOption().withRequired(true).create());
     addOption(DefaultOptionCreator.overwriteOption().create());
-    addOption(new DefaultOptionBuilder().withLongName(DirichletDriver.ALPHA_OPTION).withRequired(false)
-        .withShortName("m").withArgument(new ArgumentBuilder().withName(DirichletDriver.ALPHA_OPTION).withDefault("1.0")
-            .withMinimum(1).withMaximum(1).create())
+    addOption(new DefaultOptionBuilder()
+        .withLongName(DirichletDriver.ALPHA_OPTION)
+        .withRequired(false)
+        .withShortName("m")
+        .withArgument(
+            new ArgumentBuilder().withName(DirichletDriver.ALPHA_OPTION).withDefault("1.0").withMinimum(1)
+                .withMaximum(1).create())
         .withDescription("The alpha0 value for the DirichletDistribution. Defaults to 1.0").create());
-    addOption(new DefaultOptionBuilder().withLongName(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION)
-        .withRequired(false).withShortName("md").withArgument(new ArgumentBuilder()
-            .withName(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION)
-            .withDefault(GaussianClusterDistribution.class.getName()).withMinimum(1).withMaximum(1).create())
+    addOption(new DefaultOptionBuilder()
+        .withLongName(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION)
+        .withRequired(false)
+        .withShortName("md")
+        .withArgument(
+            new ArgumentBuilder().withName(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION)
+                .withDefault(GaussianClusterDistribution.class.getName()).withMinimum(1).withMaximum(1).create())
         .withDescription("The ModelDistribution class name. Defaults to GaussianClusterDistribution").create());
-    addOption(new DefaultOptionBuilder().withLongName(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION).withRequired(false)
-        .withShortName("mp").withArgument(new ArgumentBuilder().withName("prototypeClass")
-            .withDefault(RandomAccessSparseVector.class.getName()).withMinimum(1).withMaximum(1).create())
+    addOption(new DefaultOptionBuilder()
+        .withLongName(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION)
+        .withRequired(false)
+        .withShortName("mp")
+        .withArgument(
+            new ArgumentBuilder().withName("prototypeClass").withDefault(RandomAccessSparseVector.class.getName())
+                .withMinimum(1).withMaximum(1).create())
         .withDescription("The ModelDistribution prototype Vector class name. Defaults to RandomAccessSparseVector")
         .create());
     addOption(DefaultOptionCreator.distanceMeasureOption().withRequired(false).create());
     addOption(DefaultOptionCreator.emitMostLikelyOption().create());
     addOption(DefaultOptionCreator.thresholdOption().create());
-
-    Map<String, List<String>> argMap = parseArguments(args);
+    
+    Map<String,List<String>> argMap = parseArguments(args);
     if (argMap == null) {
       return -1;
     }
-
+    
     Path input = getInputPath();
     Path output = getOutputPath();
     if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
@@ -106,13 +113,12 @@ public final class Job extends AbstractJ
     boolean emitMostLikely = Boolean.parseBoolean(getOption(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION));
     double threshold = Double.parseDouble(getOption(DefaultOptionCreator.THRESHOLD_OPTION));
     double alpha0 = Double.parseDouble(getOption(DirichletDriver.ALPHA_OPTION));
-    DistributionDescription description =
-        new DistributionDescription(modelFactory, modelPrototype, distanceMeasure, 60);
-
+    DistributionDescription description = new DistributionDescription(modelFactory, modelPrototype, distanceMeasure, 60);
+    
     run(input, output, description, numModels, maxIterations, alpha0, emitMostLikely, threshold);
     return 0;
   }
-
+  
   /**
    * Run the job using supplied arguments, deleting the output directory if it exists beforehand
    * 
@@ -120,7 +126,8 @@ public final class Job extends AbstractJ
    *          the directory pathname for input points
    * @param output
    *          the directory pathname for output points
-   * @param description the model distribution description
+   * @param description
+   *          the model distribution description
    * @param numModels
    *          the number of Models
    * @param maxIterations
@@ -128,25 +135,18 @@ public final class Job extends AbstractJ
    * @param alpha0
    *          the alpha0 value for the DirichletDistribution
    */
-  public static void run(Path input,
-                         Path output,
-                         DistributionDescription description,
-                         int numModels,
-                         int maxIterations,
-                         double alpha0,
-                         boolean emitMostLikely,
-                         double threshold)
-          throws Exception{
+  public static void run(Path input, Path output, DistributionDescription description, int numModels,
+      int maxIterations, double alpha0, boolean emitMostLikely, double threshold) throws Exception {
     Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
     InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
-    DirichletDriver.run(new Configuration(), directoryContainingConvertedInput, output, description, numModels, maxIterations, alpha0, true,
-    emitMostLikely, threshold, false);
+    DirichletDriver.run(new Configuration(), directoryContainingConvertedInput, output, description, numModels,
+        maxIterations, alpha0, true, emitMostLikely, threshold, false);
     // run ClusterDumper
-    ClusterDumper clusterDumper =
-        new ClusterDumper(new Path(output, "clusters-" + maxIterations), new Path(output, "clusteredPoints"));
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+        "clusteredPoints"));
     clusterDumper.printClusters(null);
   }
-
+  
   /**
    * Actually prints out the clusters
    * 
@@ -156,22 +156,11 @@ public final class Job extends AbstractJ
    *          the minimum number of samples to enable printing a model
    */
   /*
-  private static void printClusters(Iterable<List<DirichletCluster>> clusters, int significant) {
-    int row = 0;
-    StringBuilder result = new StringBuilder(100);
-    for (List<DirichletCluster> r : clusters) {
-      result.append("sample=").append(row++).append("]= ");
-      for (int k = 0; k < r.size(); k++) {
-        Model<VectorWritable> model = r.get(k).getModel();
-        if (model.count() > significant) {
-          int total = (int) r.get(k).getTotalCount();
-          result.append('m').append(k).append('(').append(total).append(')').append(model).append(", ");
-        }
-      }
-      result.append('\n');
-    }
-    result.append('\n');
-    log.info(result.toString());
-  }
+   * private static void printClusters(Iterable<List<DirichletCluster>> clusters, int significant) { int row = 0;
+   * StringBuilder result = new StringBuilder(100); for (List<DirichletCluster> r : clusters) {
+   * result.append("sample=").append(row++).append("]= "); for (int k = 0; k < r.size(); k++) { Model<VectorWritable>
+   * model = r.get(k).getModel(); if (model.count() > significant) { int total = (int) r.get(k).getTotalCount();
+   * result.append('m').append(k).append('(').append(total).append(')').append(model).append(", "); } }
+   * result.append('\n'); } result.append('\n'); log.info(result.toString()); }
    */
 }

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java Mon Jun 11 21:13:13 2012
@@ -17,17 +17,14 @@
 
 package org.apache.mahout.clustering.syntheticcontrol.fuzzykmeans;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.cli2.builder.ArgumentBuilder;
 import org.apache.commons.cli2.builder.DefaultOptionBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.canopy.CanopyDriver;
 import org.apache.mahout.clustering.conversion.InputDriver;
 import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
@@ -43,16 +40,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public final class Job extends AbstractJob {
-
+  
   private static final Logger log = LoggerFactory.getLogger(Job.class);
-
+  
   private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
-
+  
   private static final String M_OPTION = FuzzyKMeansDriver.M_OPTION;
-
-  private Job() {
-  }
-
+  
+  private Job() {}
+  
   public static void main(String[] args) throws Exception {
     if (args.length > 0) {
       log.info("Running with only user-supplied arguments");
@@ -62,13 +58,12 @@ public final class Job extends AbstractJ
       Path output = new Path("output");
       Configuration conf = new Configuration();
       HadoopUtil.delete(conf, output);
-      run(conf, new Path("testdata"), output,
-          new EuclideanDistanceMeasure(), 80, 55, 10, (float) 2, 0.5);
+      run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 80, 55, 10, (float) 2, 0.5);
     }
   }
-
+  
   @Override
-  public int run(String[] args) throws Exception{
+  public int run(String[] args) throws Exception {
     addInputOption();
     addOutputOption();
     addOption(DefaultOptionCreator.distanceMeasureOption().create());
@@ -77,68 +72,43 @@ public final class Job extends AbstractJ
     addOption(DefaultOptionCreator.overwriteOption().create());
     addOption(DefaultOptionCreator.t1Option().create());
     addOption(DefaultOptionCreator.t2Option().create());
-    addOption(M_OPTION, M_OPTION,
-        "coefficient normalization factor, must be greater than 1", true);
-
-    Map<String, List<String>> argMap = parseArguments(args);
+    addOption(M_OPTION, M_OPTION, "coefficient normalization factor, must be greater than 1", true);
+    
+    Map<String,List<String>> argMap = parseArguments(args);
     if (argMap == null) {
       return -1;
     }
-
+    
     Path input = getInputPath();
     Path output = getOutputPath();
     String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
     if (measureClass == null) {
       measureClass = SquaredEuclideanDistanceMeasure.class.getName();
     }
-    double convergenceDelta = Double
-        .parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
-    int maxIterations = Integer
-        .parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
     float fuzziness = Float.parseFloat(getOption(M_OPTION));
-
-    addOption(new DefaultOptionBuilder().withLongName(M_OPTION).withRequired(
-        true).withArgument(
-        new ArgumentBuilder().withName(M_OPTION).withMinimum(1).withMaximum(1)
-            .create()).withDescription(
-        "coefficient normalization factor, must be greater than 1")
-        .withShortName(M_OPTION).create());
+    
+    addOption(new DefaultOptionBuilder().withLongName(M_OPTION).withRequired(true)
+        .withArgument(new ArgumentBuilder().withName(M_OPTION).withMinimum(1).withMaximum(1).create())
+        .withDescription("coefficient normalization factor, must be greater than 1").withShortName(M_OPTION).create());
     if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
       HadoopUtil.delete(getConf(), output);
     }
     DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
     double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
     double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
-    run(getConf(), input, output, measure, t1, t2, maxIterations, fuzziness,
-        convergenceDelta);
+    run(getConf(), input, output, measure, t1, t2, maxIterations, fuzziness, convergenceDelta);
     return 0;
   }
-
-  /**
-   * Return the path to the final iteration's clusters
-   */
-  private static Path finalClusterPath(Configuration conf, Path output,
-      int maxIterations) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    for (int i = maxIterations; i >= 0; i--) {
-      Path clusters = new Path(output, "clusters-" + i);
-      if (fs.exists(clusters)) {
-        return clusters;
-      }
-    }
-    return null;
-  }
-
+  
   /**
-   * Run the kmeans clustering job on an input dataset using the given distance
-   * measure, t1, t2 and iteration parameters. All output data will be written
-   * to the output directory, which will be initially deleted if it exists. The
-   * clustered points will reside in the path <output>/clustered-points. By
-   * default, the job expects the a file containing synthetic_control.data as
-   * obtained from
-   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series
-   * resides in a directory named "testdata", and writes output to a directory
-   * named "output".
+   * Run the fuzzy kmeans clustering job on an input dataset using the given distance measure, t1, t2 and iteration
+   * parameters. All output data will be written to the output directory, which will be initially deleted if it exists.
+   * The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects a file
+   * containing synthetic_control.data, as obtained from
+   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series, to reside in a directory named
+   * "testdata", and writes output to a directory named "output".
    * 
    * @param input
    *          the String denoting the input directory path
@@ -155,31 +125,21 @@ public final class Job extends AbstractJ
    * @param convergenceDelta
    *          the double convergence criteria for iterations
    */
-  public static void run(Configuration conf,
-                         Path input,
-                         Path output,
-                         DistanceMeasure measure,
-                         double t1,
-                         double t2,
-                         int maxIterations,
-                         float fuzziness,
-                         double convergenceDelta)
-          throws Exception{
-    Path directoryContainingConvertedInput = new Path(output,
-        DIRECTORY_CONTAINING_CONVERTED_INPUT);
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
+      int maxIterations, float fuzziness, double convergenceDelta) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
     log.info("Preparing Input");
-    InputDriver.runJob(input, directoryContainingConvertedInput,
-        "org.apache.mahout.math.RandomAccessSparseVector");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
     log.info("Running Canopy to get initial clusters");
-    CanopyDriver.run(new Configuration(), directoryContainingConvertedInput,
-        output, measure, t1, t2, false, 0.0, false);
+    Path canopyOutput = new Path(output, "canopies");
+    CanopyDriver
+        .run(new Configuration(), directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0, false);
     log.info("Running FuzzyKMeans");
-    FuzzyKMeansDriver.run(directoryContainingConvertedInput, new Path(output,
-        Cluster.INITIAL_CLUSTERS_DIR), output, measure, convergenceDelta,
-        maxIterations, fuzziness, true, true, 0.0, false);
+    FuzzyKMeansDriver.run(directoryContainingConvertedInput, new Path(canopyOutput, "clusters-0-final"), output, measure,
+        convergenceDelta, maxIterations, fuzziness, true, true, 0.0, false);
     // run ClusterDumper
-    ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
-        output, maxIterations), new Path(output, "clusteredPoints"));
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+        "clusteredPoints"));
     clusterDumper.printClusters(null);
   }
 }

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java Mon Jun 11 21:13:13 2012
@@ -17,12 +17,10 @@
 
 package org.apache.mahout.clustering.syntheticcontrol.kmeans;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.Cluster;
@@ -42,14 +40,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public final class Job extends AbstractJob {
-
+  
   private static final Logger log = LoggerFactory.getLogger(Job.class);
-
+  
   private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
-
-  private Job() {
-  }
-
+  
+  private Job() {}
+  
   public static void main(String[] args) throws Exception {
     if (args.length > 0) {
       log.info("Running with only user-supplied arguments");
@@ -62,9 +59,9 @@ public final class Job extends AbstractJ
       run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 6, 0.5, 10);
     }
   }
-
+  
   @Override
-  public int run(String[] args) throws Exception{
+  public int run(String[] args) throws Exception {
     addInputOption();
     addOutputOption();
     addOption(DefaultOptionCreator.distanceMeasureOption().create());
@@ -74,47 +71,41 @@ public final class Job extends AbstractJ
     addOption(DefaultOptionCreator.convergenceOption().create());
     addOption(DefaultOptionCreator.maxIterationsOption().create());
     addOption(DefaultOptionCreator.overwriteOption().create());
-
-    Map<String, List<String>> argMap = parseArguments(args);
+    
+    Map<String,List<String>> argMap = parseArguments(args);
     if (argMap == null) {
       return -1;
     }
-
+    
     Path input = getInputPath();
     Path output = getOutputPath();
     String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
     if (measureClass == null) {
       measureClass = SquaredEuclideanDistanceMeasure.class.getName();
     }
-    double convergenceDelta = Double
-        .parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
-    int maxIterations = Integer
-        .parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
     if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
       HadoopUtil.delete(getConf(), output);
     }
     DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
     if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
-      int k = Integer
-          .parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
+      int k = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
       run(getConf(), input, output, measure, k, convergenceDelta, maxIterations);
     } else {
       double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
       double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
-      run(getConf(), input, output, measure, t1, t2, convergenceDelta,
-          maxIterations);
+      run(getConf(), input, output, measure, t1, t2, convergenceDelta, maxIterations);
     }
     return 0;
   }
-
+  
   /**
-   * Run the kmeans clustering job on an input dataset using the given the
-   * number of clusters k and iteration parameters. All output data will be
-   * written to the output directory, which will be initially deleted if it
-   * exists. The clustered points will reside in the path
-   * <output>/clustered-points. By default, the job expects a file containing
-   * equal length space delimited data that resides in a directory named
-   * "testdata", and writes output to a directory named "output".
+   * Run the kmeans clustering job on an input dataset using the given number of clusters k and iteration
+   * parameters. All output data will be written to the output directory, which will be initially deleted if it exists.
+   * The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects a file
+   * containing equal length space delimited data that resides in a directory named "testdata", and writes output to a
+   * directory named "output".
    * 
    * @param conf
    *          the Configuration to use
@@ -131,37 +122,30 @@ public final class Job extends AbstractJ
    * @param maxIterations
    *          the int maximum number of iterations
    */
-  public static void run(Configuration conf, Path input, Path output,
-                         DistanceMeasure measure, int k, double convergenceDelta, int maxIterations)
-          throws Exception{
-    Path directoryContainingConvertedInput = new Path(output,
-        DIRECTORY_CONTAINING_CONVERTED_INPUT);
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, int k,
+      double convergenceDelta, int maxIterations) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
     log.info("Preparing Input");
-    InputDriver.runJob(input, directoryContainingConvertedInput,
-        "org.apache.mahout.math.RandomAccessSparseVector");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
     log.info("Running random seed to get initial clusters");
     Path clusters = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
-    clusters = RandomSeedGenerator.buildRandom(conf,
-        directoryContainingConvertedInput, clusters, k, measure);
+    clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
     log.info("Running KMeans");
-    KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output,
-        measure, convergenceDelta, maxIterations, true, 0.0, false);
+    KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, measure, convergenceDelta,
+        maxIterations, true, 0.0, false);
     // run ClusterDumper
-    ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
-        output, maxIterations), new Path(output, "clusteredPoints"));
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+        "clusteredPoints"));
     clusterDumper.printClusters(null);
   }
-
+  
   /**
-   * Run the kmeans clustering job on an input dataset using the given distance
-   * measure, t1, t2 and iteration parameters. All output data will be written
-   * to the output directory, which will be initially deleted if it exists. The
-   * clustered points will reside in the path <output>/clustered-points. By
-   * default, the job expects the a file containing synthetic_control.data as
-   * obtained from
-   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series
-   * resides in a directory named "testdata", and writes output to a directory
-   * named "output".
+   * Run the kmeans clustering job on an input dataset using the given distance measure, t1, t2 and iteration
+   * parameters. All output data will be written to the output directory, which will be initially deleted if it exists.
+   * The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects a file
+   * containing synthetic_control.data, as obtained from
+   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series, to reside in a directory named
+   * "testdata", and writes output to a directory named "output".
    * 
    * @param conf
    *          the Configuration to use
@@ -180,40 +164,21 @@ public final class Job extends AbstractJ
    * @param maxIterations
    *          the int maximum number of iterations
    */
-  public static void run(Configuration conf, Path input, Path output,
-                         DistanceMeasure measure, double t1, double t2, double convergenceDelta,
-                         int maxIterations)
-          throws Exception{
-    Path directoryContainingConvertedInput = new Path(output,
-        DIRECTORY_CONTAINING_CONVERTED_INPUT);
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
+      double convergenceDelta, int maxIterations) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
     log.info("Preparing Input");
-    InputDriver.runJob(input, directoryContainingConvertedInput,
-        "org.apache.mahout.math.RandomAccessSparseVector");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
     log.info("Running Canopy to get initial clusters");
-    CanopyDriver.run(conf, directoryContainingConvertedInput, output, measure,
-        t1, t2, false, 0.0, false);
+    Path canopyOutput = new Path(output, "canopies");
+    CanopyDriver.run(new Configuration(), directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0,
+        false);
     log.info("Running KMeans");
-    KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(output,
-        Cluster.INITIAL_CLUSTERS_DIR), output, measure, convergenceDelta,
-        maxIterations, true, 0.0, false);
+    KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(canopyOutput, Cluster.INITIAL_CLUSTERS_DIR
+        + "-final"), output, measure, convergenceDelta, maxIterations, true, 0.0, false);
     // run ClusterDumper
-    ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
-        output, maxIterations), new Path(output, "clusteredPoints"));
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+        "clusteredPoints"));
     clusterDumper.printClusters(null);
   }
-
-  /**
-   * Return the path to the final iteration's clusters
-   */
-  private static Path finalClusterPath(Configuration conf, Path output,
-      int maxIterations) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    for (int i = maxIterations; i >= 0; i--) {
-      Path clusters = new Path(output, "clusters-" + i);
-      if (fs.exists(clusters)) {
-        return clusters;
-      }
-    }
-    return null;
-  }
 }

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java Mon Jun 11 21:13:13 2012
@@ -142,7 +142,7 @@ public final class Job extends AbstractJ
         true, false);
     // run ClusterDumper
     ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
-        "clusters-" + maxIterations), new Path(output, "clusteredPoints"));
+        "clusters-*-final"), new Path(output, "clusteredPoints"));
     clusterDumper.printClusters(null);
   }
   

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputDriver.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputDriver.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputDriver.java Mon Jun 11 21:13:13 2012
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.Reduc
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.mahout.clustering.meanshift.MeanShiftCanopy;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
 import org.apache.mahout.common.CommandLineUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.slf4j.Logger;
@@ -82,7 +82,7 @@ public final class InputDriver {
 
     Job job = new Job(conf, "Mean Shift Input Driver running over input: " + input);
     job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(MeanShiftCanopy.class);
+    job.setOutputValueClass(ClusterWritable.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setMapperClass(org.apache.mahout.clustering.conversion.meanshift.InputMapper.class);
     job.setReducerClass(Reducer.class);

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputMapper.java?rev=1349015&r1=1349014&r2=1349015&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputMapper.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/clustering/conversion/meanshift/InputMapper.java Mon Jun 11 21:13:13 2012
@@ -21,21 +21,25 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.regex.Pattern;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
 import org.apache.mahout.clustering.meanshift.MeanShiftCanopy;
 import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.Vector;
 
-public class InputMapper extends Mapper<LongWritable, Text, Text, MeanShiftCanopy> {
+import com.google.common.collect.Lists;
 
+public class InputMapper extends Mapper<LongWritable,Text,Text,ClusterWritable> {
+  
   private static final Pattern SPACE = Pattern.compile(" ");
-
+  
   private int nextCanopyId;
-
+  
+  private ClusterWritable cw = new ClusterWritable();
+  
   @Override
   protected void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
     String[] numbers = SPACE.split(values.toString());
@@ -53,8 +57,8 @@ public class InputMapper extends Mapper<
       for (Double d : doubles) {
         point.set(index++, d);
       }
-      MeanShiftCanopy canopy = new MeanShiftCanopy(point, nextCanopyId++, new EuclideanDistanceMeasure());
-      context.write(new Text(), canopy);
+      cw.setValue(new MeanShiftCanopy(point, nextCanopyId++, new EuclideanDistanceMeasure()));
+      context.write(new Text(), cw);
     }
   }
 }