You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pr...@apache.org on 2012/03/08 15:24:18 UTC
svn commit: r1298406 - in /mahout/trunk/core/src:
main/java/org/apache/mahout/clustering/canopy/
test/java/org/apache/mahout/clustering/canopy/
Author: pranjan
Date: Thu Mar 8 14:24:18 2012
New Revision: 1298406
URL: http://svn.apache.org/viewvc?rev=1298406&view=rev
Log:
MAHOUT-982, Clustering vectors using ClusterClassificationDriver. Deleted ClusterMapper and its test cases.
Removed:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java?rev=1298406&r1=1298405&r2=1298406&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java Thu Mar 8 14:24:18 2012
@@ -17,23 +17,20 @@
package org.apache.mahout.clustering.canopy;
-import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.clustering.AbstractCluster;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
import org.apache.mahout.common.ClassUtils;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.math.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+
public class CanopyClusterer {
private static final Logger log = LoggerFactory.getLogger(CanopyClusterer.class);
@@ -168,32 +165,6 @@ public class CanopyClusterer {
}
/**
- * Emit the point to the closest Canopy
- */
- public void emitPointToClosestCanopy(Vector point,
- Iterable<Canopy> canopies,
- Mapper<?,?,IntWritable,WeightedVectorWritable>.Context context)
- throws IOException, InterruptedException {
- Canopy closest = findClosestCanopy(point, canopies);
- context.write(new IntWritable(closest.getId()), new WeightedVectorWritable(1, point));
- context.setStatus("Emit Closest Canopy ID:" + closest.getIdentifier());
- }
-
- protected Canopy findClosestCanopy(Vector point, Iterable<Canopy> canopies) {
- double minDist = Double.MAX_VALUE;
- Canopy closest = null;
- // find closest canopy
- for (Canopy canopy : canopies) {
- double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point);
- if (dist < minDist) {
- minDist = dist;
- closest = canopy;
- }
- }
- return closest;
- }
-
- /**
* Return if the point is covered by the canopy
*
* @param point
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=1298406&r1=1298405&r2=1298406&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Thu Mar 8 14:24:18 2012
@@ -17,19 +17,16 @@
package org.apache.mahout.clustering.canopy;
+import static org.apache.mahout.clustering.topdown.PathDirectory.CLUSTERED_POINTS_DIRECTORY;
+
import java.io.IOException;
import java.util.Collection;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -38,7 +35,9 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.AbstractCluster;
import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.iterator.CanopyClusteringPolicy;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.ClassUtils;
import org.apache.mahout.common.HadoopUtil;
@@ -51,6 +50,9 @@ import org.apache.mahout.math.VectorWrit
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
public class CanopyDriver extends AbstractJob {
public static final String DEFAULT_CLUSTERED_POINTS_DIRECTORY = "clusteredPoints";
@@ -354,81 +356,9 @@ public class CanopyDriver extends Abstra
public static void clusterData(Configuration conf, Path points,
Path canopies, Path output, DistanceMeasure measure, double t1,
double t2, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException {
- if (runSequential) {
- clusterDataSeq(points, canopies, output, measure, t1, t2);
- } else {
- clusterDataMR(conf, points, canopies, output, measure, t1, t2);
- }
- }
-
- private static void clusterDataSeq(Path points, Path canopies, Path output,
- DistanceMeasure measure, double t1, double t2) throws IOException {
- CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
-
- Collection<Canopy> clusters = Lists.newArrayList();
- Configuration conf = new Configuration();
-
- for (Canopy value : new SequenceFileDirValueIterable<Canopy>(canopies,
- PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
- clusters.add(value);
- }
-
- // iterate over all points, assigning each to the closest canopy and
- // outputing that clustering
- FileSystem fs = FileSystem.get(points.toUri(), conf);
- FileStatus[] status = fs.listStatus(points, PathFilters.logsCRCFilter());
- Path outPath = new Path(output, DEFAULT_CLUSTERED_POINTS_DIRECTORY);
- int part = 0;
- for (FileStatus s : status) {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(),
- conf);
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(
- outPath, "part-m-" + part), IntWritable.class,
- WeightedVectorWritable.class);
- try {
- Writable key = ClassUtils.instantiateAs(reader.getKeyClassName(), Writable.class);
- VectorWritable vw = ClassUtils.instantiateAs(reader.getValueClassName(), VectorWritable.class);
- while (reader.next(key, vw)) {
- Canopy closest = clusterer.findClosestCanopy(vw.get(), clusters);
- writer.append(new IntWritable(closest.getId()),
- new WeightedVectorWritable(1, vw.get()));
- vw = ClassUtils.instantiateAs(reader.getValueClassName(), VectorWritable.class);
- }
- } finally {
- Closeables.closeQuietly(reader);
- Closeables.closeQuietly(writer);
- }
- }
- }
-
- private static void clusterDataMR(Configuration conf, Path points,
- Path canopies, Path output, DistanceMeasure measure, double t1, double t2)
- throws IOException, InterruptedException, ClassNotFoundException {
- conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
- .getName());
- conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
- conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
- conf.set(CanopyConfigKeys.CANOPY_PATH_KEY, canopies.toString());
-
- Job job = new Job(conf, "Canopy Driver running clusterData over input: "
- + points);
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setMapperClass(ClusterMapper.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(WeightedVectorWritable.class);
- job.setNumReduceTasks(0);
- job.setJarByClass(CanopyDriver.class);
-
- FileInputFormat.addInputPath(job, points);
- Path outPath = new Path(output, DEFAULT_CLUSTERED_POINTS_DIRECTORY);
- FileOutputFormat.setOutputPath(job, outPath);
- HadoopUtil.delete(conf, outPath);
-
- if (!job.waitForCompletion(true)) {
- throw new InterruptedException("Canopy Clustering failed processing "
- + canopies);
- }
+ ClusterClassifier.writePolicy(new CanopyClusteringPolicy(),
+ canopies);
+ ClusterClassificationDriver.run(points, output, new Path(output, CLUSTERED_POINTS_DIRECTORY), 0.0, true, runSequential);
}
-
+
}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=1298406&r1=1298405&r2=1298406&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Thu Mar 8 14:24:18 2012
@@ -17,18 +17,12 @@
package org.apache.mahout.clustering.canopy;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.Map.Entry;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -37,7 +31,6 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.ClusteringTestUtils;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
import org.apache.mahout.common.DummyRecordWriter;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.MahoutTestCase;
@@ -52,6 +45,9 @@ import org.apache.mahout.math.VectorWrit
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
public final class TestCanopyCreation extends MahoutTestCase {
private static final double[][] RAW = { { 1, 1 }, { 2, 1 }, { 1, 2 },
@@ -103,15 +99,6 @@ public final class TestCanopyCreation ex
}
}
- private static Canopy findCanopy(Integer key, Iterable<Canopy> canopies) {
- for (Canopy c : canopies) {
- if (c.getId() == key) {
- return c;
- }
- }
- return null;
- }
-
@Override
@Before
public void setUp() throws Exception {
@@ -395,93 +382,6 @@ public final class TestCanopyCreation ex
}
}
- /**
- * Story: User can cluster a subset of the points using a ClusterMapper and a
- * ManhattanDistanceMeasure.
- */
- @Test
- public void testClusterMapperManhattan() throws Exception {
- ClusterMapper mapper = new ClusterMapper();
- Configuration conf = new Configuration();
- conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
- "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
- conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
- conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
- DummyRecordWriter<IntWritable, WeightedVectorWritable> writer = new DummyRecordWriter<IntWritable, WeightedVectorWritable>();
- Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context = DummyRecordWriter
- .build(mapper, conf, writer);
- mapper.setup(context);
-
- Collection<Canopy> canopies = Lists.newArrayList();
- int nextCanopyId = 0;
- for (Vector centroid : manhattanCentroids) {
- canopies.add(new Canopy(centroid, nextCanopyId++,
- manhattanDistanceMeasure));
- }
- setField(mapper, "canopies", canopies);
- List<VectorWritable> points = getPointsWritable();
- // map the data
- for (VectorWritable point : points) {
- mapper.map(new Text(), point, context);
- }
- Map<IntWritable, List<WeightedVectorWritable>> data = writer.getData();
- assertEquals("Number of map results", canopies.size(), data.size());
- for (Entry<IntWritable, List<WeightedVectorWritable>> stringListEntry : data
- .entrySet()) {
- IntWritable key = stringListEntry.getKey();
- Canopy canopy = findCanopy(key.get(), canopies);
- List<WeightedVectorWritable> pts = stringListEntry.getValue();
- for (WeightedVectorWritable ptDef : pts) {
- assertTrue("Point not in canopy", mapper.canopyCovers(canopy, ptDef
- .getVector()));
- }
- }
- }
-
- /**
- * Story: User can cluster a subset of the points using a ClusterMapper and a
- * EuclideanDistanceMeasure.
- */
- @Test
- public void testClusterMapperEuclidean() throws Exception {
- ClusterMapper mapper = new ClusterMapper();
- Configuration conf = new Configuration();
- conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
- "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
- conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
- conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
- DummyRecordWriter<IntWritable, WeightedVectorWritable> writer = new DummyRecordWriter<IntWritable, WeightedVectorWritable>();
- Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context = DummyRecordWriter
- .build(mapper, conf, writer);
- mapper.setup(context);
-
- Collection<Canopy> canopies = Lists.newArrayList();
- int nextCanopyId = 0;
- for (Vector centroid : euclideanCentroids) {
- canopies.add(new Canopy(centroid, nextCanopyId++,
- euclideanDistanceMeasure));
- }
-
- setField(mapper, "canopies", canopies);
- List<VectorWritable> points = getPointsWritable();
- // map the data
- for (VectorWritable point : points) {
- mapper.map(new Text(), point, context);
- }
- Map<IntWritable, List<WeightedVectorWritable>> data = writer.getData();
- assertEquals("Number of map results", canopies.size(), data.size());
- for (Entry<IntWritable, List<WeightedVectorWritable>> stringListEntry : data
- .entrySet()) {
- IntWritable key = stringListEntry.getKey();
- Canopy canopy = findCanopy(key.get(), canopies);
- List<WeightedVectorWritable> pts = stringListEntry.getValue();
- for (WeightedVectorWritable ptDef : pts) {
- assertTrue("Point not in canopy", mapper.canopyCovers(canopy, ptDef
- .getVector()));
- }
- }
- }
-
/** Story: User can cluster points using sequential execution */
@Test
public void testClusteringManhattanSeq() throws Exception {