You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@mahout.apache.org by pr...@apache.org on 2012/03/31 15:28:31 UTC
svn commit: r1307801 - /mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
Author: pranjan
Date: Sat Mar 31 13:28:31 2012
New Revision: 1307801
URL: http://svn.apache.org/viewvc?rev=1307801&view=rev
Log:
Formatted with line length 120
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java?rev=1307801&r1=1307800&r2=1307801&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java Sat Mar 31 13:28:31 2012
@@ -68,10 +68,8 @@ public class ClusterClassificationDriver
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.methodOption().create());
- addOption(DefaultOptionCreator
- .clustersInOption()
- .withDescription(
- "The input centroids, as Vectors. Must be a SequenceFile of Writable, Cluster/Canopy.")
+ addOption(DefaultOptionCreator.clustersInOption()
+ .withDescription("The input centroids, as Vectors. Must be a SequenceFile of Writable, Cluster/Canopy.")
.create());
if (parseArguments(args) == null) {
@@ -84,19 +82,16 @@ public class ClusterClassificationDriver
if (getConf() == null) {
setConf(new Configuration());
}
- Path clustersIn = new Path(
- getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
- boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION)
- .equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+ Path clustersIn = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
+ boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+ DefaultOptionCreator.SEQUENTIAL_METHOD);
double clusterClassificationThreshold = 0.0;
if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
- clusterClassificationThreshold = Double
- .parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
+ clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
}
- run(input, clustersIn, output, clusterClassificationThreshold, true,
- runSequential);
+ run(input, clustersIn, output, clusterClassificationThreshold, true, runSequential);
return 0;
}
@@ -107,8 +102,7 @@ public class ClusterClassificationDriver
private ClusterClassificationDriver() {}
public static void main(String[] args) throws Exception {
- ToolRunner
- .run(new Configuration(), new ClusterClassificationDriver(), args);
+ ToolRunner.run(new Configuration(), new ClusterClassificationDriver(), args);
}
/**
@@ -144,16 +138,12 @@ public class ClusterClassificationDriver
}
- private static void classifyClusterSeq(Configuration conf, Path input,
- Path clusters, Path output, Double clusterClassificationThreshold, boolean emitMostLikely)
- throws IOException {
+ private static void classifyClusterSeq(Configuration conf, Path input, Path clusters, Path output,
+ Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException {
List<Cluster> clusterModels = populateClusterModels(clusters, conf);
- ClusteringPolicy policy = ClusterClassifier
- .readPolicy(finalClustersPath(clusters));
- ClusterClassifier clusterClassifier = new ClusterClassifier(clusterModels,
- policy);
- selectCluster(input, clusterModels, clusterClassifier, output,
- clusterClassificationThreshold, emitMostLikely);
+ ClusteringPolicy policy = ClusterClassifier.readPolicy(finalClustersPath(clusters));
+ ClusterClassifier clusterClassifier = new ClusterClassifier(clusterModels, policy);
+ selectCluster(input, clusterModels, clusterClassifier, output, clusterClassificationThreshold, emitMostLikely);
}
@@ -167,31 +157,26 @@ public class ClusterClassificationDriver
* @return The list of clusters found by the clustering.
* @throws IOException
*/
- private static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf)
- throws IOException {
+ private static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf) throws IOException {
List<Cluster> clusterModels = new ArrayList<Cluster>();
Cluster cluster = null;
Path finalClustersPath = finalClustersPath(clusterOutputPath);
- Iterator<?> it = new SequenceFileDirValueIterator<Writable>(
- finalClustersPath, PathType.LIST, PathFilters.partFilter(), null,
- false, conf);
+ Iterator<?> it = new SequenceFileDirValueIterator<Writable>(finalClustersPath, PathType.LIST,
+ PathFilters.partFilter(), null, false, conf);
while (it.hasNext()) {
- ClusterWritable next = (ClusterWritable)it.next();
+ ClusterWritable next = (ClusterWritable) it.next();
cluster = (Cluster) next.getValue();
- if(cluster instanceof DirichletCluster){
- ((DirichletCluster) cluster).getModel().configure(conf);
+ if (cluster instanceof DirichletCluster) {
+ ((DirichletCluster) cluster).getModel().configure(conf);
}
clusterModels.add(cluster);
}
return clusterModels;
}
- private static Path finalClustersPath(Path clusterOutputPath)
- throws IOException {
- FileSystem fileSystem = clusterOutputPath
- .getFileSystem(new Configuration());
- FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath,
- PathFilters.finalPartFilter());
+ private static Path finalClustersPath(Path clusterOutputPath) throws IOException {
+ FileSystem fileSystem = clusterOutputPath.getFileSystem(new Configuration());
+ FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
Path finalClustersPath = clusterFiles[0].getPath();
return finalClustersPath;
}
@@ -212,58 +197,47 @@ public class ClusterClassificationDriver
* TODO
* @throws IOException
*/
- private static void selectCluster(Path input, List<Cluster> clusterModels,
- ClusterClassifier clusterClassifier, Path output,
- Double clusterClassificationThreshold, boolean emitMostLikely)
- throws IOException {
+ private static void selectCluster(Path input, List<Cluster> clusterModels, ClusterClassifier clusterClassifier,
+ Path output, Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException {
Configuration conf = new Configuration();
- SequenceFile.Writer writer = new SequenceFile.Writer(
- input.getFileSystem(conf), conf, new Path(output, "part-m-" + 0),
- IntWritable.class, WeightedVectorWritable.class);
- for (VectorWritable vw : new SequenceFileDirValueIterable<VectorWritable>(
- input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
+ SequenceFile.Writer writer = new SequenceFile.Writer(input.getFileSystem(conf), conf, new Path(output,
+ "part-m-" + 0), IntWritable.class, WeightedVectorWritable.class);
+ for (VectorWritable vw : new SequenceFileDirValueIterable<VectorWritable>(input, PathType.LIST,
+ PathFilters.logsCRCFilter(), conf)) {
Vector pdfPerCluster = clusterClassifier.classify(vw.get());
if (shouldClassify(pdfPerCluster, clusterClassificationThreshold)) {
- classifyAndWrite(clusterModels, clusterClassificationThreshold,
- emitMostLikely, writer, vw, pdfPerCluster);
+ classifyAndWrite(clusterModels, clusterClassificationThreshold, emitMostLikely, writer, vw, pdfPerCluster);
}
}
writer.close();
}
- private static void classifyAndWrite(List<Cluster> clusterModels,
- Double clusterClassificationThreshold, boolean emitMostLikely,
- SequenceFile.Writer writer, VectorWritable vw, Vector pdfPerCluster)
- throws IOException {
+ private static void classifyAndWrite(List<Cluster> clusterModels, Double clusterClassificationThreshold,
+ boolean emitMostLikely, SequenceFile.Writer writer, VectorWritable vw, Vector pdfPerCluster) throws IOException {
if (emitMostLikely) {
int maxValueIndex = pdfPerCluster.maxValueIndex();
- WeightedVectorWritable wvw = new WeightedVectorWritable(
- pdfPerCluster.maxValue(), vw.get());
+ WeightedVectorWritable wvw = new WeightedVectorWritable(pdfPerCluster.maxValue(), vw.get());
write(clusterModels, writer, wvw, maxValueIndex);
} else {
- writeAllAboveThreshold(clusterModels, clusterClassificationThreshold,
- writer, vw, pdfPerCluster);
+ writeAllAboveThreshold(clusterModels, clusterClassificationThreshold, writer, vw, pdfPerCluster);
}
}
- private static void writeAllAboveThreshold(List<Cluster> clusterModels,
- Double clusterClassificationThreshold, SequenceFile.Writer writer,
- VectorWritable vw, Vector pdfPerCluster) throws IOException {
+ private static void writeAllAboveThreshold(List<Cluster> clusterModels, Double clusterClassificationThreshold,
+ SequenceFile.Writer writer, VectorWritable vw, Vector pdfPerCluster) throws IOException {
Iterator<Element> iterateNonZero = pdfPerCluster.iterateNonZero();
while (iterateNonZero.hasNext()) {
Element pdf = iterateNonZero.next();
if (pdf.get() >= clusterClassificationThreshold) {
- WeightedVectorWritable wvw = new WeightedVectorWritable(pdf.get(),
- vw.get());
+ WeightedVectorWritable wvw = new WeightedVectorWritable(pdf.get(), vw.get());
int clusterIndex = pdf.index();
write(clusterModels, writer, wvw, clusterIndex);
}
}
}
- private static void write(List<Cluster> clusterModels,
- SequenceFile.Writer writer, WeightedVectorWritable wvw, int maxValueIndex)
- throws IOException {
+ private static void write(List<Cluster> clusterModels, SequenceFile.Writer writer, WeightedVectorWritable wvw,
+ int maxValueIndex) throws IOException {
Cluster cluster = clusterModels.get(maxValueIndex);
writer.append(new IntWritable(cluster.getId()), wvw);
}
@@ -274,24 +248,20 @@ public class ClusterClassificationDriver
*
* @return whether the vector should be classified or not.
*/
- private static boolean shouldClassify(Vector pdfPerCluster,
- Double clusterClassificationThreshold) {
+ private static boolean shouldClassify(Vector pdfPerCluster, Double clusterClassificationThreshold) {
boolean isMaxPDFGreatherThanThreshold = pdfPerCluster.maxValue() >= clusterClassificationThreshold;
return isMaxPDFGreatherThanThreshold;
}
- private static void classifyClusterMR(Configuration conf, Path input,
- Path clustersIn, Path output, Double clusterClassificationThreshold,
- boolean emitMostLikely) throws IOException, InterruptedException,
- ClassNotFoundException {
+ private static void classifyClusterMR(Configuration conf, Path input, Path clustersIn, Path output,
+ Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException, InterruptedException,
+ ClassNotFoundException {
- conf.setFloat(OUTLIER_REMOVAL_THRESHOLD,
- clusterClassificationThreshold.floatValue());
+ conf.setFloat(OUTLIER_REMOVAL_THRESHOLD, clusterClassificationThreshold.floatValue());
conf.setBoolean(EMIT_MOST_LIKELY, emitMostLikely);
conf.set(CLUSTERS_IN, clustersIn.toUri().toString());
- Job job = new Job(conf,
- "Cluster Classification Driver running over input: " + input);
+ Job job = new Job(conf, "Cluster Classification Driver running over input: " + input);
job.setJarByClass(ClusterClassificationDriver.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
@@ -306,11 +276,10 @@ public class ClusterClassificationDriver
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true)) {
- throw new InterruptedException(
- "Cluster Classification Driver Job failed processing " + input);
+ throw new InterruptedException("Cluster Classification Driver Job failed processing " + input);
}
}
-
+
public static void run(Configuration conf, Path input, Path clusteringOutputPath, Path output,
double clusterClassificationThreshold, boolean emitMostLikely, boolean runSequential) throws IOException,
InterruptedException, ClassNotFoundException {