You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pr...@apache.org on 2012/03/16 18:10:30 UTC

svn commit: r1301654 - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/classify/ core/src/main/java/org/apache/mahout/clustering/dirichlet/ core/src/main/java/org/apache/mahout/clustering/iterator/ core/src/main/java/org/apache/mahout...

Author: pranjan
Date: Fri Mar 16 17:10:29 2012
New Revision: 1301654

URL: http://svn.apache.org/viewvc?rev=1301654&view=rev
Log:
MAHOUT-981, MAHOUT-983. Refactored K-Means Clustering and Dirichlet Clustering to use ClusterClassificationDriver. 
Using cluster.getModel().configure() in ClusterClassificationDriver in order to configure DirichletCluster for MahalanobisDistanceMeasure. 
Added/fixed test cases by:
Using separate directories in test cases for supplying initial clusters and to store buildClusters to prevent two cluster-*-final files in the same directory.
Writing IntWritable in test cases instead of LongWritable ( As the ClusterClassificationDriver clusters records with IntWritable keys). 

Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/DirichletClusteringPolicy.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java Fri Mar 16 17:10:29 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.dirichlet.DirichletCluster;
 import org.apache.mahout.clustering.iterator.ClusteringPolicy;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
@@ -131,25 +132,21 @@ public class ClusterClassificationDriver
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  public static void run(Path input, Path clusteringOutputPath, Path output,
-      Double clusterClassificationThreshold, boolean emitMostLikely,
-      boolean runSequential) throws IOException, InterruptedException,
-      ClassNotFoundException {
+  public static void run(Path input, Path clusteringOutputPath, Path output, Double clusterClassificationThreshold,
+      boolean emitMostLikely, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
     if (runSequential) {
-      classifyClusterSeq(input, clusteringOutputPath, output,
-          clusterClassificationThreshold, emitMostLikely);
+      classifyClusterSeq(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
     } else {
-      Configuration conf = new Configuration();
-      classifyClusterMR(conf, input, clusteringOutputPath, output,
-          clusterClassificationThreshold, emitMostLikely);
+      classifyClusterMR(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
     }
     
   }
   
-  private static void classifyClusterSeq(Path input, Path clusters,
-      Path output, Double clusterClassificationThreshold, boolean emitMostLikely)
+  private static void classifyClusterSeq(Configuration conf, Path input,
+      Path clusters, Path output, Double clusterClassificationThreshold, boolean emitMostLikely)
       throws IOException {
-    List<Cluster> clusterModels = populateClusterModels(clusters);
+    List<Cluster> clusterModels = populateClusterModels(clusters, conf);
     ClusteringPolicy policy = ClusterClassifier
         .readPolicy(finalClustersPath(clusters));
     ClusterClassifier clusterClassifier = new ClusterClassifier(clusterModels,
@@ -164,19 +161,24 @@ public class ClusterClassificationDriver
    * 
    * @param clusterOutputPath
    *          The output path of the clustering.
+   * @param conf
+   *          The Hadoop Configuration
    * @return The list of clusters found by the clustering.
    * @throws IOException
    */
-  private static List<Cluster> populateClusterModels(Path clusterOutputPath)
+  private static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf)
       throws IOException {
     List<Cluster> clusterModels = new ArrayList<Cluster>();
     Cluster cluster = null;
     Path finalClustersPath = finalClustersPath(clusterOutputPath);
     Iterator<?> it = new SequenceFileDirValueIterator<Writable>(
         finalClustersPath, PathType.LIST, PathFilters.partFilter(), null,
-        false, new Configuration());
+        false, conf);
     while (it.hasNext()) {
       cluster = (Cluster) it.next();
+      if(cluster instanceof DirichletCluster){
+    	  ((DirichletCluster) cluster).getModel().configure(conf);
+      }
       clusterModels.add(cluster);
     }
     return clusterModels;
@@ -306,5 +308,16 @@ public class ClusterClassificationDriver
           "Cluster Classification Driver Job failed processing " + input);
     }
   }
+
+  public static void run(Configuration conf, Path input, Path clusteringOutputPath, Path output,
+      double clusterClassificationThreshold, boolean emitMostLikely, boolean runSequential) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    if (runSequential) {
+      classifyClusterSeq(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
+    } else {
+      classifyClusterMR(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
+    }
+    
+  }
   
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java Fri Mar 16 17:10:29 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.dirichlet.DirichletCluster;
 import org.apache.mahout.clustering.iterator.ClusteringPolicy;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
@@ -47,7 +48,7 @@ import org.apache.mahout.math.VectorWrit
  * Mapper for classifying vectors into clusters.
  */
 public class ClusterClassificationMapper extends
-    Mapper<LongWritable,VectorWritable,IntWritable,WeightedVectorWritable> {
+    Mapper<IntWritable,VectorWritable,IntWritable,WeightedVectorWritable> {
   
   private static double threshold;
   private List<Cluster> clusterModels;
@@ -70,7 +71,7 @@ public class ClusterClassificationMapper
     
     if (clustersIn != null && !clustersIn.isEmpty()) {
       Path clustersInPath = new Path(clustersIn);
-      clusterModels = populateClusterModels(clustersInPath);
+      clusterModels = populateClusterModels(clustersInPath, conf);
       ClusteringPolicy policy = ClusterClassifier
           .readPolicy(finalClustersPath(clustersInPath));
       clusterClassifier = new ClusterClassifier(clusterModels, policy);
@@ -83,7 +84,7 @@ public class ClusterClassificationMapper
    * Mapper which classifies the vectors to respective clusters.
    */
   @Override
-  protected void map(LongWritable key, VectorWritable vw, Context context)
+  protected void map(IntWritable key, VectorWritable vw, Context context)
       throws IOException, InterruptedException {
     if (!clusterModels.isEmpty()) {
       Vector pdfPerCluster = clusterClassifier.classify(vw.get());
@@ -118,10 +119,9 @@ public class ClusterClassificationMapper
     context.write(clusterId, weightedVW);
   }
   
-  public static List<Cluster> populateClusterModels(Path clusterOutputPath)
+  public static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf)
       throws IOException {
     List<Cluster> clusters = new ArrayList<Cluster>();
-    Configuration conf = new Configuration();
     Cluster cluster = null;
     FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
     FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath,
@@ -131,6 +131,9 @@ public class ClusterClassificationMapper
         null, false, conf);
     while (it.hasNext()) {
       cluster = (Cluster) it.next();
+      if(cluster instanceof DirichletCluster){
+        ((DirichletCluster) cluster).getModel().configure(conf);
+      }
       clusters.add(cluster);
     }
     return clusters;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java Fri Mar 16 17:10:29 2012
@@ -17,22 +17,17 @@
 
 package org.apache.mahout.clustering.dirichlet;
 
-import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 
-import com.google.common.collect.Lists;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.Model;
 import org.apache.mahout.clustering.ModelDistribution;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
+import com.google.common.collect.Lists;
+
 /**
  * Performs Bayesian mixture modeling.
  * <p/>
@@ -105,10 +100,6 @@ public class DirichletClusterer {
 
   private final List<Cluster[]> clusterSamples = Lists.newArrayList();
 
-  private boolean emitMostLikely;
-
-  private double threshold;
-
   /**
    * Create a new instance on the sample data with the given additional parameters
    * 
@@ -171,18 +162,14 @@ public class DirichletClusterer {
 
   /**
    * This constructor only used by DirichletClusterMapper for setting up clustering params
-   * @param emitMostLikely
-   * @param threshold
    */
-  public DirichletClusterer(boolean emitMostLikely, double threshold) {
+  public DirichletClusterer() {
     this.sampleData = null;
     this.modelFactory = null;
     this.thin = 0;
     this.burnin = 0;
     this.numClusters = 0;
     this.state = null;
-    this.emitMostLikely = emitMostLikely;
-    this.threshold = threshold;
   }
 
   /**
@@ -275,139 +262,4 @@ public class DirichletClusterer {
     return cluster;
   }
 
-  /**
-   * Emit the point to one or more clusters depending upon clusterer state
-   * 
-   * @param vector a VectorWritable holding the Vector
-   * @param clusters a List of DirichletClusters
-   * @param context a Mapper.Context to emit to
-   */
-  public void emitPointToClusters(VectorWritable vector,
-                                  List<DirichletCluster> clusters,
-                                  Mapper<?,?,IntWritable,WeightedVectorWritable>.Context context)
-    throws IOException, InterruptedException {
-    Vector pi = new DenseVector(clusters.size());
-    for (int i = 0; i < clusters.size(); i++) {
-      pi.set(i, clusters.get(i).getModel().pdf(vector));
-    }
-    pi = pi.divide(pi.zSum());
-    if (emitMostLikely) {
-      emitMostLikelyCluster(vector, clusters, pi, context);
-    } else {
-      emitAllClusters(vector, clusters, pi, context);
-    }
-  }
-
-  /**
-   * Emit the point to the most likely cluster
-   * 
-   * @param point a VectorWritable holding the Vector
-   * @param clusters a List of DirichletClusters
-   * @param pi the normalized pdf Vector for the point
-   * @param context a Mapper.Context to emit to
-   */
-  private void emitMostLikelyCluster(VectorWritable point,
-                                     Collection<DirichletCluster> clusters,
-                                     Vector pi,
-                                     Mapper<?,?,IntWritable,WeightedVectorWritable>.Context context)
-    throws IOException, InterruptedException {
-    int clusterId = -1;
-    double clusterPdf = 0;
-    for (int i = 0; i < clusters.size(); i++) {
-      double pdf = pi.get(i);
-      if (pdf > clusterPdf) {
-        clusterId = i;
-        clusterPdf = pdf;
-      }
-    }
-    //System.out.println(clusterId + ": " + ClusterBase.formatVector(vector.get(), null));
-    context.write(new IntWritable(clusterId), new WeightedVectorWritable(clusterPdf, point.get()));
-  }
-
-  /**
-   * Emit the point to all clusters if pdf exceeds the threshold
-   * @param point a VectorWritable holding the Vector
-   * @param clusters a List of DirichletClusters
-   * @param pi the normalized pdf Vector for the point
-   * @param context a Mapper.Context to emit to
-   */
-  private void emitAllClusters(VectorWritable point,
-                               List<DirichletCluster> clusters,
-                               Vector pi,
-                               Mapper<?,?,IntWritable,WeightedVectorWritable>.Context context)
-    throws IOException, InterruptedException {
-    for (int i = 0; i < clusters.size(); i++) {
-      double pdf = pi.get(i);
-      if (pdf > threshold && clusters.get(i).getTotalObservations() > 0) {
-        //System.out.println(i + ": " + ClusterBase.formatVector(vector.get(), null));
-        context.write(new IntWritable(i), new WeightedVectorWritable(pdf, point.get()));
-      }
-    }
-  }
-
-  /**
-   * Emit the point to one or more clusters depending upon clusterer state
-   * 
-   * @param vector a VectorWritable holding the Vector
-   * @param clusters a List of DirichletClusters
-   * @param writer a SequenceFile.Writer to emit to
-   */
-  public void emitPointToClusters(VectorWritable vector, List<DirichletCluster> clusters, Writer writer)
-    throws IOException {
-    Vector pi = new DenseVector(clusters.size());
-    for (int i = 0; i < clusters.size(); i++) {
-      double pdf = clusters.get(i).getModel().pdf(vector);
-      pi.set(i, pdf);
-    }
-    pi = pi.divide(pi.zSum());
-    if (emitMostLikely) {
-      emitMostLikelyCluster(vector, clusters, pi, writer);
-    } else {
-      emitAllClusters(vector, clusters, pi, writer);
-    }
-  }
-
-  /**
-   * Emit the point to all clusters if pdf exceeds the threshold
-   * 
-   * @param vector a VectorWritable holding the Vector
-   * @param clusters a List of DirichletClusters
-   * @param pi the normalized pdf Vector for the point
-   * @param writer a SequenceFile.Writer to emit to
-   */
-  private void emitAllClusters(VectorWritable vector, List<DirichletCluster> clusters, Vector pi, Writer writer)
-    throws IOException {
-    for (int i = 0; i < clusters.size(); i++) {
-      double pdf = pi.get(i);
-      if (pdf > threshold && clusters.get(i).getTotalObservations() > 0) {
-        //System.out.println(i + ": " + ClusterBase.formatVector(vector.get(), null));
-        writer.append(new IntWritable(i), new WeightedVectorWritable(pdf, vector.get()));
-      }
-    }
-  }
-
-  /**
-   * Emit the point to the most likely cluster
-   * 
-   * @param vector a VectorWritable holding the Vector
-   * @param clusters a List of DirichletClusters
-   * @param pi the normalized pdf Vector for the point
-   * @param writer a SequenceFile.Writer to emit to
-   */
-  private static void emitMostLikelyCluster(VectorWritable vector,
-                                            Collection<DirichletCluster> clusters,
-                                            Vector pi,
-                                            Writer writer) throws IOException {
-    double maxPdf = 0;
-    int clusterId = -1;
-    for (int i = 0; i < clusters.size(); i++) {
-      double pdf = pi.get(i);
-      if (pdf > maxPdf) {
-        maxPdf = pdf;
-        clusterId = i;
-      }
-    }
-    //System.out.println(i + ": " + ClusterBase.formatVector(vector.get(), null));
-    writer.append(new IntWritable(clusterId), new WeightedVectorWritable(maxPdf, vector.get()));
-  }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java Fri Mar 16 17:10:29 2012
@@ -17,15 +17,15 @@
 
 package org.apache.mahout.clustering.dirichlet;
 
+import static org.apache.mahout.clustering.topdown.PathDirectory.CLUSTERED_POINTS_DIRECTORY;
+
 import java.io.IOException;
 import java.util.List;
 
-import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -35,9 +35,11 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
 import org.apache.mahout.clustering.dirichlet.models.DistributionDescription;
 import org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution;
+import org.apache.mahout.clustering.iterator.DirichletClusteringPolicy;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
@@ -50,6 +52,8 @@ import org.apache.mahout.math.VectorWrit
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.io.Closeables;
+
 public class DirichletDriver extends AbstractJob {
 
   public static final String STATE_IN_KEY = "org.apache.mahout.clustering.dirichlet.stateIn";
@@ -170,9 +174,11 @@ public class DirichletDriver extends Abs
       clusterData(conf,
                   input,
                   clustersOut,
-                  new Path(output, Cluster.CLUSTERED_POINTS_DIR),
-                  emitMostLikely,
-                  threshold,
+                  output,
+                  alpha0,
+                  numModels,
+                  emitMostLikely, 
+                  threshold, 
                   runSequential);
     }
   }
@@ -442,92 +448,32 @@ public class DirichletDriver extends Abs
   /**
    * Run the job using supplied arguments
    * @param conf 
-   * 
-   * @param input
+ * @param input
    *          the directory pathname for input points
-   * @param stateIn
+ * @param stateIn
    *          the directory pathname for input state
-   * @param output
+ * @param output
    *          the directory pathname for output points
-   * @param emitMostLikely
+ * @param alpha0 TODO
+ * @param numModels TODO
+ * @param emitMostLikely
    *          a boolean if true emit only most likely cluster for each point
-   * @param threshold 
+ * @param threshold 
    *          a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
-   * @param runSequential execute sequentially if true
+ * @param runSequential execute sequentially if true
    */
   public static void clusterData(Configuration conf,
                                  Path input,
                                  Path stateIn,
                                  Path output,
-                                 boolean emitMostLikely,
-                                 double threshold,
+                                 double alpha0,
+                                 int numModels,
+                                 boolean emitMostLikely, 
+                                 double threshold, 
                                  boolean runSequential)
     throws IOException, InterruptedException, ClassNotFoundException {
-    if (runSequential) {
-      clusterDataSeq(conf, input, stateIn, output, emitMostLikely, threshold);
-    } else {
-      clusterDataMR(conf, input, stateIn, output, emitMostLikely, threshold);
+	  ClusterClassifier.writePolicy(new DirichletClusteringPolicy(numModels, alpha0), stateIn);
+	  ClusterClassificationDriver.run(conf, input, output, new Path(output, CLUSTERED_POINTS_DIRECTORY), threshold, emitMostLikely, runSequential);
     }
-  }
-
-  private static void clusterDataSeq(Configuration conf,
-                                     Path input,
-                                     Path stateIn,
-                                     Path output,
-                                     boolean emitMostLikely,
-                                     double threshold) throws IOException {
-
-    List<DirichletCluster> clusters = DirichletClusterMapper.loadClusters(conf, stateIn);
-
-    for (DirichletCluster cluster : clusters) {
-      cluster.getModel().configure(conf);
-    }
-    
-    DirichletClusterer clusterer = new DirichletClusterer(emitMostLikely, threshold);
-    // iterate over all points, assigning each to the closest canopy and outputing that clustering
-    FileSystem fs = FileSystem.get(input.toUri(), conf);
-    FileStatus[] status = fs.listStatus(input, PathFilters.logsCRCFilter());
-    int part = 0;
-    for (FileStatus s : status) {
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
-                                                           conf,
-                                                           new Path(output, "part-m-" + part++),
-                                                           IntWritable.class,
-                                                           WeightedVectorWritable.class);
-      try {
-        for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(s.getPath(), conf)) {
-          clusterer.emitPointToClusters(value, clusters, writer);
-        }
-      } finally {
-        Closeables.closeQuietly(writer);
-      }
-    }
-
-  }
-
-  private static void clusterDataMR(Configuration conf,
-                                    Path input,
-                                    Path stateIn,
-                                    Path output,
-                                    boolean emitMostLikely,
-                                    double threshold) throws IOException, InterruptedException, ClassNotFoundException {
-    conf.set(STATE_IN_KEY, stateIn.toString());
-    conf.set(EMIT_MOST_LIKELY_KEY, Boolean.toString(emitMostLikely));
-    conf.set(THRESHOLD_KEY, Double.toString(threshold));
-    Job job = new Job(conf, "Dirichlet Driver running clusterData over input: " + input);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(WeightedVectorWritable.class);
-    job.setMapperClass(DirichletClusterMapper.class);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setNumReduceTasks(0);
-    job.setJarByClass(DirichletDriver.class);
-
-    FileInputFormat.addInputPath(job, input);
-    FileOutputFormat.setOutputPath(job, output);
-
-    if (!job.waitForCompletion(true)) {
-      throw new InterruptedException("Dirichlet Clustering failed processing " + stateIn);
-    }
-  }
+  
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/DirichletClusteringPolicy.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/DirichletClusteringPolicy.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/DirichletClusteringPolicy.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/DirichletClusteringPolicy.java Fri Mar 16 17:10:29 2012
@@ -33,6 +33,13 @@ public class DirichletClusteringPolicy e
     super();
   }
   
+  /**
+   * 
+   * @param k
+   *          The number of models to create from prior
+   * @param alpha0
+   *          The alpha_0 parameter to the Dirichlet Distribution.
+   */
   public DirichletClusteringPolicy(int k, double alpha0) {
     this.alpha0 = alpha0;
     this.mixture = UncommonDistributions.rDirichlet(new DenseVector(k), alpha0);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterer.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterer.java Fri Mar 16 17:10:29 2012
@@ -17,24 +17,18 @@
 package org.apache.mahout.clustering.kmeans;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import com.google.common.collect.Lists;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.mahout.clustering.AbstractCluster;
 import org.apache.mahout.clustering.ClusterObservations;
-import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
 import org.apache.mahout.common.distance.DistanceMeasure;
 import org.apache.mahout.math.Vector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * This class implements the k-means clustering algorithm. It uses {@link Kluster} as a cluster
  * representation. The class can be used as part of a clustering job to be started as map/reduce job.
@@ -118,52 +112,6 @@ public class KMeansClusterer {
     return converged;
   }
 
-  public void outputPointWithClusterInfo(Vector vector,
-                                         Iterable<Kluster> clusters,
-                                         Mapper<?,?,IntWritable,WeightedPropertyVectorWritable>.Context context)
-    throws IOException, InterruptedException {
-    AbstractCluster nearestCluster = null;
-    double nearestDistance = Double.MAX_VALUE;
-    for (AbstractCluster cluster : clusters) {
-      Vector clusterCenter = cluster.getCenter();
-      double distance = measure.distance(clusterCenter.getLengthSquared(), clusterCenter, vector);
-      if (distance < nearestDistance || nearestCluster == null) {
-        nearestCluster = cluster;
-        nearestDistance = distance;
-      }
-    }
-    Map<Text, Text> props = new HashMap<Text, Text>();
-    props.put(new Text("distance"), new Text(String.valueOf(nearestDistance)));
-    context.write(new IntWritable(nearestCluster.getId()), new WeightedPropertyVectorWritable(1, vector, props));
-  }
-
-  /**
-   * Iterates over all clusters and identifies the one closes to the given point. Distance measure used is
-   * configured at creation time.
-   * 
-   * @param point
-   *          a point to find a cluster for.
-   * @param clusters
-   *          a List<Cluster> to test.
-   */
-  protected void emitPointToNearestCluster(Vector point, Iterable<Kluster> clusters, Writer writer)
-    throws IOException {
-    AbstractCluster nearestCluster = null;
-    double nearestDistance = Double.MAX_VALUE;
-    for (AbstractCluster cluster : clusters) {
-      Vector clusterCenter = cluster.getCenter();
-      double distance = this.measure.distance(clusterCenter.getLengthSquared(), clusterCenter, point);
-      if (log.isDebugEnabled()) {
-        log.debug("{} Cluster: {}", distance, cluster.getId());
-      }
-      if (distance < nearestDistance || nearestCluster == null) {
-        nearestCluster = cluster;
-        nearestDistance = distance;
-      }
-    }
-    writer.append(new IntWritable(nearestCluster.getId()), new WeightedVectorWritable(1, point));
-  }
-
   /**
    * This is the reference k-means implementation. Given its inputs it iterates over the points and clusters
    * until their centers converge or until the maximum number of iterations is exceeded.

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Fri Mar 16 17:10:29 2012
@@ -16,16 +16,15 @@
  */
 package org.apache.mahout.clustering.kmeans;
 
+import static org.apache.mahout.clustering.topdown.PathDirectory.CLUSTERED_POINTS_DIRECTORY;
+
 import java.io.IOException;
 import java.util.Collection;
 
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -36,8 +35,9 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.AbstractCluster;
 import org.apache.mahout.clustering.ClusterObservations;
-import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.iterator.KMeansClusteringPolicy;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.ClassUtils;
 import org.apache.mahout.common.HadoopUtil;
@@ -47,12 +47,14 @@ import org.apache.mahout.common.distance
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
 import org.apache.mahout.math.VectorWritable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
 public class KMeansDriver extends AbstractJob {
 
   private static final Logger log = LoggerFactory.getLogger(KMeansDriver.class);
@@ -157,7 +159,7 @@ public class KMeansDriver extends Abstra
       clusterData(conf,
           input,
           clustersOut,
-          new Path(output, AbstractCluster.CLUSTERED_POINTS_DIR),
+          output,
           measure,
           delta,
           runSequential);
@@ -428,73 +430,10 @@ public class KMeansDriver extends Abstra
       log.info("Input: {} Clusters In: {} Out: {} Distance: {}", new Object[] {input, clustersIn, output, measure});
       log.info("convergence: {} Input Vectors: {}", convergenceDelta, VectorWritable.class.getName());
     }
-    if (runSequential) {
-      clusterDataSeq(conf, input, clustersIn, output, measure);
-    } else {
-      clusterDataMR(conf, input, clustersIn, output, measure, convergenceDelta);
-    }
-  }
-
-  private static void clusterDataSeq(Configuration conf,
-                                     Path input,
-                                     Path clustersIn,
-                                     Path output,
-                                     DistanceMeasure measure) throws IOException {
-
-    KMeansClusterer clusterer = new KMeansClusterer(measure);
-    Collection<Kluster> clusters = Lists.newArrayList();
-    KMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters);
-    if (clusters.isEmpty()) {
-      throw new IllegalStateException("Clusters is empty!");
-    }
-    FileSystem fs = FileSystem.get(input.toUri(), conf);
-    FileStatus[] status = fs.listStatus(input, PathFilters.logsCRCFilter());
-    int part = 0;
-    for (FileStatus s : status) {
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
-                                                           conf,
-                                                           new Path(output, "part-m-" + part),
-                                                           IntWritable.class,
-                                                           WeightedVectorWritable.class);
-      try {
-        for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(s.getPath(), conf)) {
-          clusterer.emitPointToNearestCluster(value.get(), clusters, writer);
-        }
-      } finally {
-        Closeables.closeQuietly(writer);
-      }
-    }
-
-  }
-
-  private static void clusterDataMR(Configuration conf,
-                                    Path input,
-                                    Path clustersIn,
-                                    Path output,
-                                    DistanceMeasure measure,
-                                    String convergenceDelta)
-    throws IOException, InterruptedException, ClassNotFoundException {
-
-    conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
-    conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
-    conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
-
-    Job job = new Job(conf, "KMeans Driver running clusterData over input: " + input);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(WeightedPropertyVectorWritable.class);
-
-    FileInputFormat.setInputPaths(job, input);
-    HadoopUtil.delete(conf, output);
-    FileOutputFormat.setOutputPath(job, output);
-
-    job.setMapperClass(KMeansClusterMapper.class);
-    job.setNumReduceTasks(0);
-    job.setJarByClass(KMeansDriver.class);
-
-    if (!job.waitForCompletion(true)) {
-      throw new InterruptedException("K-Means Clustering failed processing " + clustersIn);
-    }
+    Double clusterClassificationThreshold = 0.0;
+    ClusterClassifier.writePolicy(new KMeansClusteringPolicy(), clustersIn);
+    ClusterClassificationDriver.run(input, output, new Path(output, CLUSTERED_POINTS_DIRECTORY),
+        clusterClassificationThreshold, true, runSequential);
   }
+ 
 }

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Fri Mar 16 17:10:29 2012
@@ -517,9 +517,9 @@ public final class TestCanopyCreation ex
   public void testClusteringManhattanMR() throws Exception {
     List<VectorWritable> points = getPointsWritable();
     Configuration conf = new Configuration();
-    ClusteringTestUtils.writePointsToFile(points,
+    ClusteringTestUtils.writePointsToFile(points, true, 
         getTestTempFilePath("testdata/file1"), fs, conf);
-    ClusteringTestUtils.writePointsToFile(points,
+    ClusteringTestUtils.writePointsToFile(points, true, 
         getTestTempFilePath("testdata/file2"), fs, conf);
     // now run the Job
     Path output = getTestTempDirPath("output");
@@ -538,9 +538,9 @@ public final class TestCanopyCreation ex
   public void testClusteringEuclideanMR() throws Exception {
     List<VectorWritable> points = getPointsWritable();
     Configuration conf = new Configuration();
-    ClusteringTestUtils.writePointsToFile(points,
+    ClusteringTestUtils.writePointsToFile(points, true, 
         getTestTempFilePath("testdata/file1"), fs, conf);
-    ClusteringTestUtils.writePointsToFile(points,
+    ClusteringTestUtils.writePointsToFile(points, true, 
         getTestTempFilePath("testdata/file2"), fs, conf);
     // now run the Job using the run() command. Others can use runJob().
     Path output = getTestTempDirPath("output");
@@ -567,9 +567,9 @@ public final class TestCanopyCreation ex
   public void testClusteringEuclideanWithOutlierRemovalMR() throws Exception {
     List<VectorWritable> points = getPointsWritable();
     Configuration conf = new Configuration();
-    ClusteringTestUtils.writePointsToFile(points,
+    ClusteringTestUtils.writePointsToFile(points, true, 
         getTestTempFilePath("testdata/file1"), fs, conf);
-    ClusteringTestUtils.writePointsToFile(points,
+    ClusteringTestUtils.writePointsToFile(points, true, 
         getTestTempFilePath("testdata/file2"), fs, conf);
     // now run the Job using the run() command. Others can use runJob().
     Path output = getTestTempDirPath("output");

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java Fri Mar 16 17:10:29 2012
@@ -104,7 +104,7 @@ public class ClusterClassificationDriver
     
     conf = new Configuration();
     
-    ClusteringTestUtils.writePointsToFile(points,
+    ClusteringTestUtils.writePointsToFile(points, true, 
         new Path(pointsPath, "file1"), fs, conf);
     runClustering(pointsPath, conf, false);
     runClassificationWithOutlierRemoval(conf, false);

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java Fri Mar 16 17:10:29 2012
@@ -19,20 +19,29 @@ package org.apache.mahout.clustering.dir
 
 import java.util.List;
 
-import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
 import org.apache.mahout.clustering.dirichlet.models.DistanceMeasureClusterDistribution;
+import org.apache.mahout.clustering.dirichlet.models.DistributionDescription;
 import org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
 import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.VectorWritable;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public final class TestDirichletClustering extends MahoutTestCase {
 
   private List<VectorWritable> sampleData;
-
+  
   @Override
   @Before
   public void setUp() throws Exception {
@@ -138,5 +147,60 @@ public final class TestDirichletClusteri
     printResults(result, 2);
     assertNotNull(result);
   }
+  
+  @Test
+  public void testDirichletClusteringSeq() throws Exception {
+    Path output = getTestTempDirPath("output");
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(new Configuration());
+    
+    generateSamples(40, 1, 1, 3);
+    generateSamples(30, 1, 0, 0.1);
+    generateSamples(30, 0, 1, 0.1);
+
+    ClusteringTestUtils.writePointsToFile(sampleData,
+            getTestTempFilePath("testdata/file1"), fs, conf);
+
+    DenseVector prototype = (DenseVector) sampleData.get(0).get();
+    
+    DistributionDescription description = new DistributionDescription(
+        DistanceMeasureClusterDistribution.class.getName(),
+        RandomAccessSparseVector.class.getName(),
+        ManhattanDistanceMeasure.class.getName(), prototype.size());
+    
+    DirichletDriver.run(conf, getTestTempDirPath("testdata"), output,
+        description, 10, 1, 1.0, true, true, 0, true);
+    
+    Path path = new Path(output, "clusteredPoints/part-m-0");
+    long count = HadoopUtil.countRecords(path, conf);
+    assertEquals("number of points", sampleData.size(), count);
+  }
+  
+  @Test
+  public void testDirichletClusteringMR() throws Exception {
+    Path output = getTestTempDirPath("output");
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(new Configuration());
+    
+    generateSamples(40, 1, 1, 3);
+    generateSamples(30, 1, 0, 0.1);
 
+    ClusteringTestUtils.writePointsToFile(sampleData, true,
+            getTestTempFilePath("testdata/file1"), fs, conf);
+
+    DenseVector prototype = (DenseVector) sampleData.get(0).get();
+    
+    DistributionDescription description = new DistributionDescription(
+        DistanceMeasureClusterDistribution.class.getName(),
+        RandomAccessSparseVector.class.getName(),
+        ManhattanDistanceMeasure.class.getName(), prototype.size());
+    
+    DirichletDriver.run(conf, getTestTempDirPath("testdata"), output,
+        description, 10, 1, 1.0, true, true, 0, false);
+    
+    Path path = new Path(output, "clusteredPoints/part-m-00000");
+    long count = HadoopUtil.countRecords(path, conf);
+    assertEquals("number of points", sampleData.size(), count);
+  }
+  
 }

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java Fri Mar 16 17:10:29 2012
@@ -282,7 +282,7 @@ public final class TestMapReduce extends
     generateSamples(100, 2, 0, 0.2);
     generateSamples(100, 0, 2, 0.3);
     generateSamples(100, 2, 2, 1);
-    ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("input/data.txt"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(sampleData, true, getTestTempFilePath("input/data.txt"), fs, conf);
     // Now run the driver using the run() method. Others can use runJob() as
     // before
     Integer maxIterations = 5;
@@ -410,7 +410,7 @@ public final class TestMapReduce extends
   public void testDriverIterationsMahalanobisMR() throws Exception {
     generateAsymmetricSamples(100, 0, 0, 0.5, 3.0);
     generateAsymmetricSamples(100, 0, 3, 0.3, 4.0);
-    ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("input/data.txt"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(sampleData,true, getTestTempFilePath("input/data.txt"), fs, conf);
     // Now run the driver using the run() method. Others can use runJob() as
     // before
     

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java Fri Mar 16 17:10:29 2012
@@ -367,8 +367,8 @@ public final class TestKmeansClustering 
     Path pointsPath = getTestTempDirPath("points");
     Path clustersPath = getTestTempDirPath("clusters");
     Configuration conf = new Configuration();
-    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
-    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file2"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
     for (int k = 1; k < points.size(); k++) {
       System.out.println("testKMeansMRJob k= " + k);
       // pick k initial cluster centers at random
@@ -423,8 +423,8 @@ public final class TestKmeansClustering 
     Path pointsPath = getTestTempDirPath("points");
     Path clustersPath = getTestTempDirPath("clusters");
     Configuration conf = new Configuration();
-    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
-    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file2"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
     for (int k = 1; k < points.size(); k += 3) {
       System.out.println("testKMeansMRJob k= " + k);
       // pick k initial cluster centers at random
@@ -481,8 +481,8 @@ public final class TestKmeansClustering 
     
     Path pointsPath = getTestTempDirPath("points");
     Configuration conf = new Configuration();
-    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
-    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file2"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
     
     Path outputPath = getTestTempDirPath("output");
     // now run the Canopy job

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java Fri Mar 16 17:10:29 2012
@@ -342,12 +342,13 @@ public final class TestClusterEvaluator 
     Configuration conf = new Configuration();
     CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, false, 0.0, true);
     // now run the KMeans job
-    KMeansDriver.run(testdata, new Path(output, "clusters-0-final"), output, measure,
+    Path kmeansOutput = new Path(output, "kmeans");
+	KMeansDriver.run(testdata, new Path(output, "clusters-0-final"), kmeansOutput, measure,
         0.001, 10, true, true);
     int numIterations = 10;
     Path clustersIn = new Path(output, "clusters-2");
     RepresentativePointsDriver.run(conf, clustersIn, new Path(output,
-        "clusteredPoints"), output, measure, numIterations, true);
+        "clusteredPoints"), kmeansOutput, measure, numIterations, true);
     ClusterEvaluator evaluator = new ClusterEvaluator(conf, clustersIn);
     // now print out the Results
     System.out.println("Intra-cluster density = "

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java?rev=1301654&r1=1301653&r2=1301654&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java Fri Mar 16 17:10:29 2012
@@ -352,12 +352,13 @@ public final class TestCDbwEvaluator ext
     CanopyDriver.run(new Configuration(), testdata, output, measure, 3.1, 2.1,
         false, 0.0, true);
     // now run the KMeans job
-    KMeansDriver.run(testdata, new Path(output, "clusters-0-final"), output, measure,
+    Path kmeansOutput = new Path(output, "kmeans");
+	KMeansDriver.run(testdata, new Path(output, "clusters-0-final"), kmeansOutput, measure,
         0.001, 10, true, true);
     int numIterations = 10;
     Path clustersIn = new Path(output, "clusters-2");
     RepresentativePointsDriver.run(conf, clustersIn, new Path(output,
-        "clusteredPoints"), output, measure, numIterations, true);
+        "clusteredPoints"), kmeansOutput, measure, numIterations, true);
     CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn);
     // printRepPoints(numIterations);
     // now print out the Results