Posted to commits@mahout.apache.org by je...@apache.org on 2010/04/30 22:16:57 UTC

svn commit: r939800 [1/2] - in /lucene/mahout/trunk: core/ core/src/main/java/org/apache/mahout/clustering/ core/src/main/java/org/apache/mahout/clustering/canopy/ core/src/main/java/org/apache/mahout/clustering/dirichlet/ core/src/main/java/org/apache...

Author: jeastman
Date: Fri Apr 30 20:16:56 2010
New Revision: 939800

URL: http://svn.apache.org/viewvc?rev=939800&view=rev
Log:
MAHOUT-236:
- moved default file directory definitions to Cluster from ClusterBase
- refactored CanopyClusteringJob and ClusterDriver into CanopyDriver & removed both
- added runClustering option to CanopyDriver.runJob
- refactored DirichletJob into DirichletDriver
- added runClustering option to DirichletDriver.runJob
- added emitMostLikely and threshold options to DirichletDriver.runJob
- implemented emitMostLikelyCluster and emitAllClusters (above threshold)
- refactored FuzzyKMeansJob into FuzzyKMeansDriver
- added runClustering option to FuzzyKMeansDriver.runJob
- added emitMostLikely and threshold options to FuzzyKMeansDriver.runJob
- implemented emitMostLikelyCluster and emitAllClusters (above threshold)
- enhanced TestClusterDumper to utilize term dictionary and enhanced printouts
- updated other unit tests, all of which run

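For orientation, here is a minimal sketch of the reworked entry points described in the log above, using the runJob signatures that appear in the diffs below. The input/output paths, thresholds, and reducer counts are placeholder values, not taken from the patch:

    import org.apache.mahout.clustering.canopy.CanopyDriver;
    import org.apache.mahout.clustering.dirichlet.DirichletDriver;

    public class RunClusteringSketch {
      public static void main(String[] args) throws Exception {
        // canopy generation, then cluster the input points in the same call
        CanopyDriver.runJob("testdata/points", "output/canopy",
            "org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure",
            3.1, 2.1, true /* runClustering */);

        // dirichlet iterations, then emit each point to its most likely model
        DirichletDriver.runJob("testdata/points", "output/dirichlet",
            "org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution",
            "org.apache.mahout.math.DenseVector", 2 /* prototypeSize */,
            10 /* numClusters */, 5 /* maxIterations */, 1.0 /* alpha_0 */,
            1 /* numReducers */, true /* runClustering */,
            true /* emitMostLikely */, 0.0 /* threshold */);
      }
    }
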
Removed:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java
Modified:
    lucene/mahout/trunk/core/   (props changed)
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/Cluster.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterBase.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansConfigKeys.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
    lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
    lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java

Propchange: lucene/mahout/trunk/core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Apr 30 20:16:56 2010
@@ -12,3 +12,4 @@ target
 .checkstyle
 .pmd
 .ruleset
+testdatatestdata

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/Cluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/Cluster.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/Cluster.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/Cluster.java Fri Apr 30 20:16:56 2010
@@ -24,6 +24,15 @@ import org.apache.mahout.math.Vector;
  */
 public interface Cluster {
   
+  // default directory for all clustered points
+  String CLUSTERED_POINTS_DIR = "/clusteredPoints";
+
+  // default directory for initial clusters to prime iterative clustering algorithms
+  String INITIAL_CLUSTERS_DIR = "/clusters-0";
+
+  // default directory for output of clusters per iteration
+  String CLUSTERS_DIR = "/clusters-";
+
   /**
    * Get the id of the Cluster
    * 

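These constants are concatenated onto the caller's output root to produce the conventional directory layout used throughout the drivers below. A short sketch with a hypothetical root:

    String output  = "/user/mahout/output";                      // hypothetical root
    String initial = output + Cluster.INITIAL_CLUSTERS_DIR;      // .../clusters-0
    String iter3   = output + Cluster.CLUSTERS_DIR + 3;          // .../clusters-3
    String points  = output + Cluster.CLUSTERED_POINTS_DIR;      // .../clusteredPoints
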
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterBase.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterBase.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterBase.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterBase.java Fri Apr 30 20:16:56 2010
@@ -41,15 +41,6 @@ import com.google.gson.reflect.TypeToken
  */
 public abstract class ClusterBase implements Writable, Cluster {
 
-  // default directory for all clustered points
-  public static final String CLUSTERED_POINTS_DIR = "/clusteredPoints";
-
-  // default directory for initial clusters to prime iterative clustering algorithms
-  public static final String INITIAL_CLUSTERS_DIR = "/clusters-0";
-
-  // default directory for output of clusters per iteration
-  public static final String CLUSTERS_DIR = "/clusters-";
-
   // this cluster's clusterId
   private int id;
 

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Fri Apr 30 20:16:56 2010
@@ -30,6 +30,7 @@ import org.apache.commons.cli2.commandli
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -37,6 +38,9 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.WeightedVectorWritable;
 import org.apache.mahout.common.CommandLineUtil;
 import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
 import org.apache.mahout.math.VectorWritable;
@@ -46,6 +50,7 @@ import org.slf4j.LoggerFactory;
 public final class CanopyDriver {
   
   private static final Logger log = LoggerFactory.getLogger(CanopyDriver.class);
+  public static final String DEFAULT_CLUSTERED_POINTS_DIRECTORY = "/clusteredPoints";
   
   private CanopyDriver() { }
   
@@ -107,7 +112,7 @@ public final class CanopyDriver {
       double t1 = Double.parseDouble(cmdLine.getValue(t1Opt).toString());
       double t2 = Double.parseDouble(cmdLine.getValue(t2Opt).toString());
       
-      runJob(input, output, measureClass, t1, t2);
+      runJob(input, output, measureClass, t1, t2, false);
     } catch (OptionException e) {
       log.error("Exception", e);
       CommandLineUtil.printHelp(group);
@@ -128,9 +133,11 @@ public final class CanopyDriver {
    *          the T1 distance threshold
    * @param t2
    *          the T2 distance threshold
+   * @param runClustering 
+   *          true if points are to be clustered after clusters are determined
    */
   public static void runJob(String input, String output,
-                            String measureClassName, double t1, double t2) throws IOException {
+                            String measureClassName, double t1, double t2, boolean runClustering) throws IOException {
     log.info("Input: {} Out: {} " 
       + "Measure: {} t1: {} t2: {}", new Object[] {input, output, measureClassName, t1, t2});
     Configurable client = new JobClient();
@@ -147,7 +154,9 @@ public final class CanopyDriver {
     conf.setOutputValueClass(Canopy.class);
     
     FileInputFormat.setInputPaths(conf, new Path(input));
-    Path outPath = new Path(output);
+    
+    String canopyOutputDir = output + Cluster.CLUSTERS_DIR + "0";
+    Path outPath = new Path(canopyOutputDir);
     FileOutputFormat.setOutputPath(conf, outPath);
     
     conf.setMapperClass(CanopyMapper.class);
@@ -161,6 +170,61 @@ public final class CanopyDriver {
       dfs.delete(outPath, true);
     }
     JobClient.runJob(conf);
+    
+    if (runClustering){
+      runClustering(input, canopyOutputDir, output, measureClassName, t1, t2);
+    }
+  }
+
+  /**
+   * Run the job
+   * 
+   * @param points
+   *          the input points directory pathname String
+   * @param canopies
+   *          the input canopies directory pathname String
+   * @param output
+   *          the output directory pathname String
+   * @param measureClassName
+   *          the DistanceMeasure class name
+   * @param t1
+   *          the T1 distance threshold
+   * @param t2
+   *          the T2 distance threshold
+   */
+  public static void runClustering(String points,
+                            String canopies,
+                            String output,
+                            String measureClassName,
+                            double t1,
+                            double t2) throws IOException {
+    Configurable client = new JobClient();
+    JobConf conf = new JobConf(CanopyDriver.class);
+    
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measureClassName);
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
+    conf.set(CanopyConfigKeys.CANOPY_PATH_KEY, canopies);
+    
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setOutputKeyClass(IntWritable.class);
+    conf.setOutputValueClass(WeightedVectorWritable.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    
+    FileInputFormat.setInputPaths(conf, new Path(points));
+    Path outPath = new Path(output + DEFAULT_CLUSTERED_POINTS_DIRECTORY);
+    FileOutputFormat.setOutputPath(conf, outPath);
+    
+    conf.setMapperClass(ClusterMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setNumReduceTasks(0);
+    
+    client.setConf(conf);
+    FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+    if (dfs.exists(outPath)) {
+      dfs.delete(outPath, true);
+    }
+    JobClient.runJob(conf);
   }
   
 }

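The one-shot runJob call above is equivalent to running the two phases by hand; a hedged sketch, with placeholder paths and thresholds:

    // phase 1: write canopies to <output>/clusters-0, no point clustering
    CanopyDriver.runJob("testdata/points", "output",
        "org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure",
        3.1, 2.1, false);

    // phase 2: effectively map-only job (zero reduce tasks) assigning
    // each input point to its canopy under <output>/clusteredPoints
    CanopyDriver.runClustering("testdata/points", "output/clusters-0", "output",
        "org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure",
        3.1, 2.1);
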
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java Fri Apr 30 20:16:56 2010
@@ -105,7 +105,7 @@ public class DirichletCluster<O> impleme
 
   @Override
   public String asFormatString(String[] bindings) {
-    return "C-" + model.getId() + ": " + model.toString();
+    return "C-" + model.getId() + ": " + model.asFormatString(bindings);
   }
 
   @Override

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java Fri Apr 30 20:16:56 2010
@@ -35,8 +35,8 @@ import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.clustering.ClusterBase;
 import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
 public class DirichletClusterMapper extends MapReduceBase implements
@@ -46,21 +46,13 @@ public class DirichletClusterMapper exte
 
   private List<DirichletCluster> clusters;
 
+  private DirichletClusterer clusterer;
+
   @SuppressWarnings("unchecked")
   @Override
   public void map(WritableComparable<?> key, VectorWritable vector, OutputCollector<IntWritable, WeightedVectorWritable> output,
       Reporter reporter) throws IOException {
-    int clusterId = -1;
-    double clusterPdf = 0;
-    for (int i = 0; i < clusters.size(); i++) {
-      double pdf = clusters.get(i).getModel().pdf(vector);
-      if (pdf > clusterPdf) {
-        clusterId = i;
-        clusterPdf = pdf;
-      }
-    }
-    //System.out.println(clusterId + ": " + ClusterBase.formatVector(vector.get(), null));
-    output.collect(new IntWritable(clusterId), new WeightedVectorWritable(clusterPdf, vector));
+    clusterer.emitPointToClusters(vector, clusters, output);
   }
 
   @Override
@@ -68,6 +60,9 @@ public class DirichletClusterMapper exte
     super.configure(job);
     try {
       clusters = getClusters(job);
+      String emitMostLikely = job.get(DirichletDriver.EMIT_MOST_LIKELY_KEY);
+      String threshold = job.get(DirichletDriver.THRESHOLD_KEY);
+      clusterer = new DirichletClusterer<Vector>(Boolean.parseBoolean(emitMostLikely), Double.parseDouble(threshold));
     } catch (SecurityException e) {
       throw new IllegalStateException(e);
     } catch (IllegalArgumentException e) {

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java Fri Apr 30 20:16:56 2010
@@ -17,13 +17,18 @@
 
 package org.apache.mahout.clustering.dirichlet;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.clustering.WeightedVectorWritable;
 import org.apache.mahout.clustering.dirichlet.models.Model;
 import org.apache.mahout.clustering.dirichlet.models.ModelDistribution;
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.function.TimesFunction;
 
 /**
@@ -80,24 +85,28 @@ import org.apache.mahout.math.function.T
  * </pre>
  */
 public class DirichletClusterer<O> {
-  
+
   // observed data
   private final List<O> sampleData;
-  
+
   // the ModelDistribution for the computation
   private final ModelDistribution<O> modelFactory;
-  
+
   // the state of the clustering process
   private final DirichletState<O> state;
-  
+
   private final int thin;
-  
+
   private final int burnin;
-  
+
   private final int numClusters;
-  
+
   private final List<Model<O>[]> clusterSamples = new ArrayList<Model<O>[]>();
-  
+
+  private boolean emitMostLikely;
+
+  private double threshold;
+
   /**
    * Create a new instance on the sample data with the given additional parameters
    * 
@@ -114,12 +123,8 @@ public class DirichletClusterer<O> {
    * @param burnin
    *          the int burnin interval, used to suppress early iterations
    */
-  public DirichletClusterer(List<O> sampleData,
-                            ModelDistribution<O> modelFactory,
-                            double alpha_0,
-                            int numClusters,
-                            int thin,
-                            int burnin) {
+  public DirichletClusterer(List<O> sampleData, ModelDistribution<O> modelFactory, double alpha_0, int numClusters, int thin,
+      int burnin) {
     this.sampleData = sampleData;
     this.modelFactory = modelFactory;
     this.thin = thin;
@@ -127,7 +132,24 @@ public class DirichletClusterer<O> {
     this.numClusters = numClusters;
     state = new DirichletState<O>(modelFactory, numClusters, alpha_0);
   }
-  
+
+  /**
+   * This constructor is only used by DirichletClusterMapper for setting up clustering params
+   * @param emitMostLikely
+   * @param threshold
+   */
+  public DirichletClusterer(boolean emitMostLikely, double threshold) {
+    super();
+    this.sampleData = null;
+    this.modelFactory = null;
+    this.thin = 0;
+    this.burnin = 0;
+    this.numClusters = 0;
+    this.state = null;
+    this.emitMostLikely = emitMostLikely;
+    this.threshold = threshold;
+  }
+
   /**
    * Iterate over the sample data, obtaining cluster samples periodically and returning them.
    * 
@@ -141,7 +163,7 @@ public class DirichletClusterer<O> {
     }
     return clusterSamples;
   }
-  
+
   /**
    * Perform one iteration of the clustering process, iterating over the samples to build a new array of
    * models, then updating the state for the next iteration
@@ -150,10 +172,10 @@ public class DirichletClusterer<O> {
    *          the DirichletState<Observation> of this iteration
    */
   private void iterate(int iteration, DirichletState<O> state) {
-    
+
     // create new posterior models
     Model<O>[] newModels = modelFactory.sampleFromPosterior(state.getModels());
-    
+
     // iterate over the samples, assigning each to a model
     for (O x : sampleData) {
       // compute normalized vector of probabilities that x is described by each model
@@ -164,7 +186,7 @@ public class DirichletClusterer<O> {
       // ask the selected model to observe the datum
       newModels[k].observe(x);
     }
-    
+
     // periodically add models to the cluster samples after the burn-in period
     if ((iteration >= burnin) && (iteration % thin == 0)) {
       clusterSamples.add(newModels);
@@ -172,7 +194,7 @@ public class DirichletClusterer<O> {
     // update the state from the new models
     state.update(newModels);
   }
-  
+
   /**
    * Compute a normalized vector of probabilities that x is described by each model using the mixture and the
    * model pdfs
@@ -197,7 +219,54 @@ public class DirichletClusterer<O> {
     pi.assign(new TimesFunction(), 1.0 / max);
     return pi;
   }
-  
+
+  public void emitPointToClusters(VectorWritable point, List<DirichletCluster> clusters,
+      OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+    Vector pi = new DenseVector(clusters.size());
+    for (int i = 0; i < clusters.size(); i++) {
+      pi.set(i, clusters.get(i).getModel().pdf(point));
+    }
+    pi = pi.divide(pi.zSum());
+    if (emitMostLikely) {
+      emitMostLikelyCluster(point, clusters, pi, output);
+    } else {
+      emitAllClusters(point, clusters, pi, output);
+    }
+  }
+
+  /**
+   * Emit the point to the most likely cluster
+   * @param point
+   * @param pi the normalized pdf Vector for the point
+   * @param output
+   * @throws IOException
+   */
+  void emitMostLikelyCluster(VectorWritable point, List<DirichletCluster> clusters, Vector pi,
+      OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+    int clusterId = -1;
+    double clusterPdf = 0;
+    for (int i = 0; i < clusters.size(); i++) {
+      double pdf = pi.get(i);
+      if (pdf > clusterPdf) {
+        clusterId = i;
+        clusterPdf = pdf;
+      }
+    }
+    //System.out.println(clusterId + ": " + ClusterBase.formatVector(vector.get(), null));
+    output.collect(new IntWritable(clusterId), new WeightedVectorWritable(clusterPdf, point));
+  }
+
+  void emitAllClusters(VectorWritable point, List<DirichletCluster> clusters, Vector pi,
+      OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+    for (int i = 0; i < clusters.size(); i++) {
+      double pdf = pi.get(i);
+      if (pdf > threshold && clusters.get(i).getTotalCount() > 0) {
+        //System.out.println(i + ": " + ClusterBase.formatVector(vector.get(), null));
+        output.collect(new IntWritable(i), new WeightedVectorWritable(pdf, point));
+      }
+    }
+  }
+
   /**
    * Create a new instance on the sample data with the given additional parameters
    * 
@@ -216,16 +285,10 @@ public class DirichletClusterer<O> {
    * @param numIterations
    *          number of iterations to be performed
    */
-  public static List<Model<Vector>[]> clusterPoints(List<Vector> points,
-                                                    ModelDistribution<Vector> modelFactory,
-                                                    double alpha_0,
-                                                    int numClusters,
-                                                    int thin,
-                                                    int burnin,
-                                                    int numIterations) {
-    DirichletClusterer<Vector> clusterer = new DirichletClusterer<Vector>(points, modelFactory, alpha_0,
-        numClusters, thin, burnin);
+  public static List<Model<Vector>[]> clusterPoints(List<Vector> points, ModelDistribution<Vector> modelFactory, double alpha_0,
+      int numClusters, int thin, int burnin, int numIterations) {
+    DirichletClusterer<Vector> clusterer = new DirichletClusterer<Vector>(points, modelFactory, alpha_0, numClusters, thin, burnin);
     return clusterer.cluster(numIterations);
-    
+
   }
 }

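A hedged sketch of how the two emit policies behave, assuming a point Vector, a clusters list obtained elsewhere (e.g. via DirichletClusterMapper.getClusters()), and an OutputCollector supplied by the framework:

    // threshold mode: pdfs are normalized to sum to 1, and the point goes to
    // every cluster with pi(i) > threshold and a nonzero total count
    DirichletClusterer<Vector> thresholdClusterer =
        new DirichletClusterer<Vector>(false /* emitMostLikely */, 0.2 /* threshold */);
    thresholdClusterer.emitPointToClusters(new VectorWritable(point), clusters, output);

    // most-likely mode: the point is emitted once, weighted by the winning pdf
    DirichletClusterer<Vector> bestClusterer =
        new DirichletClusterer<Vector>(true /* emitMostLikely */, 0.0);
    bestClusterer.emitPointToClusters(new VectorWritable(point), clusters, output);
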
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java Fri Apr 30 20:16:56 2010
@@ -41,7 +41,7 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.mahout.clustering.ClusterBase;
+import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.WeightedVectorWritable;
 import org.apache.mahout.clustering.dirichlet.models.VectorModelDistribution;
 import org.apache.mahout.clustering.kmeans.KMeansDriver;
@@ -53,62 +53,74 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DirichletDriver {
-  
 
   public static final String STATE_IN_KEY = "org.apache.mahout.clustering.dirichlet.stateIn";
-  
+
   public static final String MODEL_FACTORY_KEY = "org.apache.mahout.clustering.dirichlet.modelFactory";
-  
+
   public static final String MODEL_PROTOTYPE_KEY = "org.apache.mahout.clustering.dirichlet.modelPrototype";
-  
+
   public static final String PROTOTYPE_SIZE_KEY = "org.apache.mahout.clustering.dirichlet.prototypeSize";
-  
+
   public static final String NUM_CLUSTERS_KEY = "org.apache.mahout.clustering.dirichlet.numClusters";
-  
+
   public static final String ALPHA_0_KEY = "org.apache.mahout.clustering.dirichlet.alpha_0";
-  
+
+  public static final String EMIT_MOST_LIKELY_KEY = "org.apache.mahout.clustering.dirichlet.emitMostLikely";
+
+  public static final String THRESHOLD_KEY = "org.apache.mahout.clustering.dirichlet.threshold";
+
   private static final Logger log = LoggerFactory.getLogger(DirichletDriver.class);
-  
-  private DirichletDriver() {}
-  
+
+  private DirichletDriver() {
+  }
+
   public static void main(String[] args) throws Exception {
     DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
     ArgumentBuilder abuilder = new ArgumentBuilder();
     GroupBuilder gbuilder = new GroupBuilder();
-    
+
     Option inputOpt = DefaultOptionCreator.inputOption().create();
     Option outputOpt = DefaultOptionCreator.outputOption().create();
     Option maxIterOpt = DefaultOptionCreator.maxIterOption().create();
     Option topicsOpt = DefaultOptionCreator.kOption().create();
     Option helpOpt = DefaultOptionCreator.helpOption();
-    
+
     Option mOpt = obuilder.withLongName("alpha").withRequired(true).withShortName("m").withArgument(
-      abuilder.withName("alpha").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The alpha0 value for the DirichletDistribution.").create();
-    
+        abuilder.withName("alpha").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The alpha0 value for the DirichletDistribution.").create();
+
     Option modelOpt = obuilder.withLongName("modelClass").withRequired(true).withShortName("d").withArgument(
-      abuilder.withName("modelClass").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The ModelDistribution class name. "
-          + "Defaults to org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution").create();
-    
-    Option prototypeOpt = obuilder.withLongName("modelPrototypeClass").withRequired(false).withShortName("p")
-        .withArgument(abuilder.withName("prototypeClass").withMinimum(1).withMaximum(1).create())
-        .withDescription(
-          "The ModelDistribution prototype Vector class name. "
-              + "Defaults to org.apache.mahout.math.RandomAccessSparseVector").create();
-    
-    Option sizeOpt = obuilder.withLongName("prototypeSize").withRequired(true).withShortName("s")
-        .withArgument(abuilder.withName("prototypeSize").withMinimum(1).withMaximum(1).create())
-        .withDescription("The ModelDistribution prototype Vector size. ").create();
-    
+        abuilder.withName("modelClass").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The ModelDistribution class name. " + "Defaults to org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution")
+        .create();
+
+    Option prototypeOpt = obuilder.withLongName("modelPrototypeClass").withRequired(false).withShortName("p").withArgument(
+        abuilder.withName("prototypeClass").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The ModelDistribution prototype Vector class name. " + "Defaults to org.apache.mahout.math.RandomAccessSparseVector")
+        .create();
+
+    Option sizeOpt = obuilder.withLongName("prototypeSize").withRequired(true).withShortName("s").withArgument(
+        abuilder.withName("prototypeSize").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The ModelDistribution prototype Vector size. ").create();
+
     Option numRedOpt = obuilder.withLongName("maxRed").withRequired(true).withShortName("r").withArgument(
-      abuilder.withName("maxRed").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The number of reduce tasks.").create();
-    
-    Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(outputOpt)
-        .withOption(modelOpt).withOption(prototypeOpt).withOption(sizeOpt).withOption(maxIterOpt).withOption(
-          mOpt).withOption(topicsOpt).withOption(helpOpt).withOption(numRedOpt).create();
-    
+        abuilder.withName("maxRed").withMinimum(1).withMaximum(1).create()).withDescription("The number of reduce tasks.").create();
+
+    Option clusteringOpt = obuilder.withLongName("clustering").withRequired(false).withDescription(
+        "If true, run clustering after the iterations have taken place").withShortName("cl").create();
+
+    Option emitMostLikelyOpt = obuilder.withLongName("emitMostLikely").withRequired(false).withShortName("e").withArgument(
+        abuilder.withName("emitMostLikely").withMinimum(1).withMaximum(1).create()).withDescription(
+        "True if clustering emits most likely point only, false for threshold clustering").create();
+
+    Option thresholdOpt = obuilder.withLongName("threshold").withRequired(false).withShortName("t").withArgument(
+        abuilder.withName("threshold").withMinimum(1).withMaximum(1).create()).withDescription("The pdf threshold").create();
+
+    Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(outputOpt).withOption(modelOpt).withOption(
+        prototypeOpt).withOption(sizeOpt).withOption(maxIterOpt).withOption(mOpt).withOption(topicsOpt).withOption(helpOpt)
+        .withOption(numRedOpt).withOption(clusteringOpt).withOption(emitMostLikelyOpt).withOption(thresholdOpt).create();
+
     try {
       Parser parser = new Parser();
       parser.setGroup(group);
@@ -117,7 +129,7 @@ public class DirichletDriver {
         CommandLineUtil.printHelp(group);
         return;
       }
-      
+
       String input = cmdLine.getValue(inputOpt).toString();
       String output = cmdLine.getValue(outputOpt).toString();
       String modelFactory = "org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution";
@@ -132,15 +144,27 @@ public class DirichletDriver {
       int numReducers = Integer.parseInt(cmdLine.getValue(numRedOpt).toString());
       int numModels = Integer.parseInt(cmdLine.getValue(topicsOpt).toString());
       int maxIterations = Integer.parseInt(cmdLine.getValue(maxIterOpt).toString());
+      boolean runClustering = true;
+      if (cmdLine.hasOption(clusteringOpt)) {
+        runClustering = Boolean.parseBoolean(cmdLine.getValue(clusteringOpt).toString());
+      }
+      boolean emitMostLikely = true;
+      if (cmdLine.hasOption(emitMostLikelyOpt)) {
+        emitMostLikely = Boolean.parseBoolean(cmdLine.getValue(emitMostLikelyOpt).toString());
+      }
+      double threshold = 0;
+      if (cmdLine.hasOption(thresholdOpt)) {
+        threshold = Double.parseDouble(cmdLine.getValue(thresholdOpt).toString());
+      }
       double alpha_0 = Double.parseDouble(cmdLine.getValue(mOpt).toString());
-      runJob(input, output, modelFactory, modelPrototype, prototypeSize, numModels, maxIterations, alpha_0,
-        numReducers);
+      runJob(input, output, modelFactory, modelPrototype, prototypeSize, numModels, maxIterations, alpha_0, numReducers,
+          runClustering, emitMostLikely, threshold);
     } catch (OptionException e) {
       log.error("Exception parsing command line: ", e);
       CommandLineUtil.printHelp(group);
     }
   }
-  
+
   /**
    * Run the job using supplied arguments
    * 
@@ -161,23 +185,13 @@ public class DirichletDriver {
    * @deprecated since it presumes 2-d, dense vector model prototypes
    */
   @Deprecated
-  public static void runJob(String input,
-                            String output,
-                            String modelFactory,
-                            int numClusters,
-                            int maxIterations,
-                            double alpha_0,
-                            int numReducers) throws ClassNotFoundException,
-                                            InstantiationException,
-                                            IllegalAccessException,
-                                            IOException,
-                                            SecurityException,
-                                            NoSuchMethodException,
-                                            InvocationTargetException {
-    runJob(input, output, modelFactory, "org.apache.mahout.math.DenseVector", 2, numClusters, maxIterations,
-      alpha_0, numReducers);
+  public static void runJob(String input, String output, String modelFactory, int numClusters, int maxIterations, double alpha_0,
+      int numReducers) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException,
+      SecurityException, NoSuchMethodException, InvocationTargetException {
+    runJob(input, output, modelFactory, "org.apache.mahout.math.DenseVector", 2, numClusters, maxIterations, alpha_0, numReducers,
+        false, true, 0);
   }
-  
+
   /**
    * Run the job using supplied arguments
    * 
@@ -199,55 +213,40 @@ public class DirichletDriver {
    *          the alpha_0 value for the DirichletDistribution
    * @param numReducers
    *          the number of Reducers desired
+   * @param runClustering 
+   *          true if clustering of points to be done after iterations
+   * @param emitMostLikely
+   *          a boolean if true emit only most likely cluster for each point
+   * @param threshold 
+   *          a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
    */
-  public static void runJob(String input,
-                            String output,
-                            String modelFactory,
-                            String modelPrototype,
-                            int prototypeSize,
-                            int numClusters,
-                            int maxIterations,
-                            double alpha_0,
-                            int numReducers) throws ClassNotFoundException,
-                                            InstantiationException,
-                                            IllegalAccessException,
-                                            IOException,
-                                            SecurityException,
-                                            NoSuchMethodException,
-                                            InvocationTargetException {
-    
-    String clustersIn = output + ClusterBase.INITIAL_CLUSTERS_DIR;
+  public static void runJob(String input, String output, String modelFactory, String modelPrototype, int prototypeSize,
+      int numClusters, int maxIterations, double alpha_0, int numReducers, boolean runClustering, boolean emitMostLikely,
+      double threshold) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException,
+      SecurityException, NoSuchMethodException, InvocationTargetException {
+
+    String clustersIn = output + Cluster.INITIAL_CLUSTERS_DIR;
     writeInitialState(output, clustersIn, modelFactory, modelPrototype, prototypeSize, numClusters, alpha_0);
-    
+
     for (int iteration = 1; iteration <= maxIterations; iteration++) {
       log.info("Iteration {}", iteration);
       // point the output to a new directory per iteration
-      String clustersOut = output + ClusterBase.CLUSTERS_DIR + iteration;
-      runIteration(input, clustersIn, clustersOut, modelFactory, modelPrototype, prototypeSize, numClusters,
-        alpha_0, numReducers);
+      String clustersOut = output + Cluster.CLUSTERS_DIR + iteration;
+      runIteration(input, clustersIn, clustersOut, modelFactory, modelPrototype, prototypeSize, numClusters, alpha_0, numReducers);
       // now point the input to the old output directory
       clustersIn = clustersOut;
     }
-    // now cluster the most likely points
-    runClustering(input, clustersIn, output + ClusterBase.CLUSTERED_POINTS_DIR);
+    if (runClustering) {
+      // now cluster the most likely points
+      runClustering(input, clustersIn, output + Cluster.CLUSTERED_POINTS_DIR, emitMostLikely, threshold);
+    }
   }
-  
-  private static void writeInitialState(String output,
-                                        String stateIn,
-                                        String modelFactory,
-                                        String modelPrototype,
-                                        int prototypeSize,
-                                        int numModels,
-                                        double alpha_0) throws ClassNotFoundException,
-                                                       InstantiationException,
-                                                       IllegalAccessException,
-                                                       IOException,
-                                                       SecurityException,
-                                                       NoSuchMethodException,
-                                                       InvocationTargetException {
-    
-    DirichletState<VectorWritable> state = createState(modelFactory, modelPrototype, prototypeSize,
-      numModels, alpha_0);
+
+  private static void writeInitialState(String output, String stateIn, String modelFactory, String modelPrototype,
+      int prototypeSize, int numModels, double alpha_0) throws ClassNotFoundException, InstantiationException,
+      IllegalAccessException, IOException, SecurityException, NoSuchMethodException, InvocationTargetException {
+
+    DirichletState<VectorWritable> state = createState(modelFactory, modelPrototype, prototypeSize, numModels, alpha_0);
     JobConf job = new JobConf(KMeansDriver.class);
     Path outPath = new Path(output);
     FileSystem fs = FileSystem.get(outPath.toUri(), job);
@@ -259,7 +258,7 @@ public class DirichletDriver {
       writer.close();
     }
   }
-  
+
   /**
    * Creates a DirichletState object from the given arguments. Note that the modelFactory is presumed to be a
    * subclass of VectorModelDistribution that can be initialized with a concrete Vector prototype.
@@ -276,29 +275,20 @@ public class DirichletDriver {
    *          the double alpha_0 argument to the algorithm
    * @return an initialized DirichletState
    */
-  public static DirichletState<VectorWritable> createState(String modelFactory,
-                                                           String modelPrototype,
-                                                           int prototypeSize,
-                                                           int numModels,
-                                                           double alpha_0) throws ClassNotFoundException,
-                                                                          InstantiationException,
-                                                                          IllegalAccessException,
-                                                                          SecurityException,
-                                                                          NoSuchMethodException,
-                                                                          IllegalArgumentException,
-                                                                          InvocationTargetException {
-    
+  public static DirichletState<VectorWritable> createState(String modelFactory, String modelPrototype, int prototypeSize,
+      int numModels, double alpha_0) throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+      SecurityException, NoSuchMethodException, IllegalArgumentException, InvocationTargetException {
+
     ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-    Class<? extends VectorModelDistribution> cl = ccl.loadClass(modelFactory).asSubclass(
-      VectorModelDistribution.class);
+    Class<? extends VectorModelDistribution> cl = ccl.loadClass(modelFactory).asSubclass(VectorModelDistribution.class);
     VectorModelDistribution factory = cl.newInstance();
-    
+
     Class<? extends Vector> vcl = ccl.loadClass(modelPrototype).asSubclass(Vector.class);
     Constructor<? extends Vector> v = vcl.getConstructor(int.class);
     factory.setModelPrototype(new VectorWritable(v.newInstance(prototypeSize)));
     return new DirichletState<VectorWritable>(factory, numModels, alpha_0);
   }
-  
+
   /**
    * Run the job using supplied arguments
    * 
@@ -321,27 +311,20 @@ public class DirichletDriver {
    * @param numReducers
    *          the number of Reducers desired
    */
-  public static void runIteration(String input,
-                                  String stateIn,
-                                  String stateOut,
-                                  String modelFactory,
-                                  String modelPrototype,
-                                  int prototypeSize,
-                                  int numClusters,
-                                  double alpha_0,
-                                  int numReducers) {
+  public static void runIteration(String input, String stateIn, String stateOut, String modelFactory, String modelPrototype,
+      int prototypeSize, int numClusters, double alpha_0, int numReducers) {
     Configurable client = new JobClient();
     JobConf conf = new JobConf(DirichletDriver.class);
-    
+
     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(DirichletCluster.class);
     conf.setMapOutputKeyClass(Text.class);
     conf.setMapOutputValueClass(VectorWritable.class);
-    
+
     FileInputFormat.setInputPaths(conf, new Path(input));
     Path outPath = new Path(stateOut);
     FileOutputFormat.setOutputPath(conf, outPath);
-    
+
     conf.setMapperClass(DirichletMapper.class);
     conf.setReducerClass(DirichletReducer.class);
     conf.setNumReduceTasks(numReducers);
@@ -353,7 +336,7 @@ public class DirichletDriver {
     conf.set(PROTOTYPE_SIZE_KEY, Integer.toString(prototypeSize));
     conf.set(NUM_CLUSTERS_KEY, Integer.toString(numClusters));
     conf.set(ALPHA_0_KEY, Double.toString(alpha_0));
-    
+
     client.setConf(conf);
     try {
       JobClient.runJob(conf);
@@ -361,7 +344,7 @@ public class DirichletDriver {
       log.warn(e.toString(), e);
     }
   }
-  
+
   /**
    * Run the job using supplied arguments
    * 
@@ -371,27 +354,33 @@ public class DirichletDriver {
    *          the directory pathname for input state
    * @param output
    *          the directory pathname for output points
+   * @param emitMostLikely
+   *          a boolean if true emit only most likely cluster for each point
+   * @param threshold 
+   *          a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
    */
-  public static void runClustering(String input, String stateIn, String output) {
+  public static void runClustering(String input, String stateIn, String output, boolean emitMostLikely, double threshold) {
     JobConf conf = new JobConf(DirichletDriver.class);
     conf.setJobName("Dirichlet Clustering");
-    
+
     conf.setOutputKeyClass(IntWritable.class);
     conf.setOutputValueClass(WeightedVectorWritable.class);
-    
+
     FileInputFormat.setInputPaths(conf, new Path(input));
     Path outPath = new Path(output);
     FileOutputFormat.setOutputPath(conf, outPath);
-    
+
     conf.setMapperClass(DirichletClusterMapper.class);
-    
+
     conf.setInputFormat(SequenceFileInputFormat.class);
     conf.setOutputFormat(SequenceFileOutputFormat.class);
-    
+
     // uncomment it to run locally
     // conf.set("mapred.job.tracker", "local");
     conf.setNumReduceTasks(0);
     conf.set(STATE_IN_KEY, stateIn);
+    conf.set(EMIT_MOST_LIKELY_KEY, Boolean.toString(emitMostLikely));
+    conf.set(THRESHOLD_KEY, Double.toString(threshold));
     try {
       JobClient.runJob(conf);
     } catch (IOException e) {

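When only the final clustering pass is wanted, runClustering can also be called directly against a saved state directory; a sketch with placeholder paths:

    // emit every point to all clusters whose normalized pdf exceeds 0.25
    DirichletDriver.runClustering("testdata/points", "output/clusters-5",
        "output/clusteredPoints", false /* emitMostLikely */, 0.25 /* threshold */);
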
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java Fri Apr 30 20:16:56 2010
@@ -29,6 +29,9 @@ import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.clustering.dirichlet.DirichletDriver;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
 public class FuzzyKMeansClusterMapper extends MapReduceBase implements
@@ -38,10 +41,14 @@ public class FuzzyKMeansClusterMapper ex
 
   private FuzzyKMeansClusterer clusterer;
 
+  private boolean emitMostLikely = false;
+
+  private double threshold = 0;
+
   @Override
   public void map(WritableComparable<?> key, VectorWritable point, OutputCollector<IntWritable, WeightedVectorWritable> output,
       Reporter reporter) throws IOException {
-    clusterer.outputPointWithClusterProbabilities(key.toString(), point.get(), clusters, output);
+    clusterer.emitPointToClusters(point, clusters, output);
   }
 
   /**

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java Fri Apr 30 20:16:56 2010
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.mahout.clustering.WeightedVectorWritable;
 import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
@@ -40,15 +41,19 @@ public class FuzzyKMeansClusterer {
 
   private double m = 2.0; // default value
 
+  private boolean emitMostLikely = true;
+
+  private double threshold = 0;
+
   /**
-   * Init the fuzzy k-means clusterer with the distance measure to use for comparison.
-   * 
-   * @param measure
-   *          The distance measure to use for comparing clusters against points.
-   * @param convergenceDelta
-   *          When do we define a cluster to have converged?
-   * 
-   * */
+    * Init the fuzzy k-means clusterer with the distance measure to use for comparison.
+    * 
+    * @param measure
+    *          The distance measure to use for comparing clusters against points.
+    * @param convergenceDelta
+    *          When do we define a cluster to have converged?
+    * 
+    * */
   public FuzzyKMeansClusterer(DistanceMeasure measure, double convergenceDelta, double m) {
     this.measure = measure;
     this.convergenceDelta = convergenceDelta;
@@ -74,6 +79,9 @@ public class FuzzyKMeansClusterer {
       convergenceDelta = Double.parseDouble(job.get(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY));
       // nextClusterId = 0;
       m = Double.parseDouble(job.get(FuzzyKMeansConfigKeys.M_KEY));
+      emitMostLikely = Boolean.parseBoolean(job.get(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY));
+      threshold = Double.parseDouble(job.get(FuzzyKMeansConfigKeys.THRESHOLD_KEY));
+
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
     } catch (IllegalAccessException e) {
@@ -114,45 +122,6 @@ public class FuzzyKMeansClusterer {
     }
   }
 
-  /**
-   * Output point with cluster info (Cluster and probability)
-   * 
-   * @param point
-   *          a point
-   * @param clusters
-   *          a List<SoftCluster> to test
-   * @param output
-   *          the OutputCollector to emit into
-   */
-  public void outputPointWithClusterProbabilities(String key, Vector point, List<SoftCluster> clusters,
-      OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
-
-    // calculate point distances for all clusters    
-    List<Double> clusterDistanceList = new ArrayList<Double>();
-    for (SoftCluster cluster : clusters) {
-      clusterDistanceList.add(measure.distance(cluster.getCenter(), point));
-    }
-    // calculate point pdf for all clusters
-    List<Double> clusterPdfList = new ArrayList<Double>();
-    for (int i = 0; i < clusters.size(); i++) {
-      double probWeight = computeProbWeight(clusterDistanceList.get(i), clusterDistanceList);
-      clusterPdfList.add(probWeight);
-    }
-    // for now just emit the most likely cluster
-    int clusterId = -1;
-    double clusterPdf = 0;
-    for (int i = 0; i < clusters.size(); i++) {
-      // System.out.println("cluster-" + clusters.get(i).getId() + "@ " + ClusterBase.formatVector(center, null));
-      double pdf = clusterPdfList.get(i);
-      if (pdf > clusterPdf) {
-        clusterId = clusters.get(i).getId();
-        clusterPdf = pdf;
-      }
-    }
-    // System.out.println("cluster-" + clusterId + ": " + ClusterBase.formatVector(point, null));
-    output.collect(new IntWritable(clusterId), new WeightedVectorWritable(clusterPdf, new VectorWritable(point)));
-  }
-
   /** Computes the probability of a point belonging to a cluster */
   public double computeProbWeight(double clusterDistance, List<Double> clusterDistanceList) {
     if (clusterDistance == 0) {
@@ -187,6 +156,71 @@ public class FuzzyKMeansClusterer {
     return this.measure;
   }
 
+  public void emitPointToClusters(VectorWritable point, List<SoftCluster> clusters,
+      OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+    // calculate point distances for all clusters    
+    List<Double> clusterDistanceList = new ArrayList<Double>();
+    for (SoftCluster cluster : clusters) {
+      clusterDistanceList.add(getMeasure().distance(cluster.getCenter(), point.get()));
+    }
+    // calculate point pdf for all clusters
+    Vector pi = new DenseVector(clusters.size());
+    for (int i = 0; i < clusters.size(); i++) {
+      double probWeight = computeProbWeight(clusterDistanceList.get(i), clusterDistanceList);
+      pi.set(i, probWeight);
+    }
+    if (emitMostLikely) {
+      emitMostLikelyCluster(point.get(), clusters, pi, output);
+    } else {
+      emitAllClusters(point.get(), clusters, pi, output);
+    }
+  }
+
+  /**
+   * Emit the point to the cluster with the highest pdf
+   * 
+   * @param point
+   * @param clusters
+   * @param clusterPdfList
+   * @param output
+   * @throws IOException
+   */
+  void emitMostLikelyCluster(Vector point, List<SoftCluster> clusters, Vector clusterPdfList,
+      OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+    int clusterId = -1;
+    double clusterPdf = 0;
+    for (int i = 0; i < clusters.size(); i++) {
+      // System.out.println("cluster-" + clusters.get(i).getId() + "@ " + ClusterBase.formatVector(center, null));
+      double pdf = clusterPdfList.get(i);
+      if (pdf > clusterPdf) {
+        clusterId = clusters.get(i).getId();
+        clusterPdf = pdf;
+      }
+    }
+    // System.out.println("cluster-" + clusterId + ": " + ClusterBase.formatVector(point, null));
+    output.collect(new IntWritable(clusterId), new WeightedVectorWritable(clusterPdf, new VectorWritable(point)));
+  }
+
+  /**
+   * Emit the point to all clusters
+   * 
+   * @param point
+   * @param clusters
+   * @param pi
+   * @param output
+   * @throws IOException
+   */
+  void emitAllClusters(Vector point, List<SoftCluster> clusters, Vector pi,
+      OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+    for (int i = 0; i < clusters.size(); i++) {
+      double pdf = pi.get(i);
+      if (pdf > threshold) {
+        // System.out.println("cluster-" + clusterId + ": " + ClusterBase.formatVector(point, null));
+        output.collect(new IntWritable(i), new WeightedVectorWritable(pdf, new VectorWritable(point)));
+      }
+    }
+  }
+
   /**
    * This is the reference k-means implementation. Given its inputs it iterates over the points and clusters
    * until their centers converge or until the maximum number of iterations is exceeded.

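As a rough illustration of the weights that drive the new emit decision: the distances below are made up, and the membership formula is the standard fuzzy k-means one, which computeProbWeight appears to implement (its body is not shown in this diff):

    // hypothetical point-to-center distances for three clusters, m = 2.0
    double[] d = {1.0, 2.0, 4.0};
    double[] pi = new double[d.length];
    for (int i = 0; i < d.length; i++) {
      double sum = 0.0;
      for (double dj : d) {
        sum += Math.pow(d[i] / dj, 2.0 / (2.0 - 1.0)); // exponent 2/(m-1)
      }
      pi[i] = 1.0 / sum;  // yields ~0.76, ~0.19, ~0.05
    }
    // emitMostLikely = true  -> one record: cluster 0 with weight ~0.76
    // emitMostLikely = false, threshold = 0.1
    //                        -> two records: clusters 0 and 1 (0.05 is filtered)
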
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansConfigKeys.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansConfigKeys.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansConfigKeys.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansConfigKeys.java Fri Apr 30 20:16:56 2010
@@ -18,10 +18,17 @@
 package org.apache.mahout.clustering.fuzzykmeans;
 
 public interface FuzzyKMeansConfigKeys {
-  
+
   String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.kmeans.measure";
+
   String CLUSTER_PATH_KEY = "org.apache.mahout.clustering.kmeans.path";
+
   String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.kmeans.convergence";
+
   String M_KEY = "org.apache.mahout.clustering.fuzzykmeans.m";
-  
+
+  String EMIT_MOST_LIKELY_KEY = "org.apache.mahout.clustering.fuzzykmeans.emitMostLikely";
+
+  String THRESHOLD_KEY = "org.apache.mahout.clustering.fuzzykmeans.threshold";
+
 }

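A sketch of how a driver would wire the two new keys into a job so that FuzzyKMeansClusterer.configure() above can read them back; the values are placeholders:

    JobConf conf = new JobConf(FuzzyKMeansDriver.class);
    conf.set(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY, Boolean.toString(false));
    conf.set(FuzzyKMeansConfigKeys.THRESHOLD_KEY, Double.toString(0.25));
    // configure() reads these back via Boolean.parseBoolean / Double.parseDouble
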
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Fri Apr 30 20:16:56 2010
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.mahout.clustering.ClusterBase;
+import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.WeightedVectorWritable;
 import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
 import org.apache.mahout.common.CommandLineUtil;
@@ -54,79 +54,83 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public final class FuzzyKMeansDriver {
-  
+
   private static final Logger log = LoggerFactory.getLogger(FuzzyKMeansDriver.class);
   
-  private FuzzyKMeansDriver() { }
-  
+  private FuzzyKMeansDriver() {
+  }
+
   public static void main(String[] args) throws Exception {
     DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
     ArgumentBuilder abuilder = new ArgumentBuilder();
     GroupBuilder gbuilder = new GroupBuilder();
     Option inputOpt = obuilder.withLongName("input").withRequired(true).withArgument(
-      abuilder.withName("input").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The Path for input Vectors. Must be a SequenceFile of Writable, Vector").withShortName("i").create();
-    
+        abuilder.withName("input").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The Path for input Vectors. Must be a SequenceFile of Writable, Vector").withShortName("i").create();
+
     Option clustersOpt = obuilder.withLongName("clusters").withRequired(true).withArgument(
-      abuilder.withName("clusters").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.  "
-          + "If k is also specified, then a random set of vectors will be selected"
-          + " and written out to this path first").withShortName("c").create();
-    
+        abuilder.withName("clusters").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.  "
+            + "If k is also specified, then a random set of vectors will be selected" + " and written out to this path first")
+        .withShortName("c").create();
+
     Option kOpt = obuilder.withLongName("k").withRequired(false).withArgument(
-      abuilder.withName("k").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The k in k-Means.  If specified, then a random selection of k Vectors will be chosen"
-          + " as the Centroid and written to the clusters output path.").withShortName("k").create();
-    
+        abuilder.withName("k").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The k in k-Means.  If specified, then a random selection of k Vectors will be chosen"
+            + " as the Centroid and written to the clusters output path.").withShortName("k").create();
+
     Option outputOpt = obuilder.withLongName("output").withRequired(true).withArgument(
-      abuilder.withName("output").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The Path to put the output in").withShortName("o").create();
-    
+        abuilder.withName("output").withMinimum(1).withMaximum(1).create()).withDescription("The Path to put the output in")
+        .withShortName("o").create();
+
     Option measureClassOpt = obuilder.withLongName("distance").withRequired(false).withArgument(
-      abuilder.withName("distance").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The Distance Measure to use.  Default is SquaredEuclidean").withShortName("dm").create();
-    
+        abuilder.withName("distance").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The Distance Measure to use.  Default is SquaredEuclidean").withShortName("dm").create();
+
     Option convergenceDeltaOpt = obuilder.withLongName("convergence").withRequired(false).withArgument(
-      abuilder.withName("convergence").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The threshold below which the clusters are considered to be converged.  Default is 0.5")
-        .withShortName("d").create();
-    
+        abuilder.withName("convergence").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The threshold below which the clusters are considered to be converged.  Default is 0.5").withShortName("d").create();
+
     Option maxIterationsOpt = obuilder.withLongName("max").withRequired(false).withArgument(
-      abuilder.withName("max").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The maximum number of iterations to perform.  Default is 20").withShortName("x").create();
-    
+        abuilder.withName("max").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The maximum number of iterations to perform.  Default is 20").withShortName("x").create();
+
     Option vectorClassOpt = obuilder.withLongName("vectorClass").withRequired(false).withArgument(
-      abuilder.withName("vectorClass").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The Vector implementation class name.  Default is RandomAccessSparseVector.class").withShortName("v")
-        .create();
-    
-    Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
-        .create();
-    
+        abuilder.withName("vectorClass").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The Vector implementation class name.  Default is RandomAccessSparseVector.class").withShortName("v").create();
+
+    Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h").create();
+
     Option overwriteOutput = obuilder.withLongName("overwrite").withRequired(false).withDescription(
-      "If set, overwrite the output directory").withShortName("w").create();
-    
-    Option clusteringOpt = obuilder.withLongName("clustering").withRequired(false).withDescription(
-      "If true, run clustering only (assumes the iterations have already taken place").withShortName("l")
-        .create();
-    
+        "If set, overwrite the output directory").withShortName("w").create();
+
     Option mOpt = obuilder.withLongName("m").withRequired(true).withArgument(
-      abuilder.withName("m").withMinimum(1).withMaximum(1).create()).withDescription(
-      "coefficient normalization factor, must be greater than 1").withShortName("m").create();
-    
+        abuilder.withName("m").withMinimum(1).withMaximum(1).create()).withDescription(
+        "coefficient normalization factor, must be greater than 1").withShortName("m").create();
+
     Option numReduceTasksOpt = obuilder.withLongName("numReduce").withRequired(false).withArgument(
-      abuilder.withName("numReduce").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The number of reduce tasks").withShortName("r").create();
-    
+        abuilder.withName("numReduce").withMinimum(1).withMaximum(1).create()).withDescription("The number of reduce tasks")
+        .withShortName("r").create();
+
     Option numMapTasksOpt = obuilder.withLongName("numMap").withRequired(false).withArgument(
-      abuilder.withName("numMap").withMinimum(1).withMaximum(1).create()).withDescription(
-      "The number of map tasks").withShortName("u").create();
-    
-    Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(clustersOpt).withOption(
-      outputOpt).withOption(measureClassOpt).withOption(convergenceDeltaOpt).withOption(maxIterationsOpt)
-        .withOption(kOpt).withOption(mOpt).withOption(vectorClassOpt).withOption(overwriteOutput).withOption(
-          helpOpt).create();
-    
+        abuilder.withName("numMap").withMinimum(1).withMaximum(1).create()).withDescription("The number of map tasks")
+        .withShortName("u").create();
+
+    Option clusteringOpt = obuilder.withLongName("clustering").withRequired(false).withDescription(
+        "If true, run clustering after the iterations have taken place").withShortName("cl").create();
+
+    Option emitMostLikelyOpt = obuilder.withLongName("emitMostLikely").withRequired(false).withShortName("e").withArgument(
+        abuilder.withName("emitMostLikely").withMinimum(1).withMaximum(1).create()).withDescription(
+        "True if clustering emits most likely point only, false for threshold clustering").create();
+
+    Option thresholdOpt = obuilder.withLongName("threshold").withRequired(false).withShortName("t").withArgument(
+        abuilder.withName("threshold").withMinimum(1).withMaximum(1).create()).withDescription("The pdf threshold").create();
+
+    Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(clustersOpt).withOption(outputOpt).withOption(
+        measureClassOpt).withOption(convergenceDeltaOpt).withOption(maxIterationsOpt).withOption(kOpt).withOption(mOpt).withOption(
+        vectorClassOpt).withOption(overwriteOutput).withOption(helpOpt).withOption(numMapTasksOpt).withOption(numReduceTasksOpt)
+        .withOption(clusteringOpt).withOption(emitMostLikelyOpt).withOption(thresholdOpt).create();
+
     try {
       Parser parser = new Parser();
       parser.setGroup(group);
@@ -147,50 +151,52 @@ public final class FuzzyKMeansDriver {
         convergenceDelta = Double.parseDouble(cmdLine.getValue(convergenceDeltaOpt).toString());
       }
       float m = Float.parseFloat(cmdLine.getValue(mOpt).toString());
-      
+
       // Class<? extends Vector> vectorClass = cmdLine.hasOption(vectorClassOpt) == false ?
       // RandomAccessSparseVector.class
       // : (Class<? extends Vector>) Class.forName(cmdLine.getValue(vectorClassOpt).toString());
-      
+
       int numReduceTasks = 10;
       if (cmdLine.hasOption(numReduceTasksOpt)) {
         numReduceTasks = Integer.parseInt(cmdLine.getValue(numReduceTasksOpt).toString());
       }
-      
+
       int numMapTasks = 50;
       if (cmdLine.hasOption(numMapTasksOpt)) {
         numMapTasks = Integer.parseInt(cmdLine.getValue(numMapTasksOpt).toString());
       }
-      
+
       int maxIterations = 20;
       if (cmdLine.hasOption(maxIterationsOpt)) {
         maxIterations = Integer.parseInt(cmdLine.getValue(maxIterationsOpt).toString());
       }
-      
+
       if (cmdLine.hasOption(overwriteOutput)) {
         HadoopUtil.overwriteOutput(output);
       }
-      
+
       if (cmdLine.hasOption(kOpt)) {
-        clusters = RandomSeedGenerator.buildRandom(input, clusters,
-          Integer.parseInt(cmdLine.getValue(kOpt).toString())).toString();
+        clusters = RandomSeedGenerator.buildRandom(input, clusters, Integer.parseInt(cmdLine.getValue(kOpt).toString())).toString();
       }
-      
-      if (cmdLine.hasOption(clusteringOpt)) {
-        runClustering(input, clusters, output, measureClass, convergenceDelta, numMapTasks,
-          m);
-      } else {
-        runJob(input, clusters, output, measureClass, convergenceDelta, maxIterations,
-          numMapTasks, numReduceTasks, m);
+
+      boolean emitMostLikely = true;
+      if (cmdLine.hasOption(emitMostLikelyOpt)) {
+        emitMostLikely = Boolean.parseBoolean(cmdLine.getValue(emitMostLikelyOpt).toString());
+      }
+      double threshold = 0;
+      if (cmdLine.hasOption(thresholdOpt)) {
+        threshold = Double.parseDouble(cmdLine.getValue(thresholdOpt).toString());
       }
-      
+      runJob(input, clusters, output, measureClass, convergenceDelta, maxIterations, numMapTasks, numReduceTasks, m, cmdLine
+          .hasOption(clusteringOpt), emitMostLikely, threshold);
+
     } catch (OptionException e) {
       log.error("Exception", e);
       CommandLineUtil.printHelp(group);
     }
-    
+
   }
-  
+
   /**
    * Run the job using supplied arguments
    * 
@@ -213,41 +219,40 @@ public final class FuzzyKMeansDriver {
    * @param m
    *          the fuzzification factor, see
    *          http://en.wikipedia.org/wiki/Data_clustering#Fuzzy_c-means_clustering
+   * @param runClustering
+   *          true if the points are to be clustered after the iterations complete
+   * @param emitMostLikely
+   *          if true, emit each point only to its most likely cluster
+   * @param threshold
+   *          when emitMostLikely is false, emit a point to every cluster whose pdf exceeds this value
    */
-  public static void runJob(String input,
-                            String clustersIn,
-                            String output,
-                            String measureClass,
-                            double convergenceDelta,
-                            int maxIterations,
-                            int numMapTasks,
-                            int numReduceTasks,
-                            float m) {
-    
+  public static void runJob(String input, String clustersIn, String output, String measureClass, double convergenceDelta,
+      int maxIterations, int numMapTasks, int numReduceTasks, float m, boolean runClustering, boolean emitMostLikely,
+      double threshold) {
+
     boolean converged = false;
     int iteration = 1;
-    
+
     // iterate until the clusters converge
     while (!converged && (iteration <= maxIterations)) {
       log.info("Iteration {}", iteration);
-      
+
       // point the output to a new directory per iteration
-      String clustersOut = output + ClusterBase.CLUSTERS_DIR + iteration;
-      converged = runIteration(input, clustersIn, clustersOut, measureClass,
-        convergenceDelta, numMapTasks, numReduceTasks, iteration, m);
-      
+      String clustersOut = output + Cluster.CLUSTERS_DIR + iteration;
+      converged = runIteration(input, clustersIn, clustersOut, measureClass, convergenceDelta, numMapTasks, numReduceTasks,
+          iteration, m);
+
       // now point the input to the old output directory
       clustersIn = clustersOut;
       iteration++;
     }
-    
+
-    // now actually cluster the points
-    log.info("Clustering ");
-    
-    runClustering(input, clustersIn, output + ClusterBase.CLUSTERED_POINTS_DIR, measureClass,
-      convergenceDelta, numMapTasks, m);
+    // now cluster the points, if requested
+    if (runClustering) {
+      log.info("Clustering ");
+      runClustering(input, clustersIn, output + Cluster.CLUSTERED_POINTS_DIR, measureClass, convergenceDelta, numMapTasks, m,
+          emitMostLikely, threshold);
+    }
   }
-  
+
   /**
    * Run the job using supplied arguments
    * 
@@ -270,45 +275,41 @@ public final class FuzzyKMeansDriver {
    *          http://en.wikipedia.org/wiki/Data_clustering#Fuzzy_c-means_clustering
    * @return true if the iteration successfully runs
    */
-  private static boolean runIteration(String input,
-                                      String clustersIn,
-                                      String clustersOut,
-                                      String measureClass,
-                                      double convergenceDelta,
-                                      int numMapTasks,
-                                      int numReduceTasks,
-                                      int iterationNumber,
-                                      float m) {
-    
-    JobConf conf = new JobConf(FuzzyKMeansJob.class);
+  private static boolean runIteration(String input, String clustersIn, String clustersOut, String measureClass,
+      double convergenceDelta, int numMapTasks, int numReduceTasks, int iterationNumber, float m) {
+
+    JobConf conf = new JobConf(FuzzyKMeansDriver.class);
     conf.setJobName("Fuzzy K Means{" + iterationNumber + '}');
-    
+
     conf.setMapOutputKeyClass(Text.class);
     conf.setMapOutputValueClass(FuzzyKMeansInfo.class);
     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(SoftCluster.class);
-    
+
     FileInputFormat.setInputPaths(conf, new Path(input));
     Path outPath = new Path(clustersOut);
     FileOutputFormat.setOutputPath(conf, outPath);
-    
+
     conf.setInputFormat(SequenceFileInputFormat.class);
     conf.setOutputFormat(SequenceFileOutputFormat.class);
-    
+
     conf.setMapperClass(FuzzyKMeansMapper.class);
     conf.setCombinerClass(FuzzyKMeansCombiner.class);
     conf.setReducerClass(FuzzyKMeansReducer.class);
     conf.setNumMapTasks(numMapTasks);
     conf.setNumReduceTasks(numReduceTasks);
-    
+
     conf.set(FuzzyKMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn);
     conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
     conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, String.valueOf(convergenceDelta));
     conf.set(FuzzyKMeansConfigKeys.M_KEY, String.valueOf(m));
-    
+    // these values don't matter during the iterations; they are only read by the clustering step, if requested
+    conf.set(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY, Boolean.toString(true));
+    conf.set(FuzzyKMeansConfigKeys.THRESHOLD_KEY, Double.toString(0));
+
     // uncomment it to run locally
     // conf.set("mapred.job.tracker", "local");
-    
+
     try {
       JobClient.runJob(conf);
       FileSystem fs = FileSystem.get(outPath.toUri(), conf);
@@ -318,7 +319,7 @@ public final class FuzzyKMeansDriver {
       return true;
     }
   }
-  
+
   /**
    * Run the job using supplied arguments
    * 
@@ -334,30 +335,29 @@ public final class FuzzyKMeansDriver {
    *          the convergence delta value
    * @param numMapTasks
    *          the number of map tasks
+   * @param emitMostLikely
+   *          if true, emit each point only to its most likely cluster
+   * @param threshold
+   *          when emitMostLikely is false, emit a point to every cluster whose pdf exceeds this value
    */
-  private static void runClustering(String input,
-                                    String clustersIn,
-                                    String output,
-                                    String measureClass,
-                                    double convergenceDelta,
-                                    int numMapTasks,
-                                    float m) {
-    
+  private static void runClustering(String input, String clustersIn, String output, String measureClass, double convergenceDelta,
+      int numMapTasks, float m, boolean emitMostLikely, double threshold) {
+
     JobConf conf = new JobConf(FuzzyKMeansDriver.class);
     conf.setJobName("Fuzzy K Means Clustering");
-    
+
     conf.setOutputKeyClass(IntWritable.class);
     conf.setOutputValueClass(WeightedVectorWritable.class);
-    
+
     FileInputFormat.setInputPaths(conf, new Path(input));
     Path outPath = new Path(output);
     FileOutputFormat.setOutputPath(conf, outPath);
-    
+
     conf.setMapperClass(FuzzyKMeansClusterMapper.class);
-    
+
     conf.setInputFormat(SequenceFileInputFormat.class);
     conf.setOutputFormat(SequenceFileOutputFormat.class);
-    
+
     // uncomment it to run locally
     // conf.set("mapred.job.tracker", "local");
     conf.setNumMapTasks(numMapTasks);
@@ -366,13 +366,15 @@ public final class FuzzyKMeansDriver {
     conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
     conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, String.valueOf(convergenceDelta));
     conf.set(FuzzyKMeansConfigKeys.M_KEY, String.valueOf(m));
+    conf.set(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY, Boolean.toString(emitMostLikely));
+    conf.set(FuzzyKMeansConfigKeys.THRESHOLD_KEY, Double.toString(threshold));
     try {
       JobClient.runJob(conf);
     } catch (IOException e) {
       log.warn(e.toString(), e);
     }
   }
-  
+
   /**
    * Return if all of the Clusters in the filePath have converged or not
    * 
@@ -387,29 +389,28 @@ public final class FuzzyKMeansDriver {
    *           if there was an IO error
    */
   private static boolean isConverged(String filePath, Configuration conf, FileSystem fs) throws IOException {
-    
+
     Path clusterPath = new Path(filePath + "/*");
     List<Path> result = new ArrayList<Path>();
-    
+
     PathFilter clusterFileFilter = new PathFilter() {
       @Override
       public boolean accept(Path path) {
         return path.getName().startsWith("part");
       }
     };
-    
-    FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(clusterPath, clusterFileFilter)),
-      clusterFileFilter);
-    
+
+    FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(clusterPath, clusterFileFilter)), clusterFileFilter);
+
     for (FileStatus match : matches) {
       result.add(fs.makeQualified(match.getPath()));
     }
     boolean converged = true;
-    
+
     for (Path p : result) {
-      
+
       SequenceFile.Reader reader = null;
-      
+
       try {
         reader = new SequenceFile.Reader(fs, p, conf);
         /*
@@ -427,7 +428,7 @@ public final class FuzzyKMeansDriver {
         }
       }
     }
-    
+
     return converged;
   }
 }

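With the consolidated runJob signature, a caller supplies the iteration parameters and the new
clustering flags in a single call. A sketch of a typical invocation; the paths, distance measure
class name, and parameter values are placeholders, not taken from this commit:

    import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;

    // Illustrative invocation of the new 12-argument runJob; all values are placeholders.
    public class RunJobSketch {
      public static void main(String[] args) {
        FuzzyKMeansDriver.runJob("testdata/points",  // input vectors
            "testdata/clusters",                     // initial clusters (or seed randomly via k)
            "output",                                // output base directory
            "org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure",
            0.5,     // convergenceDelta
            20,      // maxIterations
            50,      // numMapTasks
            10,      // numReduceTasks
            2.0f,    // m, the fuzziness coefficient (must be > 1)
            true,    // runClustering after the iterations finish
            true,    // emitMostLikely
            0.0);    // threshold, ignored while emitMostLikely is true
      }
    }

Passing emitMostLikely = false together with a nonzero threshold switches the final pass to
threshold clustering, so each point may be written under several cluster ids.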
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Fri Apr 30 20:16:56 2010
@@ -209,7 +209,7 @@ public final class KMeansDriver {
     while (!converged && (iteration <= maxIterations)) {
       log.info("Iteration {}", iteration);
       // point the output to a new directory per iteration
-      String clustersOut = output + ClusterBase.CLUSTERS_DIR + iteration;
+      String clustersOut = output + Cluster.CLUSTERS_DIR + iteration;
       converged = runIteration(input, clustersIn, clustersOut, measureClass, delta, numReduceTasks, iteration);
       // now point the input to the old output directory
       clustersIn = clustersOut;
@@ -217,7 +217,7 @@ public final class KMeansDriver {
     }
     // now actually cluster the points
     log.info("Clustering ");
-    runClustering(input, clustersIn, output + ClusterBase.CLUSTERED_POINTS_DIR, measureClass, delta);
+    runClustering(input, clustersIn, output + Cluster.CLUSTERED_POINTS_DIR, measureClass, delta);
   }
   
   /**

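The ClusterBase-to-Cluster constant moves here (and in MeanShiftCanopyJob below) only change
where the directory-name constants live; the on-disk layout is unchanged. A sketch of how the
per-iteration paths compose, assuming the conventional values CLUSTERS_DIR = "/clusters-" and
CLUSTERED_POINTS_DIR = "/clusteredPoints" (assumed here, verify against Cluster.java):

    public class ClusterDirsSketch {
      // assumed constant values mirroring Cluster.java in this revision
      static final String CLUSTERS_DIR = "/clusters-";
      static final String CLUSTERED_POINTS_DIR = "/clusteredPoints";

      public static void main(String[] args) {
        String output = "output";
        for (int iteration = 1; iteration <= 3; iteration++) {
          // prints output/clusters-1 ... output/clusters-3
          System.out.println(output + CLUSTERS_DIR + iteration);
        }
        // the final clustering pass writes to output/clusteredPoints
        System.out.println(output + CLUSTERED_POINTS_DIR);
      }
    }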
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java Fri Apr 30 20:16:56 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.ClusterBase;
 import org.apache.mahout.common.CommandLineUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
@@ -149,7 +150,7 @@ public class MeanShiftCanopyJob {
     }
     fs.mkdirs(outPath);
 
-    String clustersIn = output + ClusterBase.INITIAL_CLUSTERS_DIR;
+    String clustersIn = output + Cluster.INITIAL_CLUSTERS_DIR;
     if (inputIsCanopies) {
       clustersIn = input;
     } else {
@@ -162,7 +163,7 @@ public class MeanShiftCanopyJob {
     while (!converged && (iteration <= maxIterations)) {
       log.info("Iteration {}", iteration);
       // point the output to a new directory per iteration
-      String clustersOut = output + ClusterBase.CLUSTERS_DIR + iteration;
+      String clustersOut = output + Cluster.CLUSTERS_DIR + iteration;
       String controlOut = output + CONTROL_CONVERGED;
       MeanShiftCanopyDriver.runJob(clustersIn, clustersOut, controlOut, measureClassName, t1, t2, convergenceDelta);
       converged = FileSystem.get(conf).exists(new Path(controlOut));
@@ -172,8 +173,8 @@ public class MeanShiftCanopyJob {
     }
 
     // now cluster the points
-    MeanShiftCanopyDriver.runClustering((inputIsCanopies ? input : output + ClusterBase.INITIAL_CLUSTERS_DIR), clustersIn, output
-        + ClusterBase.CLUSTERED_POINTS_DIR);
+    MeanShiftCanopyDriver.runClustering((inputIsCanopies ? input : output + Cluster.INITIAL_CLUSTERS_DIR), clustersIn, output
+        + Cluster.CLUSTERED_POINTS_DIR);
   }
 
 }
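MeanShiftCanopyJob detects convergence through a control file rather than by inspecting cluster
state: the per-iteration driver creates the control path when the canopies converge, and the
loop exits once that path exists. A hedged sketch of the pattern with the iteration body elided;
the control path name and loop bounds are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch of the control-file convergence loop; not the committed code.
    class ControlFileLoopSketch {
      static void iterate(Configuration conf, String output, int maxIterations) throws Exception {
        boolean converged = false;
        int iteration = 1;
        Path controlOut = new Path(output, "control-converged"); // illustrative name
        while (!converged && iteration <= maxIterations) {
          // ... run one MeanShiftCanopyDriver iteration here; it creates controlOut on convergence ...
          converged = FileSystem.get(conf).exists(controlOut);
          iteration++;
        }
      }
    }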