Posted to commits@mahout.apache.org by pr...@apache.org on 2012/03/08 15:24:18 UTC

svn commit: r1298406 - in /mahout/trunk/core/src: main/java/org/apache/mahout/clustering/canopy/ test/java/org/apache/mahout/clustering/canopy/

Author: pranjan
Date: Thu Mar  8 14:24:18 2012
New Revision: 1298406

URL: http://svn.apache.org/viewvc?rev=1298406&view=rev
Log:
MAHOUT-982: Clustering vectors using ClusterClassificationDriver. Deleted ClusterMapper and its test cases.
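
The replacement path drops the ClusterMapper job in favor of two calls: persist
a clustering policy next to the canopies, then hand point assignment to
ClusterClassificationDriver. A minimal sketch of the new sequence, assuming
points/canopies/output are existing paths and using the imports added in the
CanopyDriver diff below (the 0.0 and true arguments are read here as the
classification threshold and the emit-most-likely flag; that reading is
inferred from usage, not spelled out in this diff):

    // Record how points are to be assigned, alongside the canopies...
    ClusterClassifier.writePolicy(new CanopyClusteringPolicy(), canopies);
    // ...then classify every vector (a 0.0 threshold keeps all of them)
    // into its closest canopy, sequentially or as a MapReduce job
    // depending on runSequential.
    ClusterClassificationDriver.run(points, output,
        new Path(output, CLUSTERED_POINTS_DIRECTORY), 0.0, true, runSequential);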

Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java?rev=1298406&r1=1298405&r2=1298406&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java Thu Mar  8 14:24:18 2012
@@ -17,23 +17,20 @@
 
 package org.apache.mahout.clustering.canopy;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.clustering.AbstractCluster;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
 import org.apache.mahout.common.ClassUtils;
 import org.apache.mahout.common.distance.DistanceMeasure;
 import org.apache.mahout.math.Vector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 public class CanopyClusterer {
 
   private static final Logger log = LoggerFactory.getLogger(CanopyClusterer.class);
@@ -168,32 +165,6 @@ public class CanopyClusterer {
   }
 
   /**
-   * Emit the point to the closest Canopy
-   */
-  public void emitPointToClosestCanopy(Vector point,
-                                       Iterable<Canopy> canopies,
-                                       Mapper<?,?,IntWritable,WeightedVectorWritable>.Context context)
-    throws IOException, InterruptedException {
-    Canopy closest = findClosestCanopy(point, canopies);
-    context.write(new IntWritable(closest.getId()), new WeightedVectorWritable(1, point));
-    context.setStatus("Emit Closest Canopy ID:" + closest.getIdentifier());
-  }
-
-  protected Canopy findClosestCanopy(Vector point, Iterable<Canopy> canopies) {
-    double minDist = Double.MAX_VALUE;
-    Canopy closest = null;
-    // find closest canopy
-    for (Canopy canopy : canopies) {
-      double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point);
-      if (dist < minDist) {
-        minDist = dist;
-        closest = canopy;
-      }
-    }
-    return closest;
-  }
-
-  /**
    * Return if the point is covered by the canopy
    * 
    * @param point

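For reference, the deleted findClosestCanopy was a plain linear argmin over
canopy centers; the same nearest-cluster assignment now happens inside the
classification pipeline. A standalone sketch of that search, reproduced from
the removed code above as a hypothetical helper (not part of the new code
path):

    import org.apache.mahout.clustering.canopy.Canopy;
    import org.apache.mahout.common.distance.DistanceMeasure;
    import org.apache.mahout.math.Vector;

    final class ClosestCanopy {
      // Linear scan for the canopy whose center is nearest to the point.
      static Canopy find(DistanceMeasure measure, Vector point,
          Iterable<Canopy> canopies) {
        double minDist = Double.MAX_VALUE;
        Canopy closest = null;
        for (Canopy canopy : canopies) {
          // The three-argument overload takes the center's precomputed
          // squared length so squared-Euclidean measures can skip
          // recomputing the norm on every comparison.
          double dist = measure.distance(canopy.getCenter().getLengthSquared(),
              canopy.getCenter(), point);
          if (dist < minDist) {
            minDist = dist;
            closest = canopy;
          }
        }
        return closest;
      }
    }
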
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=1298406&r1=1298405&r2=1298406&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Thu Mar  8 14:24:18 2012
@@ -17,19 +17,16 @@
 
 package org.apache.mahout.clustering.canopy;
 
+import static org.apache.mahout.clustering.topdown.PathDirectory.CLUSTERED_POINTS_DIRECTORY;
+
 import java.io.IOException;
 import java.util.Collection;
 
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -38,7 +35,9 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.AbstractCluster;
 import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.iterator.CanopyClusteringPolicy;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.ClassUtils;
 import org.apache.mahout.common.HadoopUtil;
@@ -51,6 +50,9 @@ import org.apache.mahout.math.VectorWrit
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
 public class CanopyDriver extends AbstractJob {
 
   public static final String DEFAULT_CLUSTERED_POINTS_DIRECTORY = "clusteredPoints";
@@ -354,81 +356,9 @@ public class CanopyDriver extends Abstra
   public static void clusterData(Configuration conf, Path points,
       Path canopies, Path output, DistanceMeasure measure, double t1,
       double t2, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException {
-    if (runSequential) {
-      clusterDataSeq(points, canopies, output, measure, t1, t2);
-    } else {
-      clusterDataMR(conf, points, canopies, output, measure, t1, t2);
-    }
-  }
-
-  private static void clusterDataSeq(Path points, Path canopies, Path output,
-      DistanceMeasure measure, double t1, double t2) throws IOException {
-    CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
-
-    Collection<Canopy> clusters = Lists.newArrayList();
-    Configuration conf = new Configuration();
-
-    for (Canopy value : new SequenceFileDirValueIterable<Canopy>(canopies,
-        PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
-      clusters.add(value);
-    }
-
-    // iterate over all points, assigning each to the closest canopy and
-    // outputing that clustering
-    FileSystem fs = FileSystem.get(points.toUri(), conf);
-    FileStatus[] status = fs.listStatus(points, PathFilters.logsCRCFilter());
-    Path outPath = new Path(output, DEFAULT_CLUSTERED_POINTS_DIRECTORY);
-    int part = 0;
-    for (FileStatus s : status) {
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(),
-          conf);
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(
-          outPath, "part-m-" + part), IntWritable.class,
-          WeightedVectorWritable.class);
-      try {
-        Writable key = ClassUtils.instantiateAs(reader.getKeyClassName(), Writable.class);
-        VectorWritable vw = ClassUtils.instantiateAs(reader.getValueClassName(), VectorWritable.class);
-        while (reader.next(key, vw)) {
-          Canopy closest = clusterer.findClosestCanopy(vw.get(), clusters);
-          writer.append(new IntWritable(closest.getId()),
-              new WeightedVectorWritable(1, vw.get()));
-          vw = ClassUtils.instantiateAs(reader.getValueClassName(), VectorWritable.class);
-        }
-      } finally {
-        Closeables.closeQuietly(reader);
-        Closeables.closeQuietly(writer);
-      }
-    }
-  }
-
-  private static void clusterDataMR(Configuration conf, Path points,
-      Path canopies, Path output, DistanceMeasure measure, double t1, double t2)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
-        .getName());
-    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
-    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
-    conf.set(CanopyConfigKeys.CANOPY_PATH_KEY, canopies.toString());
-
-    Job job = new Job(conf, "Canopy Driver running clusterData over input: "
-        + points);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setMapperClass(ClusterMapper.class);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(WeightedVectorWritable.class);
-    job.setNumReduceTasks(0);
-    job.setJarByClass(CanopyDriver.class);
-
-    FileInputFormat.addInputPath(job, points);
-    Path outPath = new Path(output, DEFAULT_CLUSTERED_POINTS_DIRECTORY);
-    FileOutputFormat.setOutputPath(job, outPath);
-    HadoopUtil.delete(conf, outPath);
-
-    if (!job.waitForCompletion(true)) {
-      throw new InterruptedException("Canopy Clustering failed processing "
-          + canopies);
-    }
+    ClusterClassifier.writePolicy(new CanopyClusteringPolicy(), canopies);
+    ClusterClassificationDriver.run(points, output,
+        new Path(output, CLUSTERED_POINTS_DIRECTORY), 0.0, true, runSequential);
   }
-
+  
 }
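
Both execution modes are now selected inside ClusterClassificationDriver via
the runSequential flag, and, as the new body shows, the conf, measure, t1 and
t2 parameters are no longer consulted even though they remain in the
signature. A hedged caller-side sketch follows; the paths and thresholds are
illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.canopy.CanopyDriver;
    import org.apache.mahout.common.distance.ManhattanDistanceMeasure;

    public class ClusterDataCaller {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path points = new Path("testdata/points");           // hypothetical
        Path canopies = new Path("output/clusters-0-final"); // hypothetical
        Path output = new Path("output");
        // The signature is unchanged, so existing callers keep compiling;
        // clusterData now delegates to ClusterClassificationDriver.
        CanopyDriver.clusterData(conf, points, canopies, output,
            new ManhattanDistanceMeasure(), 3.1, 2.1, /* runSequential = */ true);
      }
    }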

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=1298406&r1=1298405&r2=1298406&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Thu Mar  8 14:24:18 2012
@@ -17,18 +17,12 @@
 
 package org.apache.mahout.clustering.canopy;
 
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.Map.Entry;
 
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -37,7 +31,6 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.ClusteringTestUtils;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
 import org.apache.mahout.common.DummyRecordWriter;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.MahoutTestCase;
@@ -52,6 +45,9 @@ import org.apache.mahout.math.VectorWrit
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
 public final class TestCanopyCreation extends MahoutTestCase {
 
   private static final double[][] RAW = { { 1, 1 }, { 2, 1 }, { 1, 2 },
@@ -103,15 +99,6 @@ public final class TestCanopyCreation ex
     }
   }
 
-  private static Canopy findCanopy(Integer key, Iterable<Canopy> canopies) {
-    for (Canopy c : canopies) {
-      if (c.getId() == key) {
-        return c;
-      }
-    }
-    return null;
-  }
-
   @Override
   @Before
   public void setUp() throws Exception {
@@ -395,93 +382,6 @@ public final class TestCanopyCreation ex
     }
   }
 
-  /**
-   * Story: User can cluster a subset of the points using a ClusterMapper and a
-   * ManhattanDistanceMeasure.
-   */
-  @Test
-  public void testClusterMapperManhattan() throws Exception {
-    ClusterMapper mapper = new ClusterMapper();
-    Configuration conf = new Configuration();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
-        "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
-    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
-    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
-    DummyRecordWriter<IntWritable, WeightedVectorWritable> writer = new DummyRecordWriter<IntWritable, WeightedVectorWritable>();
-    Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context = DummyRecordWriter
-        .build(mapper, conf, writer);
-    mapper.setup(context);
-
-    Collection<Canopy> canopies = Lists.newArrayList();
-    int nextCanopyId = 0;
-    for (Vector centroid : manhattanCentroids) {
-      canopies.add(new Canopy(centroid, nextCanopyId++,
-          manhattanDistanceMeasure));
-    }
-    setField(mapper, "canopies", canopies);
-    List<VectorWritable> points = getPointsWritable();
-    // map the data
-    for (VectorWritable point : points) {
-      mapper.map(new Text(), point, context);
-    }
-    Map<IntWritable, List<WeightedVectorWritable>> data = writer.getData();
-    assertEquals("Number of map results", canopies.size(), data.size());
-    for (Entry<IntWritable, List<WeightedVectorWritable>> stringListEntry : data
-        .entrySet()) {
-      IntWritable key = stringListEntry.getKey();
-      Canopy canopy = findCanopy(key.get(), canopies);
-      List<WeightedVectorWritable> pts = stringListEntry.getValue();
-      for (WeightedVectorWritable ptDef : pts) {
-        assertTrue("Point not in canopy", mapper.canopyCovers(canopy, ptDef
-            .getVector()));
-      }
-    }
-  }
-
-  /**
-   * Story: User can cluster a subset of the points using a ClusterMapper and a
-   * EuclideanDistanceMeasure.
-   */
-  @Test
-  public void testClusterMapperEuclidean() throws Exception {
-    ClusterMapper mapper = new ClusterMapper();
-    Configuration conf = new Configuration();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
-        "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
-    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
-    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
-    DummyRecordWriter<IntWritable, WeightedVectorWritable> writer = new DummyRecordWriter<IntWritable, WeightedVectorWritable>();
-    Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context = DummyRecordWriter
-        .build(mapper, conf, writer);
-    mapper.setup(context);
-
-    Collection<Canopy> canopies = Lists.newArrayList();
-    int nextCanopyId = 0;
-    for (Vector centroid : euclideanCentroids) {
-      canopies.add(new Canopy(centroid, nextCanopyId++,
-          euclideanDistanceMeasure));
-    }
-
-    setField(mapper, "canopies", canopies);
-    List<VectorWritable> points = getPointsWritable();
-    // map the data
-    for (VectorWritable point : points) {
-      mapper.map(new Text(), point, context);
-    }
-    Map<IntWritable, List<WeightedVectorWritable>> data = writer.getData();
-    assertEquals("Number of map results", canopies.size(), data.size());
-    for (Entry<IntWritable, List<WeightedVectorWritable>> stringListEntry : data
-        .entrySet()) {
-      IntWritable key = stringListEntry.getKey();
-      Canopy canopy = findCanopy(key.get(), canopies);
-      List<WeightedVectorWritable> pts = stringListEntry.getValue();
-      for (WeightedVectorWritable ptDef : pts) {
-        assertTrue("Point not in canopy", mapper.canopyCovers(canopy, ptDef
-            .getVector()));
-      }
-    }
-  }
-
   /** Story: User can cluster points using sequential execution */
   @Test
   public void testClusteringManhattanSeq() throws Exception {