You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by je...@apache.org on 2012/05/10 00:02:52 UTC

svn commit: r1336424 [1/2] - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/classify/ core/src/main/java/org/apache/mahout/clustering/dirichlet/ core/src/main/java/org/apache/mahout/clustering/dirichlet/models/ core/src/main/java/org...

Author: jeastman
Date: Wed May  9 22:02:50 2012
New Revision: 1336424

URL: http://svn.apache.org/viewvc?rev=1336424&view=rev
Log:
MAHOUT-990: fixed problems with patch and all tests and displays run

Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletState.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/models/DistributionDescription.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayDirichlet.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayKMeans.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/dirichlet/TestL1ModelClustering.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java Wed May  9 22:02:50 2012
@@ -40,7 +40,6 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.dirichlet.DirichletCluster;
 import org.apache.mahout.clustering.iterator.ClusterWritable;
 import org.apache.mahout.clustering.iterator.ClusteringPolicy;
 import org.apache.mahout.common.AbstractJob;
@@ -166,9 +165,7 @@ public class ClusterClassificationDriver
     while (it.hasNext()) {
       ClusterWritable next = (ClusterWritable) it.next();
       cluster = (Cluster) next.getValue();
-      if (cluster instanceof DirichletCluster) {
-        ((DirichletCluster) cluster).getModel().configure(conf);
-      }
+      cluster.configure(conf);
       clusterModels.add(cluster);
     }
     return clusterModels;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java Wed May  9 22:02:50 2012
@@ -35,7 +35,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.dirichlet.DirichletCluster;
 import org.apache.mahout.clustering.iterator.ClusterWritable;
 import org.apache.mahout.clustering.iterator.ClusteringPolicy;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
@@ -133,9 +132,7 @@ public class ClusterClassificationMapper
     while (it.hasNext()) {
       ClusterWritable next = (ClusterWritable) it.next();
       cluster = next.getValue();
-      if(cluster instanceof DirichletCluster){
-        ((DirichletCluster) cluster).getModel().configure(conf);
-      }
+      cluster.configure(conf);
       clusters.add(cluster);
     }
     return clusters;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java Wed May  9 22:02:50 2012
@@ -202,12 +202,14 @@ public class ClusterClassifier extends A
     }
   }
   
-  public void readFromSeqFiles(Path path) throws IOException {
+  public void readFromSeqFiles(Configuration conf, Path path) throws IOException {
     Configuration config = new Configuration();
     List<Cluster> clusters = Lists.newArrayList();
     for (ClusterWritable cw : new SequenceFileDirValueIterable<ClusterWritable>(path, PathType.LIST,
         PathFilters.logsCRCFilter(), config)) {
-      clusters.add(cw.getValue());
+      Cluster cluster = cw.getValue();
+      cluster.configure(conf);
+      clusters.add(cluster);
     }
     this.models = clusters;
     modelClass = models.get(0).getClass().getName();

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java Wed May  9 22:02:50 2012
@@ -26,37 +26,28 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.Model;
+import org.apache.mahout.clustering.ModelDistribution;
 import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
 import org.apache.mahout.clustering.classify.ClusterClassifier;
 import org.apache.mahout.clustering.dirichlet.models.DistributionDescription;
 import org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution;
-import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.iterator.ClusterIterator;
 import org.apache.mahout.clustering.iterator.DirichletClusteringPolicy;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
-import org.apache.mahout.common.iterator.sequencefile.PathType;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.VectorWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.io.Closeables;
+import com.google.common.collect.Lists;
 
 public class DirichletDriver extends AbstractJob {
-
+  
   public static final String STATE_IN_KEY = "org.apache.mahout.clustering.dirichlet.stateIn";
   public static final String MODEL_DISTRIBUTION_KEY = "org.apache.mahout.clustering.dirichlet.modelFactory";
   public static final String NUM_CLUSTERS_KEY = "org.apache.mahout.clustering.dirichlet.numClusters";
@@ -66,13 +57,11 @@ public class DirichletDriver extends Abs
   public static final String MODEL_PROTOTYPE_CLASS_OPTION = "modelPrototype";
   public static final String MODEL_DISTRIBUTION_CLASS_OPTION = "modelDist";
   public static final String ALPHA_OPTION = "alpha";
-
-  private static final Logger log = LoggerFactory.getLogger(DirichletDriver.class);
-
+  
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new Configuration(), new DirichletDriver(), args);
   }
-
+  
   @Override
   public int run(String[] args) throws Exception {
     addInputOption();
@@ -82,23 +71,21 @@ public class DirichletDriver extends Abs
     addOption(DefaultOptionCreator.overwriteOption().create());
     addOption(DefaultOptionCreator.clusteringOption().create());
     addOption(ALPHA_OPTION, "a0", "The alpha0 value for the DirichletDistribution. Defaults to 1.0", "1.0");
-    addOption(MODEL_DISTRIBUTION_CLASS_OPTION,
-              "md",
-              "The ModelDistribution class name. Defaults to GaussianClusterDistribution",
-              GaussianClusterDistribution.class.getName());
-    addOption(MODEL_PROTOTYPE_CLASS_OPTION,
-              "mp",
-              "The ModelDistribution prototype Vector class name. Defaults to RandomAccessSparseVector",
-              RandomAccessSparseVector.class.getName());
+    addOption(MODEL_DISTRIBUTION_CLASS_OPTION, "md",
+        "The ModelDistribution class name. Defaults to GaussianClusterDistribution",
+        GaussianClusterDistribution.class.getName());
+    addOption(MODEL_PROTOTYPE_CLASS_OPTION, "mp",
+        "The ModelDistribution prototype Vector class name. Defaults to RandomAccessSparseVector",
+        RandomAccessSparseVector.class.getName());
     addOption(DefaultOptionCreator.distanceMeasureOption().withRequired(false).create());
     addOption(DefaultOptionCreator.emitMostLikelyOption().create());
     addOption(DefaultOptionCreator.thresholdOption().create());
     addOption(DefaultOptionCreator.methodOption().create());
-
+    
     if (parseArguments(args) == null) {
       return -1;
     }
-
+    
     Path input = getInputPath();
     Path output = getOutputPath();
     if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
@@ -113,30 +100,21 @@ public class DirichletDriver extends Abs
     double threshold = Double.parseDouble(getOption(DefaultOptionCreator.THRESHOLD_OPTION));
     double alpha0 = Double.parseDouble(getOption(ALPHA_OPTION));
     boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
-    boolean runSequential =
-        getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+        DefaultOptionCreator.SEQUENTIAL_METHOD);
     int prototypeSize = readPrototypeSize(input);
-
-    DistributionDescription description =
-        new DistributionDescription(modelFactory, modelPrototype, distanceMeasure, prototypeSize);
-
-    run(getConf(),
-        input,
-        output,
-        description,
-        numModels,
-        maxIterations,
-        alpha0,
-        runClustering,
-        emitMostLikely,
-        threshold,
-        runSequential);
+    
+    DistributionDescription description = new DistributionDescription(modelFactory, modelPrototype, distanceMeasure,
+        prototypeSize);
+    
+    run(getConf(), input, output, description, numModels, maxIterations, alpha0, runClustering, emitMostLikely,
+        threshold, runSequential);
     return 0;
   }
-
+  
   /**
-   * Iterate over the input vectors to produce clusters and, if requested, use the
-   * results of the final iteration to cluster the input vectors.
+   * Iterate over the input vectors to produce clusters and, if requested, use the results of the final iteration to
+   * cluster the input vectors.
    * 
    * @param conf
    *          the Configuration to use
@@ -144,107 +122,30 @@ public class DirichletDriver extends Abs
    *          the directory Path for input points
    * @param output
    *          the directory Path for output points
-   * @param description model distribution parameters
+   * @param description
+   *          model distribution parameters
    * @param maxIterations
    *          the maximum number of iterations
    * @param alpha0
    *          the alpha_0 value for the DirichletDistribution
-   * @param runClustering 
+   * @param runClustering
    *          true if clustering of points to be done after iterations
    * @param emitMostLikely
    *          a boolean if true emit only most likely cluster for each point
-   * @param threshold 
+   * @param threshold
    *          a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
-   * @param runSequential execute sequentially if true
+   * @param runSequential
+   *          execute sequentially if true
    */
-  public static void run(Configuration conf,
-                         Path input,
-                         Path output,
-                         DistributionDescription description,
-                         int numModels,
-                         int maxIterations,
-                         double alpha0,
-                         boolean runClustering,
-                         boolean emitMostLikely,
-                         double threshold,
-                         boolean runSequential)
-    throws IOException, ClassNotFoundException, InterruptedException {
-    Path clustersOut =
-        buildClusters(conf, input, output, description, numModels, maxIterations, alpha0, runSequential);
+  public static void run(Configuration conf, Path input, Path output, DistributionDescription description,
+      int numModels, int maxIterations, double alpha0, boolean runClustering, boolean emitMostLikely, double threshold,
+      boolean runSequential) throws IOException, ClassNotFoundException, InterruptedException {
+    Path clustersOut = buildClusters(conf, input, output, description, numModels, maxIterations, alpha0, runSequential);
     if (runClustering) {
-      clusterData(conf,
-                  input,
-                  clustersOut,
-                  output,
-                  alpha0,
-                  numModels,
-                  emitMostLikely, 
-                  threshold, 
-                  runSequential);
+      clusterData(conf, input, clustersOut, output, alpha0, numModels, emitMostLikely, threshold, runSequential);
     }
   }
-
-  /**
-   * Convenience method provides default Configuration
-   * Iterate over the input vectors to produce clusters and, if requested, use the
-   * results of the final iteration to cluster the input vectors.
-   * 
-   * @param input
-   *          the directory Path for input points
-   * @param output
-   *          the directory Path for output points
-   * @param description model distribution parameters
-   * @param numClusters
-   *          the number of models to iterate over
-   * @param maxIterations
-   *          the maximum number of iterations
-   * @param alpha0
-   *          the alpha_0 value for the DirichletDistribution
-   * @param runClustering 
-   *          true if clustering of points to be done after iterations
-   * @param emitMostLikely
-   *          a boolean if true emit only most likely cluster for each point
-   * @param threshold 
-   *          a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
-   * @param runSequential execute sequentially if true
-   */
-  public static void run(Path input,
-                         Path output,
-                         DistributionDescription description,
-                         int numClusters,
-                         int maxIterations,
-                         double alpha0,
-                         boolean runClustering,
-                         boolean emitMostLikely,
-                         double threshold,
-                         boolean runSequential)
-    throws IOException, ClassNotFoundException, InterruptedException {
-    run(new Configuration(),
-        input,
-        output,
-        description,
-        numClusters,
-        maxIterations,
-        alpha0,
-        runClustering,
-        emitMostLikely,
-        threshold,
-        runSequential);
-  }
-
-  /**
-   * Creates a DirichletState object from the given arguments. Note that the modelFactory is presumed to be a
-   * subclass of VectorModelDistribution that can be initialized with a concrete Vector prototype.
-   * 
-   * @param description model distribution parameters
-   * @param numModels an int number of models to be created
-   * @param alpha0 the double alpha_0 argument to the algorithm
-   * @return an initialized DirichletState
-   */
-  static DirichletState createState(DistributionDescription description, int numModels, double alpha0) {
-    return new DirichletState(description, numModels, alpha0);
-  }
-
+  
   /**
    * Read the first input vector to determine the prototype size for the modelPrototype
    */
@@ -261,223 +162,79 @@ public class DirichletDriver extends Abs
     }
     return protoSize;
   }
-
-  /**
-   * Write initial state (prior distribution) to the output path directory
-   * @param output the output Path
-   * @param stateOut the state output Path
-   * @param description model distribution parameters
-   * @param numModels the int number of models to generate
-   * @param alpha0 the double alpha_0 argument to the DirichletDistribution
-   */
-  private static void writeInitialState(Path output,
-                                        Path stateOut,
-                                        DistributionDescription description,
-                                        int numModels,
-                                        double alpha0) throws IOException {
-
-    DirichletState state = createState(description, numModels, alpha0);
-    writeState(output, stateOut, numModels, state);
-  }
-
-  private static void writeState(Path output, Path stateOut, int numModels, DirichletState state) throws IOException {
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(output.toUri(), conf);
-    ClusterWritable clusterWritable = new ClusterWritable();
-    for (int i = 0; i < numModels; i++) {
-      Path path = new Path(stateOut, "part-" + i);
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, ClusterWritable.class);
-      try {
-        DirichletCluster dirichletCluster = state.getClusters().get(i);
-        clusterWritable.setValue(dirichletCluster);
-		writer.append(new Text(Integer.toString(i)), clusterWritable);
-      } finally {
-        Closeables.closeQuietly(writer);
-      }
-    }
-  }
-
-  /**
-   * Run an iteration using supplied arguments
-   * @param conf 
-   * @param input the directory pathname for input points
-   * @param stateIn the directory pathname for input state
-   * @param stateOut the directory pathname for output state
-   * @param description model distribution parameters
-   * @param numClusters the number of clusters
-   * @param alpha0 alpha_0
-   */
-  private static void runIteration(Configuration conf,
-                                   Path input,
-                                   Path stateIn,
-                                   Path stateOut,
-                                   DistributionDescription description,
-                                   int numClusters,
-                                   double alpha0) throws IOException, InterruptedException, ClassNotFoundException {
-    conf.set(STATE_IN_KEY, stateIn.toString());
-    conf.set(MODEL_DISTRIBUTION_KEY, description.toString());
-    conf.set(NUM_CLUSTERS_KEY, Integer.toString(numClusters));
-    conf.set(ALPHA_0_KEY, Double.toString(alpha0));
-
-    Job job = new Job(conf, "Dirichlet Driver running runIteration over stateIn: " + stateIn);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(ClusterWritable.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(VectorWritable.class);
-    job.setMapperClass(DirichletMapper.class);
-    job.setReducerClass(DirichletReducer.class);
-    job.setJarByClass(DirichletDriver.class);
-
-    FileInputFormat.addInputPath(job, input);
-    FileOutputFormat.setOutputPath(job, stateOut);
-
-    if (!job.waitForCompletion(true)) {
-      throw new InterruptedException("Dirichlet Iteration failed processing " + stateIn);
-    }
-  }
-
+  
   /**
    * Iterate over the input vectors to produce cluster directories for each iteration
-   * @param conf 
+   * 
+   * @param conf
+   *          the hadoop configuration
    * @param input
    *          the directory Path for input points
    * @param output
    *          the directory Path for output points
-   * @param description model distribution parameters
+   * @param description
+   *          model distribution parameters
    * @param numClusters
    *          the number of models to iterate over
    * @param maxIterations
    *          the maximum number of iterations
    * @param alpha0
    *          the alpha_0 value for the DirichletDistribution
-   * @param runSequential execute sequentially if true
+   * @param runSequential
+   *          execute sequentially if true
    * 
    * @return the Path of the final clusters directory
    */
-  public static Path buildClusters(Configuration conf,
-                                   Path input,
-                                   Path output,
-                                   DistributionDescription description,
-                                   int numClusters,
-                                   int maxIterations,
-                                   double alpha0,
-                                   boolean runSequential)
-    throws IOException, ClassNotFoundException, InterruptedException {
+  public static Path buildClusters(Configuration conf, Path input, Path output, DistributionDescription description,
+      int numClusters, int maxIterations, double alpha0, boolean runSequential) throws IOException,
+      ClassNotFoundException, InterruptedException {
     Path clustersIn = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
-    writeInitialState(output, clustersIn, description, numClusters, alpha0);
-
+    ModelDistribution<VectorWritable> modelDist = description.createModelDistribution(conf);
+    
+    List<Cluster> models = Lists.newArrayList();
+    for (Model<VectorWritable> cluster : modelDist.sampleFromPrior(numClusters)) {
+      models.add((Cluster) cluster);
+    }
+    
+    ClusterClassifier prior = new ClusterClassifier(models, new DirichletClusteringPolicy(numClusters, alpha0));
+    prior.writeToSeqFiles(clustersIn);
+    
     if (runSequential) {
-      clustersIn = buildClustersSeq(conf, input, output, description, numClusters, maxIterations, alpha0, clustersIn);
+      new ClusterIterator().iterateSeq(conf, input, clustersIn, output, maxIterations);
     } else {
-      clustersIn = buildClustersMR(conf, input, output, description, numClusters, maxIterations, alpha0, clustersIn);
+      new ClusterIterator().iterateMR(conf, input, clustersIn, output, maxIterations);
     }
-    return clustersIn;
+    return output;
+    
   }
-
-  private static Path buildClustersSeq(Configuration conf,
-                                       Path input,
-                                       Path output,
-                                       DistributionDescription description,
-                                       int numClusters,
-                                       int maxIterations,
-                                       double alpha0,
-                                       Path clustersIn) throws IOException {
-    int iteration = 1;
-    while (iteration <= maxIterations) {
-      log.info("Iteration {}", iteration);
-      // point the output to a new directory per iteration
-      Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
-      DirichletState state = DirichletMapper.loadState(conf,
-                                                       clustersIn.toString(),
-                                                       description,
-                                                       alpha0,
-                                                       numClusters);
-
-      List<DirichletCluster> oldModels = state.getClusters();
-      for (DirichletCluster oldModel : oldModels) {
-        oldModel.getModel().configure(conf);
-      }
-      Cluster[] newModels = (Cluster[]) state.getModelFactory().sampleFromPosterior(state.getModels());
-      for (Cluster newModel : newModels) {
-        newModel.configure(conf);
-      }
-      DirichletClusterer clusterer = new DirichletClusterer(state);
-      for (VectorWritable value
-           : new SequenceFileDirValueIterable<VectorWritable>(input,
-                                                              PathType.LIST,
-                                                              PathFilters.logsCRCFilter(),
-                                                              conf)) {
-        clusterer.observe(newModels, value);
-      }
-      clusterer.updateModels(newModels);
-      writeState(output, clustersOut, numClusters, state);
-
-      // now point the input to the old output directory
-      clustersIn = clustersOut;
-      iteration++;
-    }
-    Path fromPath = new Path(output, Cluster.CLUSTERS_DIR + (iteration-1));
-    Path finalClustersIn = new Path(output, Cluster.CLUSTERS_DIR + (iteration-1) + Cluster.FINAL_ITERATION_SUFFIX);
-    FileSystem.get(fromPath.toUri(), conf).rename(fromPath, finalClustersIn);
-    return finalClustersIn;
-  }
-
-  private static Path buildClustersMR(Configuration conf,
-                                      Path input,
-                                      Path output,
-                                      DistributionDescription description,
-                                      int numClusters,
-                                      int maxIterations,
-                                      double alpha0,
-                                      Path clustersIn)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    int iteration = 1;
-    while (iteration <= maxIterations) {
-      log.info("Iteration {}", iteration);
-      // point the output to a new directory per iteration
-      Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
-      runIteration(conf, input, clustersIn, clustersOut, description, numClusters, alpha0);
-      // now point the input to the old output directory
-      clustersIn = clustersOut;
-      iteration++;
-    }
-    Path fromPath = new Path(output, Cluster.CLUSTERS_DIR + (iteration-1));
-    Path finalClustersIn = new Path(output, Cluster.CLUSTERS_DIR + (iteration-1) + Cluster.FINAL_ITERATION_SUFFIX);
-    FileSystem.get(fromPath.toUri(), conf).rename(fromPath, finalClustersIn);
-    return finalClustersIn;
-  }
-
+  
   /**
    * Run the job using supplied arguments
-   * @param conf 
- * @param input
+   * 
+   * @param conf
+   * @param input
    *          the directory pathname for input points
- * @param stateIn
+   * @param stateIn
    *          the directory pathname for input state
- * @param output
+   * @param output
    *          the directory pathname for output points
- * @param alpha0 TODO
- * @param numModels TODO
- * @param emitMostLikely
+   * @param alpha0
+   *          TODO
+   * @param numModels
+   *          TODO
+   * @param emitMostLikely
    *          a boolean if true emit only most likely cluster for each point
- * @param threshold 
+   * @param threshold
    *          a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
- * @param runSequential execute sequentially if true
+   * @param runSequential
+   *          execute sequentially if true
    */
-  public static void clusterData(Configuration conf,
-                                 Path input,
-                                 Path stateIn,
-                                 Path output,
-                                 double alpha0,
-                                 int numModels,
-                                 boolean emitMostLikely, 
-                                 double threshold, 
-                                 boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException {
-	  ClusterClassifier.writePolicy(new DirichletClusteringPolicy(numModels, alpha0), stateIn);
-	  ClusterClassificationDriver.run(conf, input, output, new Path(output, CLUSTERED_POINTS_DIRECTORY), threshold, emitMostLikely, runSequential);
-    }
+  public static void clusterData(Configuration conf, Path input, Path stateIn, Path output, double alpha0,
+      int numModels, boolean emitMostLikely, double threshold, boolean runSequential) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    ClusterClassifier.writePolicy(new DirichletClusteringPolicy(numModels, alpha0), stateIn);
+    ClusterClassificationDriver.run(conf, input, output, new Path(output, CLUSTERED_POINTS_DIRECTORY), threshold,
+        emitMostLikely, runSequential);
+  }
   
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/models/DistributionDescription.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/models/DistributionDescription.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/models/DistributionDescription.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/models/DistributionDescription.java Wed May  9 22:02:50 2012
@@ -21,64 +21,67 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Iterator;
 
-import com.google.common.base.Splitter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.mahout.clustering.ModelDistribution;
 import org.apache.mahout.common.ClassUtils;
 import org.apache.mahout.common.distance.DistanceMeasure;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
+import com.google.common.base.Splitter;
+
 /**
  * Simply describes parameters needs to create a {@link org.apache.mahout.clustering.ModelDistribution}.
  */
 public final class DistributionDescription {
-
+  
   private final String modelFactory;
   private final String modelPrototype;
   private final String distanceMeasure;
   private final int prototypeSize;
-
-  public DistributionDescription(String modelFactory,
-                                 String modelPrototype,
-                                 String distanceMeasure,
-                                 int prototypeSize) {
+  
+  public DistributionDescription(String modelFactory, String modelPrototype, String distanceMeasure, int prototypeSize) {
     this.modelFactory = modelFactory;
     this.modelPrototype = modelPrototype;
     this.distanceMeasure = distanceMeasure;
     this.prototypeSize = prototypeSize;
   }
-
+  
   public String getModelFactory() {
     return modelFactory;
   }
-
+  
   public String getModelPrototype() {
     return modelPrototype;
   }
-
+  
   public String getDistanceMeasure() {
     return distanceMeasure;
   }
-
+  
   public int getPrototypeSize() {
     return prototypeSize;
   }
-
+  
   /**
    * Create an instance of AbstractVectorModelDistribution from the given command line arguments
+   * 
+   * @param conf
+   *          the Configuration
    */
-  public ModelDistribution<VectorWritable> createModelDistribution() {
+  public ModelDistribution<VectorWritable> createModelDistribution(Configuration conf) {
     ClassLoader ccl = Thread.currentThread().getContextClassLoader();
     AbstractVectorModelDistribution modelDistribution;
     try {
       modelDistribution = ClassUtils.instantiateAs(modelFactory, AbstractVectorModelDistribution.class);
-
+      
       Class<? extends Vector> vcl = ccl.loadClass(modelPrototype).asSubclass(Vector.class);
       Constructor<? extends Vector> v = vcl.getConstructor(int.class);
       modelDistribution.setModelPrototype(new VectorWritable(v.newInstance(prototypeSize)));
-
+      
       if (modelDistribution instanceof DistanceMeasureClusterDistribution) {
         DistanceMeasure measure = ClassUtils.instantiateAs(distanceMeasure, DistanceMeasure.class);
+        measure.configure(conf);
         ((DistanceMeasureClusterDistribution) modelDistribution).setMeasure(measure);
       }
     } catch (ClassNotFoundException cnfe) {
@@ -94,12 +97,12 @@ public final class DistributionDescripti
     }
     return modelDistribution;
   }
-
+  
   @Override
   public String toString() {
     return modelFactory + ',' + modelPrototype + ',' + distanceMeasure + ',' + prototypeSize;
   }
-
+  
   public static DistributionDescription fromString(CharSequence s) {
     Iterator<String> tokens = Splitter.on(',').split(s).iterator();
     String modelFactory = tokens.next();
@@ -108,5 +111,5 @@ public final class DistributionDescripti
     int prototypeSize = Integer.parseInt(tokens.next());
     return new DistributionDescription(modelFactory, modelPrototype, distanceMeasure, prototypeSize);
   }
-
+  
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Wed May  9 22:02:50 2012
@@ -24,13 +24,13 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
 import org.apache.mahout.clustering.classify.ClusterClassifier;
 import org.apache.mahout.clustering.iterator.ClusterIterator;
+import org.apache.mahout.clustering.iterator.ClusteringPolicy;
 import org.apache.mahout.clustering.iterator.FuzzyKMeansClusteringPolicy;
 import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
 import org.apache.mahout.common.AbstractJob;
@@ -239,7 +239,7 @@ public class FuzzyKMeansDriver extends A
    * @param input
    *          the directory pathname for input points
    * @param clustersIn
-   *          the directory pathname for initial & computed clusters
+   *          the file pathname for initial cluster centers
    * @param output
    *          the directory pathname for output points
    * @param measure
@@ -274,26 +274,18 @@ public class FuzzyKMeansDriver extends A
     }
     
     if (clusters.isEmpty()) {
-      throw new IllegalStateException("Clusters is empty!");
-    }
-    
-    Path priorClustersPath = new Path(clustersIn, "clusters-0");
-    
-    FileSystem fileSystem = clustersIn.getFileSystem(conf);
-    
-    if(fileSystem.isFile(clustersIn)){
-      priorClustersPath = new Path(clustersIn.getParent(), "prior");
-      fileSystem.mkdirs(priorClustersPath);
+      throw new IllegalStateException("No input clusters found. Check your -c argument.");
     }
-    FuzzyKMeansClusteringPolicy policy = new FuzzyKMeansClusteringPolicy(m, convergenceDelta);
     
+    Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);   
+    ClusteringPolicy policy = new FuzzyKMeansClusteringPolicy(m, convergenceDelta);
     ClusterClassifier prior = new ClusterClassifier(clusters, policy);
     prior.writeToSeqFiles(priorClustersPath);
     
     if (runSequential) {
-      new ClusterIterator().iterateSeq(input, priorClustersPath, output, maxIterations);
+      new ClusterIterator().iterateSeq(conf, input, priorClustersPath, output, maxIterations);
     } else {
-      new ClusterIterator().iterateMR(input, priorClustersPath, output, maxIterations);
+      new ClusterIterator().iterateMR(conf, input, priorClustersPath, output, maxIterations);
     }
     return output;
   }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java Wed May  9 22:02:50 2012
@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.WritableComparable;
@@ -29,9 +30,10 @@ public class CIMapper extends Mapper<Wri
    */
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
-    String priorClustersPath = context.getConfiguration().get(ClusterIterator.PRIOR_PATH_KEY);
+    Configuration conf = context.getConfiguration();
+    String priorClustersPath = conf.get(ClusterIterator.PRIOR_PATH_KEY);
     classifier = new ClusterClassifier();
-    classifier.readFromSeqFiles(new Path(priorClustersPath));
+    classifier.readFromSeqFiles(conf, new Path(priorClustersPath));
     policy = classifier.getPolicy();
     policy.update(classifier);
     super.setup(context);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java Wed May  9 22:02:50 2012
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -62,9 +63,10 @@ public class CIReducer extends Reducer<I
    */
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
-    String priorClustersPath = context.getConfiguration().get(ClusterIterator.PRIOR_PATH_KEY);
+    Configuration conf = context.getConfiguration();
+    String priorClustersPath = conf.get(ClusterIterator.PRIOR_PATH_KEY);
     classifier = new ClusterClassifier();
-    classifier.readFromSeqFiles(new Path(priorClustersPath));
+    classifier.readFromSeqFiles(conf, new Path(priorClustersPath));
     policy = classifier.getPolicy();
     policy.update(classifier);
     super.setup(context);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java Wed May  9 22:02:50 2012
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.classify.ClusterClassifier;
-import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
@@ -91,6 +90,8 @@ public class ClusterIterator {
    * Iterate over data using a prior-trained ClusterClassifier, for a number of iterations using a sequential
    * implementation
    * 
+   * @param conf
+   *          the Configuration
    * @param inPath
    *          a Path to input VectorWritables
    * @param priorPath
@@ -102,11 +103,10 @@ public class ClusterIterator {
    * 
    * @throws IOException
    */
-  public void iterateSeq(Path inPath, Path priorPath, Path outPath, int numIterations) throws IOException {
+  public void iterateSeq(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
+      throws IOException {
     ClusterClassifier classifier = new ClusterClassifier();
-    classifier.readFromSeqFiles(priorPath);
-    Configuration conf = new Configuration();
-    HadoopUtil.delete(conf, outPath);
+    classifier.readFromSeqFiles(conf, priorPath);
     Path clustersOut = null;
     int iteration = 1;
     while (iteration <= numIterations) {
@@ -144,6 +144,8 @@ public class ClusterIterator {
    * Iterate over data using a prior-trained ClusterClassifier, for a number of iterations using a mapreduce
    * implementation
    * 
+   * @param conf
+   *          the Configuration
    * @param inPath
    *          a Path to input VectorWritables
    * @param priorPath
@@ -153,10 +155,8 @@ public class ClusterIterator {
    * @param numIterations
    *          the int number of iterations to perform
    */
-  public void iterateMR(Path inPath, Path priorPath, Path outPath, int numIterations) throws IOException,
-      InterruptedException, ClassNotFoundException {
-    Configuration conf = new Configuration();
-    HadoopUtil.delete(conf, outPath);
+  public void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
+      throws IOException, InterruptedException, ClassNotFoundException {
     ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
     Path clustersOut = null;
     int iteration = 1;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Wed May  9 22:02:50 2012
@@ -23,13 +23,13 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
 import org.apache.mahout.clustering.classify.ClusterClassifier;
 import org.apache.mahout.clustering.iterator.ClusterIterator;
+import org.apache.mahout.clustering.iterator.ClusteringPolicy;
 import org.apache.mahout.clustering.iterator.KMeansClusteringPolicy;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.ClassUtils;
@@ -42,38 +42,41 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KMeansDriver extends AbstractJob {
-
+  
   private static final Logger log = LoggerFactory.getLogger(KMeansDriver.class);
-
+  
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new Configuration(), new KMeansDriver(), args);
   }
-
+  
   @Override
   public int run(String[] args) throws Exception {
-
+    
     addInputOption();
     addOutputOption();
     addOption(DefaultOptionCreator.distanceMeasureOption().create());
-    addOption(DefaultOptionCreator.clustersInOption()
-        .withDescription("The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.  "
-            + "If k is also specified, then a random set of vectors will be selected"
-            + " and written out to this path first")
-        .create());
-    addOption(DefaultOptionCreator.numClustersOption()
-        .withDescription("The k in k-Means.  If specified, then a random selection of k Vectors will be chosen"
-            + " as the Centroid and written to the clusters input path.").create());
+    addOption(DefaultOptionCreator
+        .clustersInOption()
+        .withDescription(
+            "The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.  "
+                + "If k is also specified, then a random set of vectors will be selected"
+                + " and written out to this path first").create());
+    addOption(DefaultOptionCreator
+        .numClustersOption()
+        .withDescription(
+            "The k in k-Means.  If specified, then a random selection of k Vectors will be chosen"
+                + " as the Centroid and written to the clusters input path.").create());
     addOption(DefaultOptionCreator.convergenceOption().create());
     addOption(DefaultOptionCreator.maxIterationsOption().create());
     addOption(DefaultOptionCreator.overwriteOption().create());
     addOption(DefaultOptionCreator.clusteringOption().create());
     addOption(DefaultOptionCreator.methodOption().create());
     addOption(DefaultOptionCreator.outlierThresholdOption().create());
-
+    
     if (parseArguments(args) == null) {
       return -1;
     }
-
+    
     Path input = getInputPath();
     Path clusters = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
     Path output = getOutputPath();
@@ -87,10 +90,10 @@ public class KMeansDriver extends Abstra
       HadoopUtil.delete(getConf(), output);
     }
     DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
-
+    
     if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
-      clusters = RandomSeedGenerator.buildRandom(getConf(), input, clusters, Integer
-          .parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)), measure);
+      clusters = RandomSeedGenerator.buildRandom(getConf(), input, clusters,
+          Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)), measure);
     }
     boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
     boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
@@ -106,10 +109,10 @@ public class KMeansDriver extends Abstra
         clusterClassificationThreshold, runSequential);
     return 0;
   }
-
-  	/**
-   * Iterate over the input vectors to produce clusters and, if requested, use
-   * the results of the final iteration to cluster the input vectors.
+  
+  /**
+   * Iterate over the input vectors to produce clusters and, if requested, use the results of the final iteration to
+   * cluster the input vectors.
    * 
    * @param input
    *          the directory pathname for input points
@@ -126,48 +129,33 @@ public class KMeansDriver extends Abstra
    * @param runClustering
    *          true if points are to be clustered after iterations are completed
    * @param clusterClassificationThreshold
-   *          Is a clustering strictness / outlier removal parameter. Its value
-   *          should be between 0 and 1. Vectors having pdf below this value
-   *          will not be clustered.
+   *          Is a clustering strictness / outlier removal parameter. Its value should be between 0 and 1. Vectors
+   *          having pdf below this value will not be clustered.
    * @param runSequential
    *          if true execute sequential algorithm
    */
-  public static void run(Configuration conf,
-                         Path input,
-                         Path clustersIn,
-                         Path output,
-                         DistanceMeasure measure,
-                         double convergenceDelta,
-                         int maxIterations,
-                         boolean runClustering,
-                         double clusterClassificationThreshold, 
-                         boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException {
-
+  public static void run(Configuration conf, Path input, Path clustersIn, Path output, DistanceMeasure measure,
+      double convergenceDelta, int maxIterations, boolean runClustering, double clusterClassificationThreshold,
+      boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException {
+    
     // iterate until the clusters converge
     String delta = Double.toString(convergenceDelta);
     if (log.isInfoEnabled()) {
-      log.info("Input: {} Clusters In: {} Out: {} Distance: {}",
-               new Object[] {input, clustersIn, output,measure.getClass().getName()});
-      log.info("convergence: {} max Iterations: {} num Reduce Tasks: {} Input Vectors: {}",
-               new Object[] {convergenceDelta, maxIterations, VectorWritable.class.getName()});
+      log.info("Input: {} Clusters In: {} Out: {} Distance: {}", new Object[] {input, clustersIn, output,
+          measure.getClass().getName()});
+      log.info("convergence: {} max Iterations: {} num Reduce Tasks: {} Input Vectors: {}", new Object[] {
+          convergenceDelta, maxIterations, VectorWritable.class.getName()});
     }
     Path clustersOut = buildClusters(conf, input, clustersIn, output, measure, maxIterations, delta, runSequential);
     if (runClustering) {
       log.info("Clustering data");
-      clusterData(conf,
-          input,
-          clustersOut,
-          output,
-          measure,
-          clusterClassificationThreshold,
-          runSequential);
+      clusterData(conf, input, clustersOut, output, measure, clusterClassificationThreshold, runSequential);
     }
   }
-
+  
   /**
-   * Iterate over the input vectors to produce clusters and, if requested, use
-   * the results of the final iteration to cluster the input vectors.
+   * Iterate over the input vectors to produce clusters and, if requested, use the results of the final iteration to
+   * cluster the input vectors.
    * 
    * @param input
    *          the directory pathname for input points
@@ -184,37 +172,20 @@ public class KMeansDriver extends Abstra
    * @param runClustering
    *          true if points are to be clustered after iterations are completed
    * @param clusterClassificationThreshold
-   *          Is a clustering strictness / outlier removal parrameter. Its value
-   *          should be between 0 and 1. Vectors having pdf below this value
-   *          will not be clustered.
+   *          Is a clustering strictness / outlier removal parrameter. Its value should be between 0 and 1. Vectors
+   *          having pdf below this value will not be clustered.
    * @param runSequential
    *          if true execute sequential algorithm
    */
-  public static void run(Path input,
-                         Path clustersIn,
-                         Path output,
-                         DistanceMeasure measure,
-                         double convergenceDelta,
-                         int maxIterations,
-                         boolean runClustering,
-                         double clusterClassificationThreshold, 
-                         boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    run(new Configuration(),
-        input,
-        clustersIn,
-        output,
-        measure,
-        convergenceDelta,
-        maxIterations,
-        runClustering,
-        clusterClassificationThreshold, 
-        runSequential);
+  public static void run(Path input, Path clustersIn, Path output, DistanceMeasure measure, double convergenceDelta,
+      int maxIterations, boolean runClustering, double clusterClassificationThreshold, boolean runSequential)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    run(new Configuration(), input, clustersIn, output, measure, convergenceDelta, maxIterations, runClustering,
+        clusterClassificationThreshold, runSequential);
   }
-
+  
   /**
-   * Iterate over the input vectors to produce cluster directories for each
-   * iteration
+   * Iterate over the input vectors to produce cluster directories for each iteration
    * 
    * @param conf
    *          the Configuration to use
@@ -235,9 +206,8 @@ public class KMeansDriver extends Abstra
    * 
    * @return the Path of the final clusters directory
    */
-  public static Path buildClusters(Configuration conf, Path input,
-      Path clustersIn, Path output, DistanceMeasure measure, int maxIterations,
-      String delta, boolean runSequential) throws IOException,
+  public static Path buildClusters(Configuration conf, Path input, Path clustersIn, Path output,
+      DistanceMeasure measure, int maxIterations, String delta, boolean runSequential) throws IOException,
       InterruptedException, ClassNotFoundException {
     
     double convergenceDelta = Double.parseDouble(delta);
@@ -245,32 +215,22 @@ public class KMeansDriver extends Abstra
     KMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters);
     
     if (clusters.isEmpty()) {
-      throw new IllegalStateException("Clusters is empty!");
-    }
-
-    Path priorClustersPath = new Path(clustersIn, "clusters-0");
-    
-    FileSystem fileSystem = clustersIn.getFileSystem(conf);
-    if(fileSystem.isFile(clustersIn)){
-      priorClustersPath = new Path(clustersIn.getParent(), "prior");
-      fileSystem.mkdirs(priorClustersPath);
+      throw new IllegalStateException("No input clusters found. Check your -c argument.");
     }
     
-    KMeansClusteringPolicy policy = new KMeansClusteringPolicy(convergenceDelta);
-    
+    Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);   
+    ClusteringPolicy policy = new KMeansClusteringPolicy(convergenceDelta);
     ClusterClassifier prior = new ClusterClassifier(clusters, policy);
     prior.writeToSeqFiles(priorClustersPath);
     
     if (runSequential) {
-      new ClusterIterator().iterateSeq(input, priorClustersPath, output,
-          maxIterations);
+      new ClusterIterator().iterateSeq(conf, input, priorClustersPath, output, maxIterations);
     } else {
-      new ClusterIterator().iterateMR(input, priorClustersPath, output,
-          maxIterations);
+      new ClusterIterator().iterateMR(conf, input, priorClustersPath, output, maxIterations);
     }
     return output;
   }
-
+  
   /**
    * Run the job using supplied arguments
    * 
@@ -283,21 +243,15 @@ public class KMeansDriver extends Abstra
    * @param measure
    *          the classname of the DistanceMeasure
    * @param clusterClassificationThreshold
-   *          Is a clustering strictness / outlier removal parrameter. Its value
-   *          should be between 0 and 1. Vectors having pdf below this value
-   *          will not be clustered.
+   *          Is a clustering strictness / outlier removal parrameter. Its value should be between 0 and 1. Vectors
+   *          having pdf below this value will not be clustered.
    * @param runSequential
    *          if true execute sequential algorithm
    */
-  public static void clusterData(Configuration conf,
-                                 Path input,
-                                 Path clustersIn,
-                                 Path output,
-                                 DistanceMeasure measure,
-                                 double clusterClassificationThreshold,
-                                 boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException {
-
+  public static void clusterData(Configuration conf, Path input, Path clustersIn, Path output, DistanceMeasure measure,
+      double clusterClassificationThreshold, boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    
     if (log.isInfoEnabled()) {
       log.info("Running Clustering");
       log.info("Input: {} Clusters In: {} Out: {} Distance: {}", new Object[] {input, clustersIn, output, measure});
@@ -306,5 +260,5 @@ public class KMeansDriver extends Abstra
     ClusterClassificationDriver.run(input, output, new Path(output, CLUSTERED_POINTS_DIRECTORY),
         clusterClassificationThreshold, true, runSequential);
   }
- 
+  
 }
\ No newline at end of file

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java Wed May  9 22:02:50 2012
@@ -147,6 +147,8 @@ public class MahalanobisDistanceMeasure 
     if (v1.size() != v2.size()) {
       throw new CardinalityException(v1.size(), v2.size());
     }
+    if (inverseCovarianceMatrix== null)
+      System.out.println();
     return Math.sqrt(v1.minus(v2).dot(Algebra.mult(inverseCovarianceMatrix, v1.minus(v2))));
   }
 

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java Wed May  9 22:02:50 2012
@@ -22,11 +22,9 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.ClusteringTestUtils;
 import org.apache.mahout.clustering.dirichlet.models.DistanceMeasureClusterDistribution;
 import org.apache.mahout.clustering.dirichlet.models.DistributionDescription;
-import org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.MahoutTestCase;
 import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
@@ -80,74 +78,6 @@ public final class TestDirichletClusteri
     generateSamples(num, mx, my, sd, 2);
   }
 
-  private static void printResults(Iterable<Cluster[]> result, int significant) {
-    int row = 0;
-    for (Cluster[] r : result) {
-      System.out.print("sample[" + row++ + "]= ");
-      for (Cluster model : r) {
-        if (model.getNumObservations() > significant) {
-          System.out.print(model.asFormatString(null) + ", ");
-        }
-      }
-      System.out.println();
-    }
-    System.out.println();
-  }
-
-  @Test
-  public void testDirichletCluster100() {
-    System.out.println("testDirichletCluster100");
-    generateSamples(40, 1, 1, 3);
-    generateSamples(30, 1, 0, 0.1);
-    generateSamples(30, 0, 1, 0.1);
-
-    DirichletClusterer dc = new DirichletClusterer(sampleData,
-                                                   new GaussianClusterDistribution(new VectorWritable(new DenseVector(2))),
-                                                   1.0,
-                                                   10,
-                                                   1,
-                                                   0);
-    List<Cluster[]> result = dc.cluster(30);
-    printResults(result, 2);
-    assertNotNull(result);
-  }
-
-  @Test
-  public void testDirichletGaussianCluster100() {
-    System.out.println("testDirichletGaussianCluster100");
-    generateSamples(40, 1, 1, 3);
-    generateSamples(30, 1, 0, 0.1);
-    generateSamples(30, 0, 1, 0.1);
-
-    DirichletClusterer dc = new DirichletClusterer(sampleData,
-                                                   new GaussianClusterDistribution(new VectorWritable(new DenseVector(2))),
-                                                   1.0,
-                                                   10,
-                                                   1,
-                                                   0);
-    List<Cluster[]> result = dc.cluster(30);
-    printResults(result, 2);
-    assertNotNull(result);
-  }
-
-  @Test  
-  public void testDirichletDMCluster100() {
-    System.out.println("testDirichletDMCluster100");
-    generateSamples(40, 1, 1, 3);
-    generateSamples(30, 1, 0, 0.1);
-    generateSamples(30, 0, 1, 0.1);
-
-    DirichletClusterer dc = new DirichletClusterer(sampleData,
-                                                   new DistanceMeasureClusterDistribution(new VectorWritable(new DenseVector(2))),
-                                                   1.0,
-                                                   10,
-                                                   1,
-                                                   0);
-    List<Cluster[]> result = dc.cluster(30);
-    printResults(result, 2);
-    assertNotNull(result);
-  }
-  
   @Test
   public void testDirichletClusteringSeq() throws Exception {
     Path output = getTestTempDirPath("output");

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java Wed May  9 22:02:50 2012
@@ -18,29 +18,21 @@ package org.apache.mahout.clustering.dir
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.ClusteringTestUtils;
-import org.apache.mahout.clustering.Model;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
 import org.apache.mahout.clustering.dirichlet.models.DistanceMeasureClusterDistribution;
 import org.apache.mahout.clustering.dirichlet.models.DistributionDescription;
 import org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution;
-import org.apache.mahout.clustering.iterator.ClusterWritable;
-import org.apache.mahout.common.DummyRecordWriter;
 import org.apache.mahout.common.MahoutTestCase;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.apache.mahout.common.distance.MahalanobisDistanceMeasure;
@@ -53,6 +45,9 @@ import org.apache.mahout.math.VectorWrit
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
 public final class TestMapReduce extends MahoutTestCase {
   
   private Collection<VectorWritable> sampleData = Lists.newArrayList();
@@ -89,8 +84,7 @@ public final class TestMapReduce extends
   }
   
   /**
-   * Generate random samples with asymmetric standard deviations and add them to
-   * the sampleData
+   * Generate random samples with asymmetric standard deviations and add them to the sampleData
    * 
    * @param num
    *          int number of samples to generate
@@ -118,127 +112,6 @@ public final class TestMapReduce extends
     fs = FileSystem.get(conf);
   }
   
-  /** Test the basic Mapper */
-  @Test
-  public void testMapper() throws Exception {
-    generateSamples(10, 0, 0, 1);
-    DirichletState state = new DirichletState(new GaussianClusterDistribution(new VectorWritable(new DenseVector(2))),
-        5, 1);
-    DirichletMapper mapper = new DirichletMapper();
-    mapper.setup(state);
-    
-    RecordWriter<Text,VectorWritable> writer = new DummyRecordWriter<Text,VectorWritable>();
-    Mapper<WritableComparable<?>,VectorWritable,Text,VectorWritable>.Context context = DummyRecordWriter.build(mapper,
-        conf, writer);
-    for (VectorWritable v : sampleData) {
-      mapper.map(null, v, context);
-    }
-    // Map<String, List<VectorWritable>> data = collector.getData();
-    // this seed happens to produce two partitions, but they work
-    // assertEquals("output size", 3, data.size());
-  }
-  
-  /** Test the basic Reducer */
-  @Test
-  public void testReducer() throws Exception {
-    generateSamples(100, 0, 0, 1);
-    generateSamples(100, 2, 0, 1);
-    generateSamples(100, 0, 2, 1);
-    generateSamples(100, 2, 2, 1);
-    DirichletState state = new DirichletState(new GaussianClusterDistribution(new VectorWritable(new DenseVector(2))),
-        20, 1);
-    DirichletMapper mapper = new DirichletMapper();
-    mapper.setup(state);
-    
-    DummyRecordWriter<Text,VectorWritable> mapWriter = new DummyRecordWriter<Text,VectorWritable>();
-    Mapper<WritableComparable<?>,VectorWritable,Text,VectorWritable>.Context mapContext = DummyRecordWriter.build(
-        mapper, conf, mapWriter);
-    for (VectorWritable v : sampleData) {
-      mapper.map(null, v, mapContext);
-    }
-    
-    DirichletReducer reducer = new DirichletReducer();
-    reducer.setup(state);
-    RecordWriter<Text,ClusterWritable> reduceWriter = new DummyRecordWriter<Text,ClusterWritable>();
-    Reducer<Text,VectorWritable,Text,ClusterWritable>.Context reduceContext = DummyRecordWriter.build(reducer, conf,
-        reduceWriter, Text.class, VectorWritable.class);
-    for (Text key : mapWriter.getKeys()) {
-      reducer.reduce(new Text(key), mapWriter.getValue(key), reduceContext);
-    }
-    
-    Cluster[] newModels = reducer.getNewModels();
-    state.update(newModels);
-  }
-  
-  /** Test the Mapper and Reducer in an iteration loop */
-  @Test
-  public void testMRIterations() throws Exception {
-    generateSamples(100, 0, 0, 1);
-    generateSamples(100, 2, 0, 1);
-    generateSamples(100, 0, 2, 1);
-    generateSamples(100, 2, 2, 1);
-    DirichletState state = new DirichletState(new GaussianClusterDistribution(new VectorWritable(new DenseVector(2))),
-        20, 1.0);
-    
-    Collection<Model<VectorWritable>[]> models = Lists.newArrayList();
-    
-    for (int iteration = 0; iteration < 10; iteration++) {
-      DirichletMapper mapper = new DirichletMapper();
-      mapper.setup(state);
-      DummyRecordWriter<Text,VectorWritable> mapWriter = new DummyRecordWriter<Text,VectorWritable>();
-      Mapper<WritableComparable<?>,VectorWritable,Text,VectorWritable>.Context mapContext = DummyRecordWriter.build(
-          mapper, conf, mapWriter);
-      for (VectorWritable v : sampleData) {
-        mapper.map(null, v, mapContext);
-      }
-      
-      DirichletReducer reducer = new DirichletReducer();
-      reducer.setup(state);
-      RecordWriter<Text,ClusterWritable> reduceWriter = new DummyRecordWriter<Text,ClusterWritable>();
-      Reducer<Text,VectorWritable,Text,ClusterWritable>.Context reduceContext = DummyRecordWriter.build(reducer, conf,
-          reduceWriter, Text.class, VectorWritable.class);
-      for (Text key : mapWriter.getKeys()) {
-        reducer.reduce(new Text(key), mapWriter.getValue(key), reduceContext);
-      }
-      
-      Cluster[] newModels = reducer.getNewModels();
-      state.update(newModels);
-      models.add(newModels);
-    }
-    printModels(models, 0);
-  }
-  
-  private static void printModels(Iterable<Model<VectorWritable>[]> results, int significant) {
-    int row = 0;
-    for (Model<VectorWritable>[] r : results) {
-      System.out.print("sample[" + row++ + "]= ");
-      for (int k = 0; k < r.length; k++) {
-        Model<VectorWritable> model = r[k];
-        if (model.getNumObservations() > significant) {
-          System.out.print("m" + k + model.toString() + ", ");
-        }
-      }
-      System.out.println();
-    }
-    System.out.println();
-  }
-  
-  private static void printResults(Iterable<List<DirichletCluster>> clusters, int significant) {
-    int row = 0;
-    for (List<DirichletCluster> r : clusters) {
-      System.out.print("sample[" + row++ + "]= ");
-      for (int k = 0; k < r.size(); k++) {
-        Model<VectorWritable> model = r.get(k).getModel();
-        if (model.getNumObservations() > significant) {
-          int total = (int) r.get(k).getTotalObservations();
-          System.out.print("m" + k + '(' + total + ')' + model.toString() + ", ");
-        }
-      }
-      System.out.println();
-    }
-    System.out.println();
-  }
-  
   /** Test the Mapper and Reducer using the Driver in sequential execution mode */
   @Test
   public void testDriverIterationsSeq() throws Exception {
@@ -252,28 +125,18 @@ public final class TestMapReduce extends
     Integer maxIterations = 5;
     DistributionDescription description = new DistributionDescription(GaussianClusterDistribution.class.getName(),
         DenseVector.class.getName(), null, 2);
+    Path outputPath = getTestTempDirPath("output");
     String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), getTestTempDirPath("input").toString(),
-        optKey(DefaultOptionCreator.OUTPUT_OPTION), getTestTempDirPath("output").toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
         optKey(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION), description.getModelFactory(),
         optKey(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION), description.getModelPrototype(),
         optKey(DefaultOptionCreator.NUM_CLUSTERS_OPTION), "20", optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
         maxIterations.toString(), optKey(DirichletDriver.ALPHA_OPTION), "1.0",
         optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.CLUSTERING_OPTION),
         optKey(DefaultOptionCreator.METHOD_OPTION), DefaultOptionCreator.SEQUENTIAL_METHOD};
-    DirichletDriver dirichletDriver = new DirichletDriver();
-    dirichletDriver.setConf(conf);
-    dirichletDriver.run(args);
+    ToolRunner.run(conf, new DirichletDriver(), args);
     // and inspect results
-    Collection<List<DirichletCluster>> clusters = Lists.newArrayList();
-    Configuration conf = new Configuration();
-    conf.set(DirichletDriver.MODEL_DISTRIBUTION_KEY, description.toString());
-    conf.set(DirichletDriver.NUM_CLUSTERS_KEY, "20");
-    conf.set(DirichletDriver.ALPHA_0_KEY, "1.0");
-    for (int i = 0; i <= maxIterations; i++) {
-      conf.set(DirichletDriver.STATE_IN_KEY, new Path(getTestTempDirPath("output"), "clusters-" + i).toString());
-      clusters.add(DirichletMapper.getDirichletState(conf).getClusters());
-    }
-    printResults(clusters, 0);
+    printModels(getClusters(outputPath, maxIterations));
   }
   
   /** Test the Mapper and Reducer using the Driver in mapreduce mode */
@@ -289,53 +152,21 @@ public final class TestMapReduce extends
     Integer maxIterations = 5;
     DistributionDescription description = new DistributionDescription(GaussianClusterDistribution.class.getName(),
         DenseVector.class.getName(), null, 2);
+    Path outputPath = getTestTempDirPath("output");
     String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), getTestTempDirPath("input").toString(),
-        optKey(DefaultOptionCreator.OUTPUT_OPTION), getTestTempDirPath("output").toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
         optKey(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION), description.getModelFactory(),
         optKey(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION), description.getModelPrototype(),
         optKey(DefaultOptionCreator.NUM_CLUSTERS_OPTION), "20", optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
         maxIterations.toString(), optKey(DirichletDriver.ALPHA_OPTION), "1.0",
         optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.CLUSTERING_OPTION)};
-    ToolRunner.run(new Configuration(), new DirichletDriver(), args);
+    ToolRunner.run(conf, new DirichletDriver(), args);
     // and inspect results
-    Collection<List<DirichletCluster>> clusters = Lists.newArrayList();
-    Configuration conf = new Configuration();
-    conf.set(DirichletDriver.MODEL_DISTRIBUTION_KEY, description.toString());
-    conf.set(DirichletDriver.NUM_CLUSTERS_KEY, "20");
-    conf.set(DirichletDriver.ALPHA_0_KEY, "1.0");
-    for (int i = 0; i <= maxIterations; i++) {
-      conf.set(DirichletDriver.STATE_IN_KEY, new Path(getTestTempDirPath("output"), "clusters-" + i).toString());
-      clusters.add(DirichletMapper.getDirichletState(conf).getClusters());
-    }
-    printResults(clusters, 0);
-  }
-  
-  /** Test the Mapper and Reducer using the Driver */
-  @Test
-  public void testDriverMnRIterations() throws Exception {
-    generate4Datasets();
-    // Now run the driver
-    int maxIterations = 3;
-    DistributionDescription description = new DistributionDescription(GaussianClusterDistribution.class.getName(),
-        DenseVector.class.getName(), null, 2);
-    Configuration conf = new Configuration();
-    DirichletDriver.run(conf, getTestTempDirPath("input"), getTestTempDirPath("output"), description, 20,
-        maxIterations, 1.0, false, true, 0, false);
-    // and inspect results
-    Collection<List<DirichletCluster>> clusters = Lists.newArrayList();
-    conf.set(DirichletDriver.MODEL_DISTRIBUTION_KEY, description.toString());
-    conf.set(DirichletDriver.NUM_CLUSTERS_KEY, "20");
-    conf.set(DirichletDriver.ALPHA_0_KEY, "1.0");
-    for (int i = 0; i <= maxIterations; i++) {
-      conf.set(DirichletDriver.STATE_IN_KEY, new Path(getTestTempDirPath("output"), "clusters-" + i).toString());
-      clusters.add(DirichletMapper.getDirichletState(conf).getClusters());
-    }
-    printResults(clusters, 0);
+    printModels(getClusters(outputPath, maxIterations));
   }
   
   /**
-   * Test the Driver in sequential execution mode using
-   * MahalanobisDistanceMeasure
+   * Test the Driver in sequential execution mode using MahalanobisDistanceMeasure
    */
   @Test
   public void testDriverIterationsMahalanobisSeq() throws Exception {
@@ -381,8 +212,9 @@ public final class TestMapReduce extends
     conf.set("MahalanobisDistanceMeasure.vectorClass", VectorWritable.class.getName());
     
     Integer maxIterations = 5;
+    Path outputPath = getTestTempDirPath("output");
     String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), getTestTempDirPath("input").toString(),
-        optKey(DefaultOptionCreator.OUTPUT_OPTION), getTestTempDirPath("output").toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
         optKey(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION), description.getModelFactory(),
         optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), description.getDistanceMeasure(),
         optKey(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION), description.getModelPrototype(),
@@ -394,16 +226,7 @@ public final class TestMapReduce extends
     dirichletDriver.setConf(conf);
     dirichletDriver.run(args);
     // and inspect results
-    Collection<List<DirichletCluster>> clusters = Lists.newArrayList();
-    Configuration conf = new Configuration();
-    conf.set(DirichletDriver.MODEL_DISTRIBUTION_KEY, description.toString());
-    conf.set(DirichletDriver.NUM_CLUSTERS_KEY, "20");
-    conf.set(DirichletDriver.ALPHA_0_KEY, "1.0");
-    for (int i = 0; i <= maxIterations; i++) {
-      conf.set(DirichletDriver.STATE_IN_KEY, new Path(getTestTempDirPath("output"), "clusters-" + i).toString());
-      clusters.add(DirichletMapper.getDirichletState(conf).getClusters());
-    }
-    printResults(clusters, 0);
+    printModels(getClusters(outputPath, maxIterations));
   }
   
   /** Test the Mapper and Reducer using the Driver in mapreduce mode */
@@ -411,7 +234,7 @@ public final class TestMapReduce extends
   public void testDriverIterationsMahalanobisMR() throws Exception {
     generateAsymmetricSamples(100, 0, 0, 0.5, 3.0);
     generateAsymmetricSamples(100, 0, 3, 0.3, 4.0);
-    ClusteringTestUtils.writePointsToFile(sampleData,true, getTestTempFilePath("input/data.txt"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(sampleData, true, getTestTempFilePath("input/data.txt"), fs, conf);
     // Now run the driver using the run() method. Others can use runJob() as
     // before
     
@@ -452,8 +275,9 @@ public final class TestMapReduce extends
     conf.set("MahalanobisDistanceMeasure.vectorClass", VectorWritable.class.getName());
     
     Integer maxIterations = 5;
+    Path outputPath = getTestTempDirPath("output");
     String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), getTestTempDirPath("input").toString(),
-        optKey(DefaultOptionCreator.OUTPUT_OPTION), getTestTempDirPath("output").toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
         optKey(DirichletDriver.MODEL_DISTRIBUTION_CLASS_OPTION), description.getModelFactory(),
         optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), description.getDistanceMeasure(),
         optKey(DirichletDriver.MODEL_PROTOTYPE_CLASS_OPTION), description.getModelPrototype(),
@@ -464,30 +288,37 @@ public final class TestMapReduce extends
     dirichletDriver.setConf(conf);
     ToolRunner.run(conf, dirichletDriver, args);
     // and inspect results
-    Collection<List<DirichletCluster>> clusters = Lists.newArrayList();
-    Configuration conf = new Configuration();
-    conf.set(DirichletDriver.MODEL_DISTRIBUTION_KEY, description.toString());
-    conf.set(DirichletDriver.NUM_CLUSTERS_KEY, "20");
-    conf.set(DirichletDriver.ALPHA_0_KEY, "1.0");
-    for (int i = 0; i <= maxIterations; i++) {
-      conf.set(DirichletDriver.STATE_IN_KEY, new Path(getTestTempDirPath("output"), "clusters-" + i).toString());
-      clusters.add(DirichletMapper.getDirichletState(conf).getClusters());
+    printModels(getClusters(outputPath, maxIterations));
+  }
+  
+  private void printModels(List<List<Cluster>> result) {
+    int row = 0;
+    StringBuilder models = new StringBuilder(100);
+    for (List<Cluster> r : result) {
+      models.append("sample[").append(row++).append("]= ");
+      for (int k = 0; k < r.size(); k++) {
+        Cluster model = r.get(k);
+        models.append('m').append(k).append(model.asFormatString(null)).append(", ");
+      }
+      models.append('\n');
     }
-    printResults(clusters, 0);
+    models.append('\n');
+    System.out.println(models.toString());
   }
   
-  private void generate4Datasets() throws IOException {
-    generateSamples(500, 0, 0, 0.5);
-    ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("input/data1.txt"), fs, conf);
-    sampleData = Lists.newArrayList();
-    generateSamples(500, 2, 0, 0.2);
-    ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("input/data2.txt"), fs, conf);
-    sampleData = Lists.newArrayList();
-    generateSamples(500, 0, 2, 0.3);
-    ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("input/data3.txt"), fs, conf);
-    sampleData = Lists.newArrayList();
-    generateSamples(500, 2, 2, 1);
-    ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("input/data4.txt"), fs, conf);
+  private List<List<Cluster>> getClusters(Path output, int numIterations) throws IOException {
+    List<List<Cluster>> result = new ArrayList<List<Cluster>>();
+    for (int i = 1; i <= numIterations; i++) {
+      ClusterClassifier posterior = new ClusterClassifier();
+      String name = i == numIterations ? "clusters-" + i + "-final" : "clusters-" + i;
+      posterior.readFromSeqFiles(conf, new Path(output, name));
+      List<Cluster> clusters = Lists.newArrayList();
+      for (Cluster cluster : posterior.getModels()) {
+        clusters.add(cluster);
+      }
+      result.add(clusters);
+    }
+    return result;
   }
   
 }

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java Wed May  9 22:02:50 2012
@@ -91,7 +91,7 @@ public final class TestClusterClassifier
     Path path = new Path(getTestTempDirPath(), "output");
     classifier.writeToSeqFiles(path);
     ClusterClassifier newClassifier = new ClusterClassifier();
-    newClassifier.readFromSeqFiles(path);
+    newClassifier.readFromSeqFiles(new Configuration(), path);
     return newClassifier;
   }
   
@@ -233,13 +233,13 @@ public final class TestClusterClassifier
     for (Cluster cluster : prior.getModels()) {
       System.out.println(cluster.asFormatString(null));
     }
-    new ClusterIterator().iterateSeq(pointsPath, path, outPath, 5);
+    new ClusterIterator().iterateSeq(conf, pointsPath, path, outPath, 5);
     
     for (int i = 1; i <= 4; i++) {
       System.out.println("Classifier-" + i);
       ClusterClassifier posterior = new ClusterClassifier();
       String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
-      posterior.readFromSeqFiles(new Path(outPath, name));
+      posterior.readFromSeqFiles(conf, new Path(outPath, name));
       assertEquals(3, posterior.getModels().size());
       for (Cluster cluster : posterior.getModels()) {
         System.out.println(cluster.asFormatString(null));
@@ -267,13 +267,13 @@ public final class TestClusterClassifier
     for (Cluster cluster : prior.getModels()) {
       System.out.println(cluster.asFormatString(null));
     }
-    new ClusterIterator().iterateMR(pointsPath, path, outPath, 5);
+    new ClusterIterator().iterateMR(conf, pointsPath, path, outPath, 5);
     
     for (int i = 1; i <= 4; i++) {
       System.out.println("Classifier-" + i);
       ClusterClassifier posterior = new ClusterClassifier();
       String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
-      posterior.readFromSeqFiles(new Path(outPath, name));
+      posterior.readFromSeqFiles(conf, new Path(outPath, name));
       assertEquals(3, posterior.getModels().size());
       for (Cluster cluster : posterior.getModels()) {
         System.out.println(cluster.asFormatString(null));

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java?rev=1336424&r1=1336423&r2=1336424&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayCanopy.java Wed May  9 22:02:50 2012
@@ -17,9 +17,14 @@
 
 package org.apache.mahout.clustering.display;
 
+import java.awt.BasicStroke;
+import java.awt.Color;
+import java.awt.Graphics;
+import java.awt.Graphics2D;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.canopy.CanopyDriver;
 import org.apache.mahout.common.HadoopUtil;
@@ -27,12 +32,6 @@ import org.apache.mahout.common.RandomUt
 import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
 import org.apache.mahout.math.DenseVector;
 
-import java.awt.BasicStroke;
-import java.awt.Color;
-import java.awt.Graphics;
-import java.awt.Graphics2D;
-import java.util.List;
-
 /**
  * Java desktop graphics class that runs canopy clustering and displays the results.
  * This class generates random data and clusters it.
@@ -79,29 +78,8 @@ public class DisplayCanopy extends Displ
     RandomUtils.useTestSeed();
     generateSamples();
     writeSampleData(samples);
-    //boolean b = true;
-    //if (b) {
     CanopyDriver.buildClusters(conf, samples, output, new ManhattanDistanceMeasure(), T1, T2, 0, true);
-    loadClusters(output, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        String pathString = path.toString();
-        return pathString.contains("/clusters-");
-      }
-    });
-    //} else {
-    //  List<Vector> points = new ArrayList<Vector>();
-    //  for (VectorWritable sample : SAMPLE_DATA) {
-    //    points.add(sample.get());
-    //  }
-    //  List<Canopy> canopies = CanopyClusterer.createCanopies(points, new ManhattanDistanceMeasure(), T1, T2);
-    //  CanopyClusterer.updateCentroids(canopies);
-    //  List<Cluster> clusters = new ArrayList<Cluster>();
-    //  for (Canopy canopy : canopies) {
-    //    clusters.add(canopy);
-    //  }
-    //  CLUSTERS.add(clusters);
-    //}
+    loadClustersWritable(output);
 
     new DisplayCanopy();
   }