You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by je...@apache.org on 2010/04/30 22:16:57 UTC
svn commit: r939800 [1/2] - in /lucene/mahout/trunk: core/
core/src/main/java/org/apache/mahout/clustering/
core/src/main/java/org/apache/mahout/clustering/canopy/
core/src/main/java/org/apache/mahout/clustering/dirichlet/
core/src/main/java/org/apache...
Author: jeastman
Date: Fri Apr 30 20:16:56 2010
New Revision: 939800
URL: http://svn.apache.org/viewvc?rev=939800&view=rev
Log:
MAHOUT-236:
- moved default file directory definitions to Cluster from ClusterBase
- refactored CanopyClusteringJob and ClusterDriver into CanopyDriver & removed both
- added runClustering option to CanopyDriver.runJob
- refactored DirichletJob into DirichletDriver
- added runClustering option to DirichletDriver.runJob
- added emitMostLikely and threshold options to DirichletDriver.runJob
- implemented emitMostLikelyCluster and emitAllClusters (above threshold)
- refactored FuzzyKMeansJob into FuzzyKMeansDriver
- added runClustering option to FuzzyKMeansDriver.runJob
- added emitMostLikely and threshold options to FuzzyKMeansDriver.runJob
- implemented emitMostLikelyCluster and emitAllClusters (above threshold)
- enhanced TestClusterDumper to utilize term dictionary and enhanced printouts
- updated other unit tests, all of which run
Removed:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java
Modified:
lucene/mahout/trunk/core/ (props changed)
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/Cluster.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterBase.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansConfigKeys.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java
lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
Propchange: lucene/mahout/trunk/core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Apr 30 20:16:56 2010
@@ -12,3 +12,4 @@ target
.checkstyle
.pmd
.ruleset
+testdatatestdata
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/Cluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/Cluster.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/Cluster.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/Cluster.java Fri Apr 30 20:16:56 2010
@@ -24,6 +24,15 @@ import org.apache.mahout.math.Vector;
*/
public interface Cluster {
+ // default directory for all clustered points
+ String CLUSTERED_POINTS_DIR = "/clusteredPoints";
+
+ // default directory for initial clusters to prime iterative clustering algorithms
+ String INITIAL_CLUSTERS_DIR = "/clusters-0";
+
+ // default directory for output of clusters per iteration
+ String CLUSTERS_DIR = "/clusters-";
+
/**
* Get the id of the Cluster
*
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterBase.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterBase.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterBase.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/ClusterBase.java Fri Apr 30 20:16:56 2010
@@ -41,15 +41,6 @@ import com.google.gson.reflect.TypeToken
*/
public abstract class ClusterBase implements Writable, Cluster {
- // default directory for all clustered points
- public static final String CLUSTERED_POINTS_DIR = "/clusteredPoints";
-
- // default directory for initial clusters to prime iterative clustering algorithms
- public static final String INITIAL_CLUSTERS_DIR = "/clusters-0";
-
- // default directory for output of clusters per iteration
- public static final String CLUSTERS_DIR = "/clusters-";
-
// this cluster's clusterId
private int id;
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Fri Apr 30 20:16:56 2010
@@ -30,6 +30,7 @@ import org.apache.commons.cli2.commandli
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
@@ -37,6 +38,9 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
import org.apache.mahout.math.VectorWritable;
@@ -46,6 +50,7 @@ import org.slf4j.LoggerFactory;
public final class CanopyDriver {
private static final Logger log = LoggerFactory.getLogger(CanopyDriver.class);
+ public static final String DEFAULT_CLUSTERED_POINTS_DIRECTORY = "/clusteredPoints";
private CanopyDriver() { }
@@ -107,7 +112,7 @@ public final class CanopyDriver {
double t1 = Double.parseDouble(cmdLine.getValue(t1Opt).toString());
double t2 = Double.parseDouble(cmdLine.getValue(t2Opt).toString());
- runJob(input, output, measureClass, t1, t2);
+ runJob(input, output, measureClass, t1, t2, false);
} catch (OptionException e) {
log.error("Exception", e);
CommandLineUtil.printHelp(group);
@@ -128,9 +133,11 @@ public final class CanopyDriver {
* the T1 distance threshold
* @param t2
* the T2 distance threshold
+ * @param runClustering
+ * true if points are to be clustered after clusters are determined
*/
public static void runJob(String input, String output,
- String measureClassName, double t1, double t2) throws IOException {
+ String measureClassName, double t1, double t2, boolean runClustering) throws IOException {
log.info("Input: {} Out: {} "
+ "Measure: {} t1: {} t2: {}", new Object[] {input, output, measureClassName, t1, t2});
Configurable client = new JobClient();
@@ -147,7 +154,9 @@ public final class CanopyDriver {
conf.setOutputValueClass(Canopy.class);
FileInputFormat.setInputPaths(conf, new Path(input));
- Path outPath = new Path(output);
+
+ String canopyOutputDir = output + Cluster.CLUSTERS_DIR + "0";
+ Path outPath = new Path(canopyOutputDir);
FileOutputFormat.setOutputPath(conf, outPath);
conf.setMapperClass(CanopyMapper.class);
@@ -161,6 +170,61 @@ public final class CanopyDriver {
dfs.delete(outPath, true);
}
JobClient.runJob(conf);
+
+ if (runClustering){
+ runClustering(input, canopyOutputDir, output, measureClassName, t1, t2);
+ }
+ }
+
+ /**
+ * Run the job
+ *
+ * @param points
+ * the input points directory pathname String
+ * @param canopies
+ * the input canopies directory pathname String
+ * @param output
+ * the output directory pathname String
+ * @param measureClassName
+ * the DistanceMeasure class name
+ * @param t1
+ * the T1 distance threshold
+ * @param t2
+ * the T2 distance threshold
+ */
+ public static void runClustering(String points,
+ String canopies,
+ String output,
+ String measureClassName,
+ double t1,
+ double t2) throws IOException {
+ Configurable client = new JobClient();
+ JobConf conf = new JobConf(CanopyDriver.class);
+
+ conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measureClassName);
+ conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
+ conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
+ conf.set(CanopyConfigKeys.CANOPY_PATH_KEY, canopies);
+
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputKeyClass(IntWritable.class);
+ conf.setOutputValueClass(WeightedVectorWritable.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+ FileInputFormat.setInputPaths(conf, new Path(points));
+ Path outPath = new Path(output + DEFAULT_CLUSTERED_POINTS_DIRECTORY);
+ FileOutputFormat.setOutputPath(conf, outPath);
+
+ conf.setMapperClass(ClusterMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+ conf.setNumReduceTasks(0);
+
+ client.setConf(conf);
+ FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
+ if (dfs.exists(outPath)) {
+ dfs.delete(outPath, true);
+ }
+ JobClient.runJob(conf);
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletCluster.java Fri Apr 30 20:16:56 2010
@@ -105,7 +105,7 @@ public class DirichletCluster<O> impleme
@Override
public String asFormatString(String[] bindings) {
- return "C-" + model.getId() + ": " + model.toString();
+ return "C-" + model.getId() + ": " + model.asFormatString(bindings);
}
@Override
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterMapper.java Fri Apr 30 20:16:56 2010
@@ -35,8 +35,8 @@ import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputLogFilter;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.clustering.ClusterBase;
import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
public class DirichletClusterMapper extends MapReduceBase implements
@@ -46,21 +46,13 @@ public class DirichletClusterMapper exte
private List<DirichletCluster> clusters;
+ private DirichletClusterer clusterer;
+
@SuppressWarnings("unchecked")
@Override
public void map(WritableComparable<?> key, VectorWritable vector, OutputCollector<IntWritable, WeightedVectorWritable> output,
Reporter reporter) throws IOException {
- int clusterId = -1;
- double clusterPdf = 0;
- for (int i = 0; i < clusters.size(); i++) {
- double pdf = clusters.get(i).getModel().pdf(vector);
- if (pdf > clusterPdf) {
- clusterId = i;
- clusterPdf = pdf;
- }
- }
- //System.out.println(clusterId + ": " + ClusterBase.formatVector(vector.get(), null));
- output.collect(new IntWritable(clusterId), new WeightedVectorWritable(clusterPdf, vector));
+ clusterer.emitPointToClusters(vector, clusters, output);
}
@Override
@@ -68,6 +60,9 @@ public class DirichletClusterMapper exte
super.configure(job);
try {
clusters = getClusters(job);
+ String emitMostLikely = job.get(DirichletDriver.EMIT_MOST_LIKELY_KEY);
+ String threshold = job.get(DirichletDriver.THRESHOLD_KEY);
+ clusterer = new DirichletClusterer<Vector>(Boolean.parseBoolean(emitMostLikely), Double.parseDouble(threshold));
} catch (SecurityException e) {
throw new IllegalStateException(e);
} catch (IllegalArgumentException e) {
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletClusterer.java Fri Apr 30 20:16:56 2010
@@ -17,13 +17,18 @@
package org.apache.mahout.clustering.dirichlet;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.clustering.dirichlet.models.Model;
import org.apache.mahout.clustering.dirichlet.models.ModelDistribution;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.function.TimesFunction;
/**
@@ -80,24 +85,28 @@ import org.apache.mahout.math.function.T
* </pre>
*/
public class DirichletClusterer<O> {
-
+
// observed data
private final List<O> sampleData;
-
+
// the ModelDistribution for the computation
private final ModelDistribution<O> modelFactory;
-
+
// the state of the clustering process
private final DirichletState<O> state;
-
+
private final int thin;
-
+
private final int burnin;
-
+
private final int numClusters;
-
+
private final List<Model<O>[]> clusterSamples = new ArrayList<Model<O>[]>();
-
+
+ private boolean emitMostLikely;
+
+ private double threshold;
+
/**
* Create a new instance on the sample data with the given additional parameters
*
@@ -114,12 +123,8 @@ public class DirichletClusterer<O> {
* @param burnin
* the int burnin interval, used to suppress early iterations
*/
- public DirichletClusterer(List<O> sampleData,
- ModelDistribution<O> modelFactory,
- double alpha_0,
- int numClusters,
- int thin,
- int burnin) {
+ public DirichletClusterer(List<O> sampleData, ModelDistribution<O> modelFactory, double alpha_0, int numClusters, int thin,
+ int burnin) {
this.sampleData = sampleData;
this.modelFactory = modelFactory;
this.thin = thin;
@@ -127,7 +132,24 @@ public class DirichletClusterer<O> {
this.numClusters = numClusters;
state = new DirichletState<O>(modelFactory, numClusters, alpha_0);
}
-
+
+ /**
+ * This constructor only used by DirichletClusterMapper for setting up clustering params
+ * @param emitMostLikely
+ * @param threshold
+ */
+ public DirichletClusterer(boolean emitMostLikely, double threshold) {
+ super();
+ this.sampleData = null;
+ this.modelFactory = null;
+ this.thin = 0;
+ this.burnin = 0;
+ this.numClusters = 0;
+ this.state = null;
+ this.emitMostLikely = emitMostLikely;
+ this.threshold = threshold;
+ }
+
/**
* Iterate over the sample data, obtaining cluster samples periodically and returning them.
*
@@ -141,7 +163,7 @@ public class DirichletClusterer<O> {
}
return clusterSamples;
}
-
+
/**
* Perform one iteration of the clustering process, iterating over the samples to build a new array of
* models, then updating the state for the next iteration
@@ -150,10 +172,10 @@ public class DirichletClusterer<O> {
* the DirichletState<Observation> of this iteration
*/
private void iterate(int iteration, DirichletState<O> state) {
-
+
// create new posterior models
Model<O>[] newModels = modelFactory.sampleFromPosterior(state.getModels());
-
+
// iterate over the samples, assigning each to a model
for (O x : sampleData) {
// compute normalized vector of probabilities that x is described by each model
@@ -164,7 +186,7 @@ public class DirichletClusterer<O> {
// ask the selected model to observe the datum
newModels[k].observe(x);
}
-
+
// periodically add models to the cluster samples after the burn-in period
if ((iteration >= burnin) && (iteration % thin == 0)) {
clusterSamples.add(newModels);
@@ -172,7 +194,7 @@ public class DirichletClusterer<O> {
// update the state from the new models
state.update(newModels);
}
-
+
/**
* Compute a normalized vector of probabilities that x is described by each model using the mixture and the
* model pdfs
@@ -197,7 +219,54 @@ public class DirichletClusterer<O> {
pi.assign(new TimesFunction(), 1.0 / max);
return pi;
}
-
+
+ public void emitPointToClusters(VectorWritable point, List<DirichletCluster> clusters,
+ OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+ Vector pi = new DenseVector(clusters.size());
+ for (int i = 0; i < clusters.size(); i++) {
+ pi.set(i, clusters.get(i).getModel().pdf(point));
+ }
+ pi = pi.divide(pi.zSum());
+ if (emitMostLikely) {
+ emitMostLikelyCluster(point, clusters, pi, output);
+ } else {
+ emitAllClusters(point, clusters, pi, output);
+ }
+ }
+
+ /**
+ * Emit the point to the most likely cluster
+ * @param point
+ * @param pi the normalized pdf Vector for the point
+ * @param output
+ * @throws IOException
+ */
+ void emitMostLikelyCluster(VectorWritable point, List<DirichletCluster> clusters, Vector pi,
+ OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+ int clusterId = -1;
+ double clusterPdf = 0;
+ for (int i = 0; i < clusters.size(); i++) {
+ double pdf = pi.get(i);
+ if (pdf > clusterPdf) {
+ clusterId = i;
+ clusterPdf = pdf;
+ }
+ }
+ //System.out.println(clusterId + ": " + ClusterBase.formatVector(vector.get(), null));
+ output.collect(new IntWritable(clusterId), new WeightedVectorWritable(clusterPdf, point));
+ }
+
+ void emitAllClusters(VectorWritable point, List<DirichletCluster> clusters, Vector pi,
+ OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+ for (int i = 0; i < clusters.size(); i++) {
+ double pdf = pi.get(i);
+ if (pdf > threshold && clusters.get(i).getTotalCount() > 0) {
+ //System.out.println(i + ": " + ClusterBase.formatVector(vector.get(), null));
+ output.collect(new IntWritable(i), new WeightedVectorWritable(pdf, point));
+ }
+ }
+ }
+
/**
* Create a new instance on the sample data with the given additional parameters
*
@@ -216,16 +285,10 @@ public class DirichletClusterer<O> {
* @param numIterations
* number of iterations to be performed
*/
- public static List<Model<Vector>[]> clusterPoints(List<Vector> points,
- ModelDistribution<Vector> modelFactory,
- double alpha_0,
- int numClusters,
- int thin,
- int burnin,
- int numIterations) {
- DirichletClusterer<Vector> clusterer = new DirichletClusterer<Vector>(points, modelFactory, alpha_0,
- numClusters, thin, burnin);
+ public static List<Model<Vector>[]> clusterPoints(List<Vector> points, ModelDistribution<Vector> modelFactory, double alpha_0,
+ int numClusters, int thin, int burnin, int numIterations) {
+ DirichletClusterer<Vector> clusterer = new DirichletClusterer<Vector>(points, modelFactory, alpha_0, numClusters, thin, burnin);
return clusterer.cluster(numIterations);
-
+
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java Fri Apr 30 20:16:56 2010
@@ -41,7 +41,7 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.mahout.clustering.ClusterBase;
+import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.clustering.dirichlet.models.VectorModelDistribution;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
@@ -53,62 +53,74 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DirichletDriver {
-
public static final String STATE_IN_KEY = "org.apache.mahout.clustering.dirichlet.stateIn";
-
+
public static final String MODEL_FACTORY_KEY = "org.apache.mahout.clustering.dirichlet.modelFactory";
-
+
public static final String MODEL_PROTOTYPE_KEY = "org.apache.mahout.clustering.dirichlet.modelPrototype";
-
+
public static final String PROTOTYPE_SIZE_KEY = "org.apache.mahout.clustering.dirichlet.prototypeSize";
-
+
public static final String NUM_CLUSTERS_KEY = "org.apache.mahout.clustering.dirichlet.numClusters";
-
+
public static final String ALPHA_0_KEY = "org.apache.mahout.clustering.dirichlet.alpha_0";
-
+
+ public static final String EMIT_MOST_LIKELY_KEY = "org.apache.mahout.clustering.dirichlet.emitMostLikely";
+
+ public static final String THRESHOLD_KEY = "org.apache.mahout.clustering.dirichlet.threshold";
+
private static final Logger log = LoggerFactory.getLogger(DirichletDriver.class);
-
- private DirichletDriver() {}
-
+
+ private DirichletDriver() {
+ }
+
public static void main(String[] args) throws Exception {
DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
ArgumentBuilder abuilder = new ArgumentBuilder();
GroupBuilder gbuilder = new GroupBuilder();
-
+
Option inputOpt = DefaultOptionCreator.inputOption().create();
Option outputOpt = DefaultOptionCreator.outputOption().create();
Option maxIterOpt = DefaultOptionCreator.maxIterOption().create();
Option topicsOpt = DefaultOptionCreator.kOption().create();
Option helpOpt = DefaultOptionCreator.helpOption();
-
+
Option mOpt = obuilder.withLongName("alpha").withRequired(true).withShortName("m").withArgument(
- abuilder.withName("alpha").withMinimum(1).withMaximum(1).create()).withDescription(
- "The alpha0 value for the DirichletDistribution.").create();
-
+ abuilder.withName("alpha").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The alpha0 value for the DirichletDistribution.").create();
+
Option modelOpt = obuilder.withLongName("modelClass").withRequired(true).withShortName("d").withArgument(
- abuilder.withName("modelClass").withMinimum(1).withMaximum(1).create()).withDescription(
- "The ModelDistribution class name. "
- + "Defaults to org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution").create();
-
- Option prototypeOpt = obuilder.withLongName("modelPrototypeClass").withRequired(false).withShortName("p")
- .withArgument(abuilder.withName("prototypeClass").withMinimum(1).withMaximum(1).create())
- .withDescription(
- "The ModelDistribution prototype Vector class name. "
- + "Defaults to org.apache.mahout.math.RandomAccessSparseVector").create();
-
- Option sizeOpt = obuilder.withLongName("prototypeSize").withRequired(true).withShortName("s")
- .withArgument(abuilder.withName("prototypeSize").withMinimum(1).withMaximum(1).create())
- .withDescription("The ModelDistribution prototype Vector size. ").create();
-
+ abuilder.withName("modelClass").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The ModelDistribution class name. " + "Defaults to org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution")
+ .create();
+
+ Option prototypeOpt = obuilder.withLongName("modelPrototypeClass").withRequired(false).withShortName("p").withArgument(
+ abuilder.withName("prototypeClass").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The ModelDistribution prototype Vector class name. " + "Defaults to org.apache.mahout.math.RandomAccessSparseVector")
+ .create();
+
+ Option sizeOpt = obuilder.withLongName("prototypeSize").withRequired(true).withShortName("s").withArgument(
+ abuilder.withName("prototypeSize").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The ModelDistribution prototype Vector size. ").create();
+
Option numRedOpt = obuilder.withLongName("maxRed").withRequired(true).withShortName("r").withArgument(
- abuilder.withName("maxRed").withMinimum(1).withMaximum(1).create()).withDescription(
- "The number of reduce tasks.").create();
-
- Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(outputOpt)
- .withOption(modelOpt).withOption(prototypeOpt).withOption(sizeOpt).withOption(maxIterOpt).withOption(
- mOpt).withOption(topicsOpt).withOption(helpOpt).withOption(numRedOpt).create();
-
+ abuilder.withName("maxRed").withMinimum(1).withMaximum(1).create()).withDescription("The number of reduce tasks.").create();
+
+ Option clusteringOpt = obuilder.withLongName("clustering").withRequired(false).withDescription(
+ "If true, run clustering after the iterations have taken place").withShortName("cl").create();
+
+ Option emitMostLikelyOpt = obuilder.withLongName("emitMostLikely").withRequired(false).withShortName("e").withArgument(
+ abuilder.withName("emitMostLikely").withMinimum(1).withMaximum(1).create()).withDescription(
+ "True if clustering emits most likely point only, false for threshold clustering").create();
+
+ Option thresholdOpt = obuilder.withLongName("threshold").withRequired(false).withShortName("t").withArgument(
+ abuilder.withName("threshold").withMinimum(1).withMaximum(1).create()).withDescription("The pdf threshold").create();
+
+ Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(outputOpt).withOption(modelOpt).withOption(
+ prototypeOpt).withOption(sizeOpt).withOption(maxIterOpt).withOption(mOpt).withOption(topicsOpt).withOption(helpOpt)
+ .withOption(numRedOpt).withOption(clusteringOpt).withOption(emitMostLikelyOpt).withOption(thresholdOpt).create();
+
try {
Parser parser = new Parser();
parser.setGroup(group);
@@ -117,7 +129,7 @@ public class DirichletDriver {
CommandLineUtil.printHelp(group);
return;
}
-
+
String input = cmdLine.getValue(inputOpt).toString();
String output = cmdLine.getValue(outputOpt).toString();
String modelFactory = "org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution";
@@ -132,15 +144,27 @@ public class DirichletDriver {
int numReducers = Integer.parseInt(cmdLine.getValue(numRedOpt).toString());
int numModels = Integer.parseInt(cmdLine.getValue(topicsOpt).toString());
int maxIterations = Integer.parseInt(cmdLine.getValue(maxIterOpt).toString());
+ boolean runClustering = true;
+ if (cmdLine.hasOption(clusteringOpt)) {
+ runClustering = Boolean.parseBoolean(cmdLine.getValue(clusteringOpt).toString());
+ }
+ boolean emitMostLikely = true;
+ if (cmdLine.hasOption(emitMostLikelyOpt)) {
+ emitMostLikely = Boolean.parseBoolean(cmdLine.getValue(emitMostLikelyOpt).toString());
+ }
+ double threshold = 0;
+ if (cmdLine.hasOption(thresholdOpt)) {
+ threshold = Double.parseDouble(cmdLine.getValue(thresholdOpt).toString());
+ }
double alpha_0 = Double.parseDouble(cmdLine.getValue(mOpt).toString());
- runJob(input, output, modelFactory, modelPrototype, prototypeSize, numModels, maxIterations, alpha_0,
- numReducers);
+ runJob(input, output, modelFactory, modelPrototype, prototypeSize, numModels, maxIterations, alpha_0, numReducers,
+ runClustering, emitMostLikely, threshold);
} catch (OptionException e) {
log.error("Exception parsing command line: ", e);
CommandLineUtil.printHelp(group);
}
}
-
+
/**
* Run the job using supplied arguments
*
@@ -161,23 +185,13 @@ public class DirichletDriver {
* @deprecated since it presumes 2-d, dense vector model prototypes
*/
@Deprecated
- public static void runJob(String input,
- String output,
- String modelFactory,
- int numClusters,
- int maxIterations,
- double alpha_0,
- int numReducers) throws ClassNotFoundException,
- InstantiationException,
- IllegalAccessException,
- IOException,
- SecurityException,
- NoSuchMethodException,
- InvocationTargetException {
- runJob(input, output, modelFactory, "org.apache.mahout.math.DenseVector", 2, numClusters, maxIterations,
- alpha_0, numReducers);
+ public static void runJob(String input, String output, String modelFactory, int numClusters, int maxIterations, double alpha_0,
+ int numReducers) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException,
+ SecurityException, NoSuchMethodException, InvocationTargetException {
+ runJob(input, output, modelFactory, "org.apache.mahout.math.DenseVector", 2, numClusters, maxIterations, alpha_0, numReducers,
+ false, true, 0);
}
-
+
/**
* Run the job using supplied arguments
*
@@ -199,55 +213,40 @@ public class DirichletDriver {
* the alpha_0 value for the DirichletDistribution
* @param numReducers
* the number of Reducers desired
+ * @param runClustering
+ * true if clustering of points to be done after iterations
+ * @param emitMostLikely
+ * a boolean if true emit only most likely cluster for each point
+ * @param threshold
+ * a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
*/
- public static void runJob(String input,
- String output,
- String modelFactory,
- String modelPrototype,
- int prototypeSize,
- int numClusters,
- int maxIterations,
- double alpha_0,
- int numReducers) throws ClassNotFoundException,
- InstantiationException,
- IllegalAccessException,
- IOException,
- SecurityException,
- NoSuchMethodException,
- InvocationTargetException {
-
- String clustersIn = output + ClusterBase.INITIAL_CLUSTERS_DIR;
+ public static void runJob(String input, String output, String modelFactory, String modelPrototype, int prototypeSize,
+ int numClusters, int maxIterations, double alpha_0, int numReducers, boolean runClustering, boolean emitMostLikely,
+ double threshold) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException,
+ SecurityException, NoSuchMethodException, InvocationTargetException {
+
+ String clustersIn = output + Cluster.INITIAL_CLUSTERS_DIR;
writeInitialState(output, clustersIn, modelFactory, modelPrototype, prototypeSize, numClusters, alpha_0);
-
+
for (int iteration = 1; iteration <= maxIterations; iteration++) {
log.info("Iteration {}", iteration);
// point the output to a new directory per iteration
- String clustersOut = output + ClusterBase.CLUSTERS_DIR + iteration;
- runIteration(input, clustersIn, clustersOut, modelFactory, modelPrototype, prototypeSize, numClusters,
- alpha_0, numReducers);
+ String clustersOut = output + Cluster.CLUSTERS_DIR + iteration;
+ runIteration(input, clustersIn, clustersOut, modelFactory, modelPrototype, prototypeSize, numClusters, alpha_0, numReducers);
// now point the input to the old output directory
clustersIn = clustersOut;
}
- // now cluster the most likely points
- runClustering(input, clustersIn, output + ClusterBase.CLUSTERED_POINTS_DIR);
+ if (runClustering) {
+ // now cluster the most likely points
+ runClustering(input, clustersIn, output + Cluster.CLUSTERED_POINTS_DIR, emitMostLikely, threshold);
+ }
}
-
- private static void writeInitialState(String output,
- String stateIn,
- String modelFactory,
- String modelPrototype,
- int prototypeSize,
- int numModels,
- double alpha_0) throws ClassNotFoundException,
- InstantiationException,
- IllegalAccessException,
- IOException,
- SecurityException,
- NoSuchMethodException,
- InvocationTargetException {
-
- DirichletState<VectorWritable> state = createState(modelFactory, modelPrototype, prototypeSize,
- numModels, alpha_0);
+
+ private static void writeInitialState(String output, String stateIn, String modelFactory, String modelPrototype,
+ int prototypeSize, int numModels, double alpha_0) throws ClassNotFoundException, InstantiationException,
+ IllegalAccessException, IOException, SecurityException, NoSuchMethodException, InvocationTargetException {
+
+ DirichletState<VectorWritable> state = createState(modelFactory, modelPrototype, prototypeSize, numModels, alpha_0);
JobConf job = new JobConf(KMeansDriver.class);
Path outPath = new Path(output);
FileSystem fs = FileSystem.get(outPath.toUri(), job);
@@ -259,7 +258,7 @@ public class DirichletDriver {
writer.close();
}
}
-
+
/**
* Creates a DirichletState object from the given arguments. Note that the modelFactory is presumed to be a
* subclass of VectorModelDistribution that can be initialized with a concrete Vector prototype.
@@ -276,29 +275,20 @@ public class DirichletDriver {
* the double alpha_0 argument to the algorithm
* @return an initialized DirichletState
*/
- public static DirichletState<VectorWritable> createState(String modelFactory,
- String modelPrototype,
- int prototypeSize,
- int numModels,
- double alpha_0) throws ClassNotFoundException,
- InstantiationException,
- IllegalAccessException,
- SecurityException,
- NoSuchMethodException,
- IllegalArgumentException,
- InvocationTargetException {
-
+ public static DirichletState<VectorWritable> createState(String modelFactory, String modelPrototype, int prototypeSize,
+ int numModels, double alpha_0) throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+ SecurityException, NoSuchMethodException, IllegalArgumentException, InvocationTargetException {
+
ClassLoader ccl = Thread.currentThread().getContextClassLoader();
- Class<? extends VectorModelDistribution> cl = ccl.loadClass(modelFactory).asSubclass(
- VectorModelDistribution.class);
+ Class<? extends VectorModelDistribution> cl = ccl.loadClass(modelFactory).asSubclass(VectorModelDistribution.class);
VectorModelDistribution factory = cl.newInstance();
-
+
Class<? extends Vector> vcl = ccl.loadClass(modelPrototype).asSubclass(Vector.class);
Constructor<? extends Vector> v = vcl.getConstructor(int.class);
factory.setModelPrototype(new VectorWritable(v.newInstance(prototypeSize)));
return new DirichletState<VectorWritable>(factory, numModels, alpha_0);
}
-
+
/**
* Run the job using supplied arguments
*
@@ -321,27 +311,20 @@ public class DirichletDriver {
* @param numReducers
* the number of Reducers desired
*/
- public static void runIteration(String input,
- String stateIn,
- String stateOut,
- String modelFactory,
- String modelPrototype,
- int prototypeSize,
- int numClusters,
- double alpha_0,
- int numReducers) {
+ public static void runIteration(String input, String stateIn, String stateOut, String modelFactory, String modelPrototype,
+ int prototypeSize, int numClusters, double alpha_0, int numReducers) {
Configurable client = new JobClient();
JobConf conf = new JobConf(DirichletDriver.class);
-
+
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(DirichletCluster.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(VectorWritable.class);
-
+
FileInputFormat.setInputPaths(conf, new Path(input));
Path outPath = new Path(stateOut);
FileOutputFormat.setOutputPath(conf, outPath);
-
+
conf.setMapperClass(DirichletMapper.class);
conf.setReducerClass(DirichletReducer.class);
conf.setNumReduceTasks(numReducers);
@@ -353,7 +336,7 @@ public class DirichletDriver {
conf.set(PROTOTYPE_SIZE_KEY, Integer.toString(prototypeSize));
conf.set(NUM_CLUSTERS_KEY, Integer.toString(numClusters));
conf.set(ALPHA_0_KEY, Double.toString(alpha_0));
-
+
client.setConf(conf);
try {
JobClient.runJob(conf);
@@ -361,7 +344,7 @@ public class DirichletDriver {
log.warn(e.toString(), e);
}
}
-
+
/**
* Run the job using supplied arguments
*
@@ -371,27 +354,33 @@ public class DirichletDriver {
* the directory pathname for input state
* @param output
* the directory pathname for output points
+ * @param emitMostLikely
+ * a boolean; if true, emit only the most likely cluster for each point
+ * @param threshold
+ * a double threshold; when emitMostLikely is false, emit all clusters having a greater pdf
*/
- public static void runClustering(String input, String stateIn, String output) {
+ public static void runClustering(String input, String stateIn, String output, boolean emitMostLikely, double threshold) {
JobConf conf = new JobConf(DirichletDriver.class);
conf.setJobName("Dirichlet Clustering");
-
+
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(WeightedVectorWritable.class);
-
+
FileInputFormat.setInputPaths(conf, new Path(input));
Path outPath = new Path(output);
FileOutputFormat.setOutputPath(conf, outPath);
-
+
conf.setMapperClass(DirichletClusterMapper.class);
-
+
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
-
+
// uncomment it to run locally
// conf.set("mapred.job.tracker", "local");
conf.setNumReduceTasks(0);
conf.set(STATE_IN_KEY, stateIn);
+ conf.set(EMIT_MOST_LIKELY_KEY, Boolean.toString(emitMostLikely));
+ conf.set(THRESHOLD_KEY, Double.toString(threshold));
try {
JobClient.runJob(conf);
} catch (IOException e) {
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java Fri Apr 30 20:16:56 2010
@@ -29,6 +29,9 @@ import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.clustering.dirichlet.DirichletDriver;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
public class FuzzyKMeansClusterMapper extends MapReduceBase implements
@@ -38,10 +41,14 @@ public class FuzzyKMeansClusterMapper ex
private FuzzyKMeansClusterer clusterer;
+ private boolean emitMostLikely = false;
+
+ private double threshold = 0;
+
@Override
public void map(WritableComparable<?> key, VectorWritable point, OutputCollector<IntWritable, WeightedVectorWritable> output,
Reporter reporter) throws IOException {
- clusterer.outputPointWithClusterProbabilities(key.toString(), point.get(), clusters, output);
+ clusterer.emitPointToClusters(point, clusters, output);
}
/**
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java Fri Apr 30 20:16:56 2010
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
@@ -40,15 +41,19 @@ public class FuzzyKMeansClusterer {
private double m = 2.0; // default value
+ private boolean emitMostLikely = true;
+
+ private double threshold = 0;
+
/**
- * Init the fuzzy k-means clusterer with the distance measure to use for comparison.
- *
- * @param measure
- * The distance measure to use for comparing clusters against points.
- * @param convergenceDelta
- * When do we define a cluster to have converged?
- *
- * */
+ * Init the fuzzy k-means clusterer with the distance measure to use for comparison.
+ *
+ * @param measure
+ * The distance measure to use for comparing clusters against points.
+ * @param convergenceDelta
+ * When do we define a cluster to have converged?
+ *
+ * */
public FuzzyKMeansClusterer(DistanceMeasure measure, double convergenceDelta, double m) {
this.measure = measure;
this.convergenceDelta = convergenceDelta;
@@ -74,6 +79,9 @@ public class FuzzyKMeansClusterer {
convergenceDelta = Double.parseDouble(job.get(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY));
// nextClusterId = 0;
m = Double.parseDouble(job.get(FuzzyKMeansConfigKeys.M_KEY));
+ emitMostLikely = Boolean.parseBoolean(job.get(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY));
+ threshold = Double.parseDouble(job.get(FuzzyKMeansConfigKeys.THRESHOLD_KEY));
+
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
} catch (IllegalAccessException e) {
@@ -114,45 +122,6 @@ public class FuzzyKMeansClusterer {
}
}
- /**
- * Output point with cluster info (Cluster and probability)
- *
- * @param point
- * a point
- * @param clusters
- * a List<SoftCluster> to test
- * @param output
- * the OutputCollector to emit into
- */
- public void outputPointWithClusterProbabilities(String key, Vector point, List<SoftCluster> clusters,
- OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
-
- // calculate point distances for all clusters
- List<Double> clusterDistanceList = new ArrayList<Double>();
- for (SoftCluster cluster : clusters) {
- clusterDistanceList.add(measure.distance(cluster.getCenter(), point));
- }
- // calculate point pdf for all clusters
- List<Double> clusterPdfList = new ArrayList<Double>();
- for (int i = 0; i < clusters.size(); i++) {
- double probWeight = computeProbWeight(clusterDistanceList.get(i), clusterDistanceList);
- clusterPdfList.add(probWeight);
- }
- // for now just emit the most likely cluster
- int clusterId = -1;
- double clusterPdf = 0;
- for (int i = 0; i < clusters.size(); i++) {
- // System.out.println("cluster-" + clusters.get(i).getId() + "@ " + ClusterBase.formatVector(center, null));
- double pdf = clusterPdfList.get(i);
- if (pdf > clusterPdf) {
- clusterId = clusters.get(i).getId();
- clusterPdf = pdf;
- }
- }
- // System.out.println("cluster-" + clusterId + ": " + ClusterBase.formatVector(point, null));
- output.collect(new IntWritable(clusterId), new WeightedVectorWritable(clusterPdf, new VectorWritable(point)));
- }
-
/** Computes the probability of a point belonging to a cluster */
public double computeProbWeight(double clusterDistance, List<Double> clusterDistanceList) {
if (clusterDistance == 0) {
@@ -187,6 +156,71 @@ public class FuzzyKMeansClusterer {
return this.measure;
}
+ /**
+ * Compute the weight of the point for every cluster and emit it, either to the single most likely
+ * cluster (emitMostLikely = true) or to all clusters selected by emitAllClusters.
+ *
+ * @param point
+ * the VectorWritable point to emit
+ * @param clusters
+ * the List<SoftCluster> to test the point against
+ * @param output
+ * the OutputCollector to emit into
+ */
+ public void emitPointToClusters(VectorWritable point, List<SoftCluster> clusters,
+ OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+ Vector vector = point.get();
+ // distance from the point to each cluster center, in cluster-list order
+ List<Double> distances = new ArrayList<Double>();
+ for (SoftCluster candidate : clusters) {
+ distances.add(getMeasure().distance(candidate.getCenter(), vector));
+ }
+ // weight of the point for each cluster, at the same index as the cluster
+ Vector weights = new DenseVector(clusters.size());
+ int index = 0;
+ for (Double distance : distances) {
+ weights.set(index++, computeProbWeight(distance, distances));
+ }
+ if (!emitMostLikely) {
+ emitAllClusters(vector, clusters, weights, output);
+ return;
+ }
+ emitMostLikelyCluster(vector, clusters, weights, output);
+ }
+
+ /**
+ * Emit the point once, keyed by the id of the cluster with the highest pdf and weighted by that pdf.
+ *
+ * @param point
+ * the Vector point to emit
+ * @param clusters
+ * the List<SoftCluster> the pdfs refer to
+ * @param clusterPdfList
+ * a Vector holding the pdf of the point for each cluster, indexed like clusters
+ * @param output
+ * the OutputCollector to emit into
+ * @throws IOException
+ * propagated from the OutputCollector
+ */
+ void emitMostLikelyCluster(Vector point, List<SoftCluster> clusters, Vector clusterPdfList,
+ OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+ // stays at -1 / 0 if no cluster has a pdf greater than zero
+ int bestId = -1;
+ double bestPdf = 0;
+ int index = 0;
+ for (SoftCluster candidate : clusters) {
+ double candidatePdf = clusterPdfList.get(index++);
+ if (candidatePdf > bestPdf) {
+ bestPdf = candidatePdf;
+ bestId = candidate.getId();
+ }
+ }
+ output.collect(new IntWritable(bestId), new WeightedVectorWritable(bestPdf, new VectorWritable(point)));
+ }
+
+ /**
+ * Emit the point to every cluster for which its pdf is strictly greater than the threshold. Each
+ * record is keyed by the cluster's id, the same key emitMostLikelyCluster uses.
+ *
+ * @param point
+ * the Vector point to emit
+ * @param clusters
+ * the List<SoftCluster> the pdfs refer to
+ * @param pi
+ * a Vector holding the pdf of the point for each cluster, indexed like clusters
+ * @param output
+ * the OutputCollector to emit into
+ * @throws IOException
+ * propagated from the OutputCollector
+ */
+ void emitAllClusters(Vector point, List<SoftCluster> clusters, Vector pi,
+ OutputCollector<IntWritable, WeightedVectorWritable> output) throws IOException {
+ for (int i = 0; i < clusters.size(); i++) {
+ double pdf = pi.get(i);
+ if (pdf > threshold) {
+ // key by the cluster id rather than the list position; the two need not coincide
+ output.collect(new IntWritable(clusters.get(i).getId()), new WeightedVectorWritable(pdf, new VectorWritable(point)));
+ }
+ }
+ }
+
/**
* This is the reference k-means implementation. Given its inputs it iterates over the points and clusters
* until their centers converge or until the maximum number of iterations is exceeded.
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansConfigKeys.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansConfigKeys.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansConfigKeys.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansConfigKeys.java Fri Apr 30 20:16:56 2010
@@ -18,10 +18,17 @@
package org.apache.mahout.clustering.fuzzykmeans;
public interface FuzzyKMeansConfigKeys {
-
+
String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.kmeans.measure";
+
String CLUSTER_PATH_KEY = "org.apache.mahout.clustering.kmeans.path";
+
String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.kmeans.convergence";
+
String M_KEY = "org.apache.mahout.clustering.fuzzykmeans.m";
-
+
+ String EMIT_MOST_LIKELY_KEY = "org.apache.mahout.clustering.fuzzykmeans.emitMostLikely";
+
+ String THRESHOLD_KEY = "org.apache.mahout.clustering.fuzzykmeans.threshold";
+
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Fri Apr 30 20:16:56 2010
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.mahout.clustering.ClusterBase;
+import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
import org.apache.mahout.common.CommandLineUtil;
@@ -54,79 +54,83 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class FuzzyKMeansDriver {
-
+
private static final Logger log = LoggerFactory.getLogger(FuzzyKMeansDriver.class);
- private FuzzyKMeansDriver() { }
-
+ private FuzzyKMeansDriver() {
+ }
+
public static void main(String[] args) throws Exception {
DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
ArgumentBuilder abuilder = new ArgumentBuilder();
GroupBuilder gbuilder = new GroupBuilder();
Option inputOpt = obuilder.withLongName("input").withRequired(true).withArgument(
- abuilder.withName("input").withMinimum(1).withMaximum(1).create()).withDescription(
- "The Path for input Vectors. Must be a SequenceFile of Writable, Vector").withShortName("i").create();
-
+ abuilder.withName("input").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The Path for input Vectors. Must be a SequenceFile of Writable, Vector").withShortName("i").create();
+
Option clustersOpt = obuilder.withLongName("clusters").withRequired(true).withArgument(
- abuilder.withName("clusters").withMinimum(1).withMaximum(1).create()).withDescription(
- "The input centroids, as Vectors. Must be a SequenceFile of Writable, Cluster/Canopy. "
- + "If k is also specified, then a random set of vectors will be selected"
- + " and written out to this path first").withShortName("c").create();
-
+ abuilder.withName("clusters").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The input centroids, as Vectors. Must be a SequenceFile of Writable, Cluster/Canopy. "
+ + "If k is also specified, then a random set of vectors will be selected" + " and written out to this path first")
+ .withShortName("c").create();
+
Option kOpt = obuilder.withLongName("k").withRequired(false).withArgument(
- abuilder.withName("k").withMinimum(1).withMaximum(1).create()).withDescription(
- "The k in k-Means. If specified, then a random selection of k Vectors will be chosen"
- + " as the Centroid and written to the clusters output path.").withShortName("k").create();
-
+ abuilder.withName("k").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The k in k-Means. If specified, then a random selection of k Vectors will be chosen"
+ + " as the Centroid and written to the clusters output path.").withShortName("k").create();
+
Option outputOpt = obuilder.withLongName("output").withRequired(true).withArgument(
- abuilder.withName("output").withMinimum(1).withMaximum(1).create()).withDescription(
- "The Path to put the output in").withShortName("o").create();
-
+ abuilder.withName("output").withMinimum(1).withMaximum(1).create()).withDescription("The Path to put the output in")
+ .withShortName("o").create();
+
Option measureClassOpt = obuilder.withLongName("distance").withRequired(false).withArgument(
- abuilder.withName("distance").withMinimum(1).withMaximum(1).create()).withDescription(
- "The Distance Measure to use. Default is SquaredEuclidean").withShortName("dm").create();
-
+ abuilder.withName("distance").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The Distance Measure to use. Default is SquaredEuclidean").withShortName("dm").create();
+
Option convergenceDeltaOpt = obuilder.withLongName("convergence").withRequired(false).withArgument(
- abuilder.withName("convergence").withMinimum(1).withMaximum(1).create()).withDescription(
- "The threshold below which the clusters are considered to be converged. Default is 0.5")
- .withShortName("d").create();
-
+ abuilder.withName("convergence").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The threshold below which the clusters are considered to be converged. Default is 0.5").withShortName("d").create();
+
Option maxIterationsOpt = obuilder.withLongName("max").withRequired(false).withArgument(
- abuilder.withName("max").withMinimum(1).withMaximum(1).create()).withDescription(
- "The maximum number of iterations to perform. Default is 20").withShortName("x").create();
-
+ abuilder.withName("max").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The maximum number of iterations to perform. Default is 20").withShortName("x").create();
+
Option vectorClassOpt = obuilder.withLongName("vectorClass").withRequired(false).withArgument(
- abuilder.withName("vectorClass").withMinimum(1).withMaximum(1).create()).withDescription(
- "The Vector implementation class name. Default is RandomAccessSparseVector.class").withShortName("v")
- .create();
-
- Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
- .create();
-
+ abuilder.withName("vectorClass").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The Vector implementation class name. Default is RandomAccessSparseVector.class").withShortName("v").create();
+
+ Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h").create();
+
Option overwriteOutput = obuilder.withLongName("overwrite").withRequired(false).withDescription(
- "If set, overwrite the output directory").withShortName("w").create();
-
- Option clusteringOpt = obuilder.withLongName("clustering").withRequired(false).withDescription(
- "If true, run clustering only (assumes the iterations have already taken place").withShortName("l")
- .create();
-
+ "If set, overwrite the output directory").withShortName("w").create();
+
Option mOpt = obuilder.withLongName("m").withRequired(true).withArgument(
- abuilder.withName("m").withMinimum(1).withMaximum(1).create()).withDescription(
- "coefficient normalization factor, must be greater than 1").withShortName("m").create();
-
+ abuilder.withName("m").withMinimum(1).withMaximum(1).create()).withDescription(
+ "coefficient normalization factor, must be greater than 1").withShortName("m").create();
+
Option numReduceTasksOpt = obuilder.withLongName("numReduce").withRequired(false).withArgument(
- abuilder.withName("numReduce").withMinimum(1).withMaximum(1).create()).withDescription(
- "The number of reduce tasks").withShortName("r").create();
-
+ abuilder.withName("numReduce").withMinimum(1).withMaximum(1).create()).withDescription("The number of reduce tasks")
+ .withShortName("r").create();
+
Option numMapTasksOpt = obuilder.withLongName("numMap").withRequired(false).withArgument(
- abuilder.withName("numMap").withMinimum(1).withMaximum(1).create()).withDescription(
- "The number of map tasks").withShortName("u").create();
-
- Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(clustersOpt).withOption(
- outputOpt).withOption(measureClassOpt).withOption(convergenceDeltaOpt).withOption(maxIterationsOpt)
- .withOption(kOpt).withOption(mOpt).withOption(vectorClassOpt).withOption(overwriteOutput).withOption(
- helpOpt).create();
-
+ abuilder.withName("numMap").withMinimum(1).withMaximum(1).create()).withDescription("The number of map tasks")
+ .withShortName("u").create();
+
+ Option clusteringOpt = obuilder.withLongName("clustering").withRequired(false).withDescription(
+ "If true, run clustering after the iterations have taken place").withShortName("cl").create();
+
+ Option emitMostLikelyOpt = obuilder.withLongName("emitMostLikely").withRequired(false).withShortName("e").withArgument(
+ abuilder.withName("emitMostLikely").withMinimum(1).withMaximum(1).create()).withDescription(
+ "True if clustering emits most likely point only, false for threshold clustering").create();
+
+ Option thresholdOpt = obuilder.withLongName("threshold").withRequired(false).withShortName("t").withArgument(
+ abuilder.withName("threshold").withMinimum(1).withMaximum(1).create()).withDescription("The pdf threshold").create();
+
+ Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(clustersOpt).withOption(outputOpt).withOption(
+ measureClassOpt).withOption(convergenceDeltaOpt).withOption(maxIterationsOpt).withOption(kOpt).withOption(mOpt).withOption(
+ vectorClassOpt).withOption(overwriteOutput).withOption(helpOpt).withOption(emitMostLikelyOpt).withOption(thresholdOpt)
+ .create();
+
try {
Parser parser = new Parser();
parser.setGroup(group);
@@ -147,50 +151,52 @@ public final class FuzzyKMeansDriver {
convergenceDelta = Double.parseDouble(cmdLine.getValue(convergenceDeltaOpt).toString());
}
float m = Float.parseFloat(cmdLine.getValue(mOpt).toString());
-
+
// Class<? extends Vector> vectorClass = cmdLine.hasOption(vectorClassOpt) == false ?
// RandomAccessSparseVector.class
// : (Class<? extends Vector>) Class.forName(cmdLine.getValue(vectorClassOpt).toString());
-
+
int numReduceTasks = 10;
if (cmdLine.hasOption(numReduceTasksOpt)) {
numReduceTasks = Integer.parseInt(cmdLine.getValue(numReduceTasksOpt).toString());
}
-
+
int numMapTasks = 50;
if (cmdLine.hasOption(numMapTasksOpt)) {
numMapTasks = Integer.parseInt(cmdLine.getValue(numMapTasksOpt).toString());
}
-
+
int maxIterations = 20;
if (cmdLine.hasOption(maxIterationsOpt)) {
maxIterations = Integer.parseInt(cmdLine.getValue(maxIterationsOpt).toString());
}
-
+
if (cmdLine.hasOption(overwriteOutput)) {
HadoopUtil.overwriteOutput(output);
}
-
+
if (cmdLine.hasOption(kOpt)) {
- clusters = RandomSeedGenerator.buildRandom(input, clusters,
- Integer.parseInt(cmdLine.getValue(kOpt).toString())).toString();
+ clusters = RandomSeedGenerator.buildRandom(input, clusters, Integer.parseInt(cmdLine.getValue(kOpt).toString())).toString();
}
-
- if (cmdLine.hasOption(clusteringOpt)) {
- runClustering(input, clusters, output, measureClass, convergenceDelta, numMapTasks,
- m);
- } else {
- runJob(input, clusters, output, measureClass, convergenceDelta, maxIterations,
- numMapTasks, numReduceTasks, m);
+
+ boolean emitMostLikely = true;
+ if (cmdLine.hasOption(emitMostLikelyOpt)) {
+ emitMostLikely = Boolean.parseBoolean(cmdLine.getValue(emitMostLikelyOpt).toString());
+ }
+ double threshold = 0;
+ if (cmdLine.hasOption(thresholdOpt)) {
+ threshold = Double.parseDouble(cmdLine.getValue(thresholdOpt).toString());
}
-
+ runJob(input, clusters, output, measureClass, convergenceDelta, maxIterations, numMapTasks, numReduceTasks, m, cmdLine
+ .hasOption(clusteringOpt), emitMostLikely, threshold);
+
} catch (OptionException e) {
log.error("Exception", e);
CommandLineUtil.printHelp(group);
}
-
+
}
-
+
/**
* Run the job using supplied arguments
*
@@ -213,41 +219,40 @@ public final class FuzzyKMeansDriver {
* @param m
* the fuzzification factor, see
* http://en.wikipedia.org/wiki/Data_clustering#Fuzzy_c-means_clustering
+ * @param runClustering
+ * true if points are to be clustered after iterations complete
+ * @param emitMostLikely
+ * a boolean; if true, emit only the most likely cluster for each point
+ * @param threshold
+ * a double threshold; when emitMostLikely is false, emit all clusters having a greater pdf
*/
- public static void runJob(String input,
- String clustersIn,
- String output,
- String measureClass,
- double convergenceDelta,
- int maxIterations,
- int numMapTasks,
- int numReduceTasks,
- float m) {
-
+ public static void runJob(String input, String clustersIn, String output, String measureClass, double convergenceDelta,
+ int maxIterations, int numMapTasks, int numReduceTasks, float m, boolean runClustering, boolean emitMostLikely,
+ double threshold) {
+
boolean converged = false;
int iteration = 1;
-
+
// iterate until the clusters converge
while (!converged && (iteration <= maxIterations)) {
log.info("Iteration {}", iteration);
-
+
// point the output to a new directory per iteration
- String clustersOut = output + ClusterBase.CLUSTERS_DIR + iteration;
- converged = runIteration(input, clustersIn, clustersOut, measureClass,
- convergenceDelta, numMapTasks, numReduceTasks, iteration, m);
-
+ String clustersOut = output + Cluster.CLUSTERS_DIR + iteration;
+ converged = runIteration(input, clustersIn, clustersOut, measureClass, convergenceDelta, numMapTasks, numReduceTasks,
+ iteration, m);
+
// now point the input to the old output directory
clustersIn = clustersOut;
iteration++;
}
-
- // now actually cluster the points
- log.info("Clustering ");
-
- runClustering(input, clustersIn, output + ClusterBase.CLUSTERED_POINTS_DIR, measureClass,
- convergenceDelta, numMapTasks, m);
+
+ // cluster the points only when the caller asked for it; clustersIn now names the last iteration's output
+ if (runClustering) {
+ log.info("Clustering ");
+ runClustering(input, clustersIn, output + Cluster.CLUSTERED_POINTS_DIR, measureClass, convergenceDelta, numMapTasks, m,
+ emitMostLikely, threshold);
+ }
}
-
+
/**
* Run the job using supplied arguments
*
@@ -270,45 +275,41 @@ public final class FuzzyKMeansDriver {
* http://en.wikipedia.org/wiki/Data_clustering#Fuzzy_c-means_clustering
* @return true if the iteration successfully runs
*/
- private static boolean runIteration(String input,
- String clustersIn,
- String clustersOut,
- String measureClass,
- double convergenceDelta,
- int numMapTasks,
- int numReduceTasks,
- int iterationNumber,
- float m) {
-
- JobConf conf = new JobConf(FuzzyKMeansJob.class);
+ private static boolean runIteration(String input, String clustersIn, String clustersOut, String measureClass,
+ double convergenceDelta, int numMapTasks, int numReduceTasks, int iterationNumber, float m) {
+
+ JobConf conf = new JobConf(FuzzyKMeansDriver.class);
conf.setJobName("Fuzzy K Means{" + iterationNumber + '}');
-
+
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(FuzzyKMeansInfo.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(SoftCluster.class);
-
+
FileInputFormat.setInputPaths(conf, new Path(input));
Path outPath = new Path(clustersOut);
FileOutputFormat.setOutputPath(conf, outPath);
-
+
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
-
+
conf.setMapperClass(FuzzyKMeansMapper.class);
conf.setCombinerClass(FuzzyKMeansCombiner.class);
conf.setReducerClass(FuzzyKMeansReducer.class);
conf.setNumMapTasks(numMapTasks);
conf.setNumReduceTasks(numReduceTasks);
-
+
conf.set(FuzzyKMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn);
conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, String.valueOf(convergenceDelta));
conf.set(FuzzyKMeansConfigKeys.M_KEY, String.valueOf(m));
-
+ // these values don't matter during iterations as only used for clustering if requested
+ conf.set(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY, Boolean.toString(true));
+ conf.set(FuzzyKMeansConfigKeys.THRESHOLD_KEY, Double.toString(0));
+
// uncomment it to run locally
// conf.set("mapred.job.tracker", "local");
-
+
try {
JobClient.runJob(conf);
FileSystem fs = FileSystem.get(outPath.toUri(), conf);
@@ -318,7 +319,7 @@ public final class FuzzyKMeansDriver {
return true;
}
}
-
+
/**
* Run the job using supplied arguments
*
@@ -334,30 +335,29 @@ public final class FuzzyKMeansDriver {
* the convergence delta value
* @param numMapTasks
* the number of map tasks
+ * @param emitMostLikely
+ * a boolean if true emit only most likely cluster for each point
+ * @param threshold
+ * a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
*/
- private static void runClustering(String input,
- String clustersIn,
- String output,
- String measureClass,
- double convergenceDelta,
- int numMapTasks,
- float m) {
-
+ private static void runClustering(String input, String clustersIn, String output, String measureClass, double convergenceDelta,
+ int numMapTasks, float m, boolean emitMostLikely, double threshold) {
+
JobConf conf = new JobConf(FuzzyKMeansDriver.class);
conf.setJobName("Fuzzy K Means Clustering");
-
+
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(WeightedVectorWritable.class);
-
+
FileInputFormat.setInputPaths(conf, new Path(input));
Path outPath = new Path(output);
FileOutputFormat.setOutputPath(conf, outPath);
-
+
conf.setMapperClass(FuzzyKMeansClusterMapper.class);
-
+
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
-
+
// uncomment it to run locally
// conf.set("mapred.job.tracker", "local");
conf.setNumMapTasks(numMapTasks);
@@ -366,13 +366,15 @@ public final class FuzzyKMeansDriver {
conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, String.valueOf(convergenceDelta));
conf.set(FuzzyKMeansConfigKeys.M_KEY, String.valueOf(m));
+ conf.set(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY, Boolean.toString(emitMostLikely));
+ conf.set(FuzzyKMeansConfigKeys.THRESHOLD_KEY, Double.toString(threshold));
try {
JobClient.runJob(conf);
} catch (IOException e) {
log.warn(e.toString(), e);
}
}
-
+
/**
* Return if all of the Clusters in the filePath have converged or not
*
@@ -387,29 +389,28 @@ public final class FuzzyKMeansDriver {
* if there was an IO error
*/
private static boolean isConverged(String filePath, Configuration conf, FileSystem fs) throws IOException {
-
+
Path clusterPath = new Path(filePath + "/*");
List<Path> result = new ArrayList<Path>();
-
+
PathFilter clusterFileFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith("part");
}
};
-
- FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(clusterPath, clusterFileFilter)),
- clusterFileFilter);
-
+
+ FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(clusterPath, clusterFileFilter)), clusterFileFilter);
+
for (FileStatus match : matches) {
result.add(fs.makeQualified(match.getPath()));
}
boolean converged = true;
-
+
for (Path p : result) {
-
+
SequenceFile.Reader reader = null;
-
+
try {
reader = new SequenceFile.Reader(fs, p, conf);
/*
@@ -427,7 +428,7 @@ public final class FuzzyKMeansDriver {
}
}
}
-
+
return converged;
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Fri Apr 30 20:16:56 2010
@@ -209,7 +209,7 @@ public final class KMeansDriver {
while (!converged && (iteration <= maxIterations)) {
log.info("Iteration {}", iteration);
// point the output to a new directory per iteration
- String clustersOut = output + ClusterBase.CLUSTERS_DIR + iteration;
+ String clustersOut = output + Cluster.CLUSTERS_DIR + iteration;
converged = runIteration(input, clustersIn, clustersOut, measureClass, delta, numReduceTasks, iteration);
// now point the input to the old output directory
clustersIn = clustersOut;
@@ -217,7 +217,7 @@ public final class KMeansDriver {
}
// now actually cluster the points
log.info("Clustering ");
- runClustering(input, clustersIn, output + ClusterBase.CLUSTERED_POINTS_DIR, measureClass, delta);
+ runClustering(input, clustersIn, output + Cluster.CLUSTERED_POINTS_DIR, measureClass, delta);
}
/**
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java?rev=939800&r1=939799&r2=939800&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java Fri Apr 30 20:16:56 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.ClusterBase;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
@@ -149,7 +150,7 @@ public class MeanShiftCanopyJob {
}
fs.mkdirs(outPath);
- String clustersIn = output + ClusterBase.INITIAL_CLUSTERS_DIR;
+ String clustersIn = output + Cluster.INITIAL_CLUSTERS_DIR;
if (inputIsCanopies) {
clustersIn = input;
} else {
@@ -162,7 +163,7 @@ public class MeanShiftCanopyJob {
while (!converged && (iteration <= maxIterations)) {
log.info("Iteration {}", iteration);
// point the output to a new directory per iteration
- String clustersOut = output + ClusterBase.CLUSTERS_DIR + iteration;
+ String clustersOut = output + Cluster.CLUSTERS_DIR + iteration;
String controlOut = output + CONTROL_CONVERGED;
MeanShiftCanopyDriver.runJob(clustersIn, clustersOut, controlOut, measureClassName, t1, t2, convergenceDelta);
converged = FileSystem.get(conf).exists(new Path(controlOut));
@@ -172,8 +173,8 @@ public class MeanShiftCanopyJob {
}
// now cluster the points
- MeanShiftCanopyDriver.runClustering((inputIsCanopies ? input : output + ClusterBase.INITIAL_CLUSTERS_DIR), clustersIn, output
- + ClusterBase.CLUSTERED_POINTS_DIR);
+ MeanShiftCanopyDriver.runClustering((inputIsCanopies ? input : output + Cluster.INITIAL_CLUSTERS_DIR), clustersIn, output
+ + Cluster.CLUSTERED_POINTS_DIR);
}
}