You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pr...@apache.org on 2012/03/31 15:28:31 UTC

svn commit: r1307801 - /mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java

Author: pranjan
Date: Sat Mar 31 13:28:31 2012
New Revision: 1307801

URL: http://svn.apache.org/viewvc?rev=1307801&view=rev
Log:
Formatted with line length 120

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java?rev=1307801&r1=1307800&r2=1307801&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java Sat Mar 31 13:28:31 2012
@@ -68,10 +68,8 @@ public class ClusterClassificationDriver
     addInputOption();
     addOutputOption();
     addOption(DefaultOptionCreator.methodOption().create());
-    addOption(DefaultOptionCreator
-        .clustersInOption()
-        .withDescription(
-            "The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.")
+    addOption(DefaultOptionCreator.clustersInOption()
+        .withDescription("The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.")
         .create());
     
     if (parseArguments(args) == null) {
@@ -84,19 +82,16 @@ public class ClusterClassificationDriver
     if (getConf() == null) {
       setConf(new Configuration());
     }
-    Path clustersIn = new Path(
-        getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
-    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION)
-        .equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+    Path clustersIn = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+        DefaultOptionCreator.SEQUENTIAL_METHOD);
     
     double clusterClassificationThreshold = 0.0;
     if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
-      clusterClassificationThreshold = Double
-          .parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
+      clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
     }
     
-    run(input, clustersIn, output, clusterClassificationThreshold, true,
-        runSequential);
+    run(input, clustersIn, output, clusterClassificationThreshold, true, runSequential);
     
     return 0;
   }
@@ -107,8 +102,7 @@ public class ClusterClassificationDriver
   private ClusterClassificationDriver() {}
   
   public static void main(String[] args) throws Exception {
-    ToolRunner
-        .run(new Configuration(), new ClusterClassificationDriver(), args);
+    ToolRunner.run(new Configuration(), new ClusterClassificationDriver(), args);
   }
   
   /**
@@ -144,16 +138,12 @@ public class ClusterClassificationDriver
     
   }
   
-  private static void classifyClusterSeq(Configuration conf, Path input,
-      Path clusters, Path output, Double clusterClassificationThreshold, boolean emitMostLikely)
-      throws IOException {
+  private static void classifyClusterSeq(Configuration conf, Path input, Path clusters, Path output,
+      Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException {
     List<Cluster> clusterModels = populateClusterModels(clusters, conf);
-    ClusteringPolicy policy = ClusterClassifier
-        .readPolicy(finalClustersPath(clusters));
-    ClusterClassifier clusterClassifier = new ClusterClassifier(clusterModels,
-        policy);
-    selectCluster(input, clusterModels, clusterClassifier, output,
-        clusterClassificationThreshold, emitMostLikely);
+    ClusteringPolicy policy = ClusterClassifier.readPolicy(finalClustersPath(clusters));
+    ClusterClassifier clusterClassifier = new ClusterClassifier(clusterModels, policy);
+    selectCluster(input, clusterModels, clusterClassifier, output, clusterClassificationThreshold, emitMostLikely);
     
   }
   
@@ -167,31 +157,26 @@ public class ClusterClassificationDriver
    * @return The list of clusters found by the clustering.
    * @throws IOException
    */
-  private static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf)
-      throws IOException {
+  private static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf) throws IOException {
     List<Cluster> clusterModels = new ArrayList<Cluster>();
     Cluster cluster = null;
     Path finalClustersPath = finalClustersPath(clusterOutputPath);
-    Iterator<?> it = new SequenceFileDirValueIterator<Writable>(
-        finalClustersPath, PathType.LIST, PathFilters.partFilter(), null,
-        false, conf);
+    Iterator<?> it = new SequenceFileDirValueIterator<Writable>(finalClustersPath, PathType.LIST,
+        PathFilters.partFilter(), null, false, conf);
     while (it.hasNext()) {
-      ClusterWritable next = (ClusterWritable)it.next();
+      ClusterWritable next = (ClusterWritable) it.next();
       cluster = (Cluster) next.getValue();
-      if(cluster instanceof DirichletCluster){
-    	  ((DirichletCluster) cluster).getModel().configure(conf);
+      if (cluster instanceof DirichletCluster) {
+        ((DirichletCluster) cluster).getModel().configure(conf);
       }
       clusterModels.add(cluster);
     }
     return clusterModels;
   }
   
-  private static Path finalClustersPath(Path clusterOutputPath)
-      throws IOException {
-    FileSystem fileSystem = clusterOutputPath
-        .getFileSystem(new Configuration());
-    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath,
-        PathFilters.finalPartFilter());
+  private static Path finalClustersPath(Path clusterOutputPath) throws IOException {
+    FileSystem fileSystem = clusterOutputPath.getFileSystem(new Configuration());
+    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
     Path finalClustersPath = clusterFiles[0].getPath();
     return finalClustersPath;
   }
@@ -212,58 +197,47 @@ public class ClusterClassificationDriver
    *          TODO
    * @throws IOException
    */
-  private static void selectCluster(Path input, List<Cluster> clusterModels,
-      ClusterClassifier clusterClassifier, Path output,
-      Double clusterClassificationThreshold, boolean emitMostLikely)
-      throws IOException {
+  private static void selectCluster(Path input, List<Cluster> clusterModels, ClusterClassifier clusterClassifier,
+      Path output, Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException {
     Configuration conf = new Configuration();
-    SequenceFile.Writer writer = new SequenceFile.Writer(
-        input.getFileSystem(conf), conf, new Path(output, "part-m-" + 0),
-        IntWritable.class, WeightedVectorWritable.class);
-    for (VectorWritable vw : new SequenceFileDirValueIterable<VectorWritable>(
-        input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
+    SequenceFile.Writer writer = new SequenceFile.Writer(input.getFileSystem(conf), conf, new Path(output,
+        "part-m-" + 0), IntWritable.class, WeightedVectorWritable.class);
+    for (VectorWritable vw : new SequenceFileDirValueIterable<VectorWritable>(input, PathType.LIST,
+        PathFilters.logsCRCFilter(), conf)) {
       Vector pdfPerCluster = clusterClassifier.classify(vw.get());
       if (shouldClassify(pdfPerCluster, clusterClassificationThreshold)) {
-        classifyAndWrite(clusterModels, clusterClassificationThreshold,
-            emitMostLikely, writer, vw, pdfPerCluster);
+        classifyAndWrite(clusterModels, clusterClassificationThreshold, emitMostLikely, writer, vw, pdfPerCluster);
       }
     }
     writer.close();
   }
   
-  private static void classifyAndWrite(List<Cluster> clusterModels,
-      Double clusterClassificationThreshold, boolean emitMostLikely,
-      SequenceFile.Writer writer, VectorWritable vw, Vector pdfPerCluster)
-      throws IOException {
+  private static void classifyAndWrite(List<Cluster> clusterModels, Double clusterClassificationThreshold,
+      boolean emitMostLikely, SequenceFile.Writer writer, VectorWritable vw, Vector pdfPerCluster) throws IOException {
     if (emitMostLikely) {
       int maxValueIndex = pdfPerCluster.maxValueIndex();
-      WeightedVectorWritable wvw = new WeightedVectorWritable(
-          pdfPerCluster.maxValue(), vw.get());
+      WeightedVectorWritable wvw = new WeightedVectorWritable(pdfPerCluster.maxValue(), vw.get());
       write(clusterModels, writer, wvw, maxValueIndex);
     } else {
-      writeAllAboveThreshold(clusterModels, clusterClassificationThreshold,
-          writer, vw, pdfPerCluster);
+      writeAllAboveThreshold(clusterModels, clusterClassificationThreshold, writer, vw, pdfPerCluster);
     }
   }
   
-  private static void writeAllAboveThreshold(List<Cluster> clusterModels,
-      Double clusterClassificationThreshold, SequenceFile.Writer writer,
-      VectorWritable vw, Vector pdfPerCluster) throws IOException {
+  private static void writeAllAboveThreshold(List<Cluster> clusterModels, Double clusterClassificationThreshold,
+      SequenceFile.Writer writer, VectorWritable vw, Vector pdfPerCluster) throws IOException {
     Iterator<Element> iterateNonZero = pdfPerCluster.iterateNonZero();
     while (iterateNonZero.hasNext()) {
       Element pdf = iterateNonZero.next();
       if (pdf.get() >= clusterClassificationThreshold) {
-        WeightedVectorWritable wvw = new WeightedVectorWritable(pdf.get(),
-            vw.get());
+        WeightedVectorWritable wvw = new WeightedVectorWritable(pdf.get(), vw.get());
         int clusterIndex = pdf.index();
         write(clusterModels, writer, wvw, clusterIndex);
       }
     }
   }
   
-  private static void write(List<Cluster> clusterModels,
-      SequenceFile.Writer writer, WeightedVectorWritable wvw, int maxValueIndex)
-      throws IOException {
+  private static void write(List<Cluster> clusterModels, SequenceFile.Writer writer, WeightedVectorWritable wvw,
+      int maxValueIndex) throws IOException {
     Cluster cluster = clusterModels.get(maxValueIndex);
     writer.append(new IntWritable(cluster.getId()), wvw);
   }
@@ -274,24 +248,20 @@ public class ClusterClassificationDriver
    * 
    * @return whether the vector should be classified or not.
    */
-  private static boolean shouldClassify(Vector pdfPerCluster,
-      Double clusterClassificationThreshold) {
+  private static boolean shouldClassify(Vector pdfPerCluster, Double clusterClassificationThreshold) {
     boolean isMaxPDFGreatherThanThreshold = pdfPerCluster.maxValue() >= clusterClassificationThreshold;
     return isMaxPDFGreatherThanThreshold;
   }
   
-  private static void classifyClusterMR(Configuration conf, Path input,
-      Path clustersIn, Path output, Double clusterClassificationThreshold,
-      boolean emitMostLikely) throws IOException, InterruptedException,
-      ClassNotFoundException {    
+  private static void classifyClusterMR(Configuration conf, Path input, Path clustersIn, Path output,
+      Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException, InterruptedException,
+      ClassNotFoundException {
     
-    conf.setFloat(OUTLIER_REMOVAL_THRESHOLD,
-        clusterClassificationThreshold.floatValue());
+    conf.setFloat(OUTLIER_REMOVAL_THRESHOLD, clusterClassificationThreshold.floatValue());
     conf.setBoolean(EMIT_MOST_LIKELY, emitMostLikely);
     conf.set(CLUSTERS_IN, clustersIn.toUri().toString());
     
-    Job job = new Job(conf,
-        "Cluster Classification Driver running over input: " + input);
+    Job job = new Job(conf, "Cluster Classification Driver running over input: " + input);
     job.setJarByClass(ClusterClassificationDriver.class);
     
     job.setInputFormatClass(SequenceFileInputFormat.class);
@@ -306,11 +276,10 @@ public class ClusterClassificationDriver
     FileInputFormat.addInputPath(job, input);
     FileOutputFormat.setOutputPath(job, output);
     if (!job.waitForCompletion(true)) {
-      throw new InterruptedException(
-          "Cluster Classification Driver Job failed processing " + input);
+      throw new InterruptedException("Cluster Classification Driver Job failed processing " + input);
     }
   }
-
+  
   public static void run(Configuration conf, Path input, Path clusteringOutputPath, Path output,
       double clusterClassificationThreshold, boolean emitMostLikely, boolean runSequential) throws IOException,
       InterruptedException, ClassNotFoundException {