You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by gs...@apache.org on 2008/02/20 05:28:00 UTC
svn commit: r629348 - in /lucene/mahout/trunk: ./
src/main/java/org/apache/mahout/clustering/
src/main/java/org/apache/mahout/clustering/canopy/
src/test/java/org/apache/mahout/clustering/
src/test/java/org/apache/mahout/clustering/canopy/
Author: gsingers
Date: Tue Feb 19 20:27:57 2008
New Revision: 629348
URL: http://svn.apache.org/viewvc?rev=629348&view=rev
Log:
MAHOUT-3: Added Canopy clustering. Mahout's first M/R code! Woo hoo! Nice work Jeff!
Added:
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java (with props)
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java (with props)
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (with props)
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java (with props)
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java (with props)
Modified:
lucene/mahout/trunk/build.xml
Modified: lucene/mahout/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/build.xml?rev=629348&r1=629347&r2=629348&view=diff
==============================================================================
--- lucene/mahout/trunk/build.xml (original)
+++ lucene/mahout/trunk/build.xml Tue Feb 19 20:27:57 2008
@@ -17,7 +17,7 @@
limitations under the License.
-->
-<project name="Mahout" default="jar" basedir=".">
+<project name="mahout" default="dist-jar" basedir=".">
<property file="build.properties" />
<property name="Name" value="Mahout" />
@@ -266,7 +266,7 @@
- <target name="test" depends="compile-test" description="Runs unit tests">
+ <target name="test" depends="compile-test, dist-jar" description="Runs unit tests">
<mkdir dir="${junit.output.dir}"/>
<junit printsummary="off" haltonfailure="no"
errorProperty="tests.failed" failureProperty="tests.failed">
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,358 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+
+/**
+ * This class models a canopy as a center point, the number of points that are
+ * contained within it according to the application of some distance metric, and
+ * a point total which is the sum of all the points and is used to compute the
+ * centroid when needed.
+ *
+ */
+public class Canopy {
+
+ // keys used by Driver, Mapper, Combiner & Reducer
+ public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
+
+ public static final String T1_KEY = "org.apache.mahout.clustering.canopy.t1";
+
+ public static final String T2_KEY = "org.apache.mahout.clustering.canopy.t2";
+
+ public static final String CANOPY_PATH_KEY = "org.apache.mahout.clustering.canopy.path";
+
+ // the next canopyId to be allocated
+ private static int nextCanopyId = 0;
+
+ // the T1 distance threshold
+ private static float t1;
+
+ // the T2 distance threshold
+ private static float t2;
+
+ // the distance measure
+ private static DistanceMeasure measure;
+
+ // this canopy's canopyId
+ private int canopyId;
+
+ // the current center
+ private Float[] center = new Float[0];
+
+ // the number of points in the canopy
+ private int numPoints = 0;
+
+ // the total of all points added to the canopy
+ private Float[] pointTotal = null;
+
+ /**
+ * Create a new Canopy containing the given point
+ *
+ * @param point a Float[]
+ */
+ public Canopy(Float[] point) {
+ super();
+ this.canopyId = nextCanopyId++;
+ this.center = point;
+ this.pointTotal = point.clone();
+ this.numPoints = 1;
+ }
+
+ /**
+ * Create a new Canopy containing the given point and canopyId
+ *
+ * @param point a Float[]
+ * @param canopyId an int identifying the canopy local to this process only
+ */
+ public Canopy(Float[] point, int canopyId) {
+ super();
+ this.canopyId = canopyId;
+ this.center = point;
+ this.pointTotal = point.clone();
+ this.numPoints = 1;
+ }
+
+ /**
+ * Configure the Canopy and its distance measure
+ *
+ * @param job the JobConf for this job
+ */
+ public static void configure(JobConf job) {
+ try {
+ Class cl = Class.forName(job.get(DISTANCE_MEASURE_KEY));
+ measure = (DistanceMeasure) cl.newInstance();
+ measure.configure(job);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ nextCanopyId = 0;
+ t1 = new Float(job.get(T1_KEY));
+ t2 = new Float(job.get(T2_KEY));
+ }
+
+ /**
+ * Configure the Canopy for unit tests
+ * @param aMeasure
+ * @param aT1
+ * @param aT2
+ */
+ public static void config(DistanceMeasure aMeasure, float aT1, float aT2) {
+ nextCanopyId = 0;
+ measure = aMeasure;
+ t1 = aT1;
+ t2 = aT2;
+ }
+
+ /**
+ * This is the same algorithm as the reference but inverted to iterate over
+ * existing canopies instead of the points. Because of this it does not need
+ * to actually store the points, instead storing a total points vector and the
+ * number of points. From this a centroid can be computed.
+ *
+ * This method is used by the CanopyReducer.
+ *
+ * @param point the Float[] defining the point to be added
+ * @param canopies the List<Canopy> to be appended
+ */
+ public static void addPointToCanopies(Float[] point, List<Canopy> canopies) {
+ boolean pointStronglyBound = false;
+ for (Canopy canopy : canopies) {
+ float dist = measure.distance(canopy.getCenter(), point);
+ if (dist < t1)
+ canopy.addPoint(point);
+ pointStronglyBound = pointStronglyBound | (dist < t2);
+ }
+ if (!pointStronglyBound)
+ canopies.add(new Canopy(point));
+ }
+
+ /**
+ * This method is used by the CanopyMapper to perform canopy inclusion tests
+ * and to emit the point and its covering canopies to the output. The
+ * CanopyCombiner will then sum the canopy points and produce the centroids.
+ *
+ * @param point the Float[] defining the point to be added
+ * @param canopies the List<Canopy> to be appended
+ * @param collector an OutputCollector in which to emit the point
+ */
+ public static void emitPointToNewCanopies(Float[] point,
+ List<Canopy> canopies, OutputCollector collector) throws IOException {
+ boolean pointStronglyBound = false;
+ for (Canopy canopy : canopies) {
+ float dist = measure.distance(canopy.getCenter(), point);
+ if (dist < t1)
+ canopy.emitPoint(point, collector);
+ pointStronglyBound = pointStronglyBound | (dist < t2);
+ }
+ if (!pointStronglyBound) {
+ Canopy canopy = new Canopy(point);
+ canopies.add(canopy);
+ canopy.emitPoint(point, collector);
+ }
+ }
+
+ /**
+ * This method is used by the CanopyMapper to perform canopy inclusion tests
+ * and to emit the point keyed by its covering canopies to the output. if the
+ * point is not covered by any canopies (due to canopy centroid clustering),
+ * emit the point to the closest covering canopy.
+ *
+ * @param point the Float[] defining the point to be added
+ * @param canopies the List<Canopy> to be appended
+ * @param writable the original Writable from the input, may include arbitrary
+ * payload information after the point [...]<payload>
+ * @param collector an OutputCollector in which to emit the point
+ */
+ public static void emitPointToExistingCanopies(Float[] point,
+ List<Canopy> canopies, Writable writable, OutputCollector collector)
+ throws IOException {
+ float minDist = Float.MAX_VALUE;
+ Canopy closest = null;
+ boolean isCovered = false;
+ for (Canopy canopy : canopies) {
+ float dist = measure.distance(canopy.getCenter(), point);
+ if (dist < t1) {
+ isCovered = true;
+ collector.collect(new Text(Canopy.formatCanopy(canopy)), writable);
+ } else if (dist < minDist) {
+ minDist = dist;
+ closest = canopy;
+ }
+ }
+ // if the point is not contained in any canopies (due to canopy centroid
+ // clustering), emit the point to the closest covering canopy.
+ if (!isCovered)
+ collector.collect(new Text(Canopy.formatCanopy(closest)), writable);
+ }
+
+ /**
+ * Returns a print string for the point
+ *
+ * @param out a String to append to
+ * @param pt the Float[] point
+ * @return
+ */
+ public static String ptOut(String out, Float[] pt) {
+ out += formatPoint(pt);
+ return out;
+ }
+
+ /**
+ * Format the point for input to a Mapper or Reducer
+ *
+ * @param point a Float[]
+ * @return a String
+ */
+ public static String formatPoint(Float[] point) {
+ String out = "";
+ out += "[";
+ for (int i = 0; i < point.length; i++)
+ out += point[i] + ", ";
+ out += "] ";
+ String ptOut = out;
+ return ptOut;
+ }
+
+ /**
+ * Decodes a point from its string representation.
+ *
+ * @param formattedString a comma-terminated String of the form
+ * "[v1,v2,...,vn,]"
+ * @return the Float[] defining an n-dimensional point
+ */
+ public static Float[] decodePoint(String formattedString) {
+ String[] pts = formattedString.split(",");
+ Float[] point = new Float[pts.length - 1];
+ for (int i = 0; i < point.length; i++)
+ if (pts[i].startsWith("["))
+ point[i] = new Float(pts[i].substring(1));
+ else if (!pts[i].startsWith("]"))
+ point[i] = new Float(pts[i]);
+ return point;
+ }
+
+ /**
+ * Format the canopy for output
+ *
+ * @param canopy
+ * @return
+ */
+ public static String formatCanopy(Canopy canopy) {
+ return "C" + canopy.canopyId + ": " + formatPoint(canopy.computeCentroid());
+ }
+
+ /**
+ * Decodes and returns a Canopy from the formattedString
+ *
+ * @param formattedString a String prouced by formatCanopy
+ * @return a new Canopy
+ */
+ public static Canopy decodeCanopy(String formattedString) {
+ int beginIndex = formattedString.indexOf('[');
+ String id = formattedString.substring(0, beginIndex);
+ String centroid = formattedString.substring(beginIndex);
+ if (id.startsWith("C")) {
+ int canopyId = new Integer(formattedString.substring(1, beginIndex - 2));
+ Float[] canopyCentroid = decodePoint(centroid);
+ return new Canopy(canopyCentroid, canopyId);
+ }
+ return null;
+ }
+
+ /**
+ * Add a point to the canopy
+ *
+ * @param point a Float[]
+ */
+ public void addPoint(Float[] point) {
+ numPoints++;
+ for (int i = 0; i < point.length; i++)
+ pointTotal[i] = new Float(point[i] + pointTotal[i]);
+ }
+
+ /**
+ * Emit the point to the collector, keyed by the canopy's formatted
+ * representation
+ *
+ * @param point a Float[]
+ */
+ public void emitPoint(Float[] point, OutputCollector collector)
+ throws IOException {
+ collector.collect(new Text(formatCanopy(this)), new Text(ptOut("", point)));
+ }
+
+ /**
+ * Return a printable representation of this object, using the user supplied
+ * identifier
+ *
+ * @return
+ */
+ public String toString() {
+ return "C" + canopyId + " - " + ptOut("", getCenter());
+ }
+
+ public int getCanopyId() {
+ return canopyId;
+ }
+
+ /**
+ * Return the center point
+ *
+ * @return a Float[]
+ */
+ public Float[] getCenter() {
+ return center;
+ }
+
+ /**
+ * Return the number of points in the Canopy
+ *
+ * @return
+ */
+ public int getNumPoints() {
+ return numPoints;
+ }
+
+ /**
+ * Compute the centroid by averaging the pointTotals
+ *
+ * @return a Float[] which is the new centroid
+ */
+ public Float[] computeCentroid() {
+ Float[] result = new Float[pointTotal.length];
+ for (int i = 0; i < pointTotal.length; i++)
+ result[i] = new Float(pointTotal[i] / numPoints);
+ return result;
+ }
+
+ /**
+ * Return if the point is covered by this canopy
+ *
+ * @param point a Float[] point
+ * @return if the point is covered
+ */
+ public boolean covers(Float[] point) {
+ return measure.distance(center, point) < t1;
+ }
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,54 @@
+package org.apache.mahout.clustering.canopy;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Command-line entry point that runs the two canopy clustering steps in
+ * sequence: CanopyDriver to build the canopies, then ClusterDriver to assign
+ * the input points to them.
+ */
+public class CanopyClusteringJob {
+
+  /**
+   * @param args the input path, output path, DistanceMeasure class name, T1,
+   *        T2 and, optionally, the job jar location
+   * @throws IllegalArgumentException if fewer than five arguments are given
+   */
+  public static void main(String[] args) {
+    if (args.length < 5) {
+      throw new IllegalArgumentException(
+          "Usage: CanopyClusteringJob <input> <output> <measureClassName> <t1> <t2> [<jarLocation>]");
+    }
+    String input = args[0];
+    String output = args[1];
+    String measureClassName = args[2];
+    float t1 = Float.parseFloat(args[3]);
+    float t2 = Float.parseFloat(args[4]);
+    // default jar name, overridden by the optional sixth argument
+    String jarLocation = "apache-mahout-0.1-dev.jar";
+    if (args.length > 5) {
+      jarLocation = args[5];
+    }
+    runJob(input, output, measureClassName, t1, t2, jarLocation);
+  }
+
+  /**
+   * Run the job
+   *
+   * @param input the input pathname String
+   * @param output the output pathname String; canopies are written to
+   *        output + "/canopies"
+   * @param measureClassName the DistanceMeasure class name
+   * @param t1 the T1 distance threshold
+   * @param t2 the T2 distance threshold
+   * @param jarLocation the job jar location handed to both drivers
+   */
+  public static void runJob(String input, String output,
+      String measureClassName, float t1, float t2, String jarLocation) {
+    // the canopies produced by the first step are the input of the second
+    String canopies = output + "/canopies";
+    CanopyDriver.runJob(input, canopies, measureClassName, t1, t2, jarLocation);
+    ClusterDriver.runJob(input, canopies, output, measureClassName, t1, t2, jarLocation);
+  }
+
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,57 @@
+package org.apache.mahout.clustering.canopy;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Combiner for the canopy generation job: collapses all the points emitted
+ * for one key into a single centroid.
+ */
+public class CanopyCombiner extends MapReduceBase implements Reducer {
+
+  /**
+   * Sums every point received for the key into one Canopy and emits that
+   * canopy's centroid under the constant key "centroid".
+   *
+   * @param key the key the points were emitted under; not used
+   * @param values an Iterator over the encoded points; must hold at least one
+   * @param output the OutputCollector receiving the centroid
+   * @param reporter not used
+   */
+  public void reduce(WritableComparable key, Iterator values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    // the first value seeds the canopy, every further value is added to it
+    Writable first = (Writable) values.next();
+    Canopy canopy = new Canopy(Canopy.decodePoint(first.toString()));
+    while (values.hasNext()) {
+      Writable next = (Writable) values.next();
+      canopy.addPoint(Canopy.decodePoint(next.toString()));
+    }
+    Float[] centroid = canopy.computeCentroid();
+    output.collect(new Text("centroid"), new Text(Canopy.formatPoint(centroid)));
+  }
+
+  /**
+   * Passes the job configuration on to the shared Canopy settings.
+   *
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    Canopy.configure(job);
+  }
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,84 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+/**
+ * Configures and runs the canopy generation job (CanopyMapper, CanopyCombiner
+ * and a single CanopyReducer).
+ */
+public class CanopyDriver {
+
+  /**
+   * @param args the input path, output path, DistanceMeasure class name, T1,
+   *        T2 and, optionally, the job jar location
+   * @throws IllegalArgumentException if fewer than five arguments are given
+   */
+  public static void main(String[] args) {
+    if (args.length < 5) {
+      throw new IllegalArgumentException(
+          "Usage: CanopyDriver <input> <output> <measureClassName> <t1> <t2> [<jarLocation>]");
+    }
+    String input = args[0];
+    String output = args[1];
+    String measureClassName = args[2];
+    float t1 = Float.parseFloat(args[3]);
+    float t2 = Float.parseFloat(args[4]);
+    // default jar name, overridden by the optional sixth argument
+    String jarLocation = "apache-mahout-0.1-dev.jar";
+    if (args.length > 5) {
+      jarLocation = args[5];
+    }
+    runJob(input, output, measureClassName, t1, t2, jarLocation);
+  }
+
+  /**
+   * Run the job. An existing output path is deleted before the job starts.
+   *
+   * @param input the input pathname String
+   * @param output the output pathname String
+   * @param measureClassName the DistanceMeasure class name
+   * @param t1 the T1 distance threshold
+   * @param t2 the T2 distance threshold
+   * @param jarLocation the job jar location passed to JobConf.setJar
+   * @throws IllegalStateException if preparing the output path or running the
+   *         job fails with a checked exception
+   */
+  public static void runJob(String input, String output,
+      String measureClassName, float t1, float t2, String jarLocation) {
+    JobClient client = new JobClient();
+    JobConf conf = new JobConf(CanopyDriver.class);
+    conf.setJar(jarLocation);
+    conf.set(Canopy.DISTANCE_MEASURE_KEY, measureClassName);
+    conf.set(Canopy.T1_KEY, "" + t1);
+    conf.set(Canopy.T2_KEY, "" + t2);
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setInputPath(new Path(input));
+    Path outPath = new Path(output);
+    conf.setOutputPath(outPath);
+
+    conf.setMapperClass(CanopyMapper.class);
+    conf.setCombinerClass(CanopyCombiner.class);
+    conf.setReducerClass(CanopyReducer.class);
+    conf.setNumReduceTasks(1);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    client.setConf(conf);
+    try {
+      FileSystem dfs = FileSystem.get(conf);
+      if (dfs.exists(outPath)) {
+        dfs.delete(outPath);
+      }
+      JobClient.runJob(conf);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      // report the failure to the caller instead of only printing it, so a
+      // following step does not run against missing canopy output
+      throw new IllegalStateException("Canopy job failed for input " + input
+          + " and output " + output, e);
+    }
+  }
+
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,60 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class CanopyMapper extends MapReduceBase implements Mapper {
+
+ List<Canopy> canopies = new ArrayList<Canopy>();
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.mapred.Mapper#map(org.apache.hadoop.io.WritableComparable,
+ * org.apache.hadoop.io.Writable,
+ * org.apache.hadoop.mapred.OutputCollector,
+ * org.apache.hadoop.mapred.Reporter)
+ */
+ public void map(WritableComparable key, Writable values,
+ OutputCollector output, Reporter reporter) throws IOException {
+ Float[] point = Canopy.decodePoint(values.toString());
+ Canopy.emitPointToNewCanopies(point, canopies, output);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+ */
+ @Override
+ public void configure(JobConf job) {
+ super.configure(job);
+ Canopy.configure(job);
+ }
+
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,71 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Reducer for the canopy generation job: clusters the incoming points into
+ * canopies and emits each canopy's centroid keyed by "C" plus the canopy id.
+ */
+public class CanopyReducer extends MapReduceBase implements Reducer {
+
+  // the canopies built so far; kept across reduce calls on this instance
+  List<Canopy> canopies = new ArrayList<Canopy>();
+
+  /**
+   * Adds every value to the canopies and then emits all canopies held by this
+   * reducer, including any left over from earlier calls.
+   *
+   * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable,
+   *      java.util.Iterator, org.apache.hadoop.mapred.OutputCollector,
+   *      org.apache.hadoop.mapred.Reporter)
+   */
+  public void reduce(WritableComparable key, Iterator values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    while (values.hasNext()) {
+      Text encodedPoint = (Text) values.next();
+      Canopy.addPointToCanopies(Canopy.decodePoint(encodedPoint.toString()),
+          canopies);
+    }
+    for (Canopy canopy : canopies) {
+      Text canopyKey = new Text("C" + canopy.getCanopyId());
+      String centroid = Canopy.formatPoint(canopy.computeCentroid());
+      output.collect(canopyKey, new Text(centroid));
+    }
+  }
+
+  /**
+   * Passes the job configuration on to the shared Canopy settings.
+   *
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    Canopy.configure(job);
+  }
+
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,85 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * Configures and runs the clustering job, which maps the input points onto
+ * previously generated canopies (ClusterMapper with an IdentityReducer).
+ */
+public class ClusterDriver {
+
+  /**
+   * @param args the points path, canopies path, output path, DistanceMeasure
+   *        class name, T1, T2 and, optionally, the job jar location
+   * @throws IllegalArgumentException if fewer than six arguments are given
+   */
+  public static void main(String[] args) {
+    if (args.length < 6) {
+      throw new IllegalArgumentException(
+          "Usage: ClusterDriver <points> <canopies> <output> <measureClassName> <t1> <t2> [<jarLocation>]");
+    }
+    String points = args[0];
+    String canopies = args[1];
+    String output = args[2];
+    String measureClassName = args[3];
+    float t1 = Float.parseFloat(args[4]);
+    float t2 = Float.parseFloat(args[5]);
+    // default jar name, overridden by the optional seventh argument
+    String jarLocation = "apache-mahout-0.1-dev.jar";
+    if (args.length > 6) {
+      jarLocation = args[6];
+    }
+    runJob(points, canopies, output, measureClassName, t1, t2, jarLocation);
+  }
+
+  /**
+   * Run the job. Results are written to output + "/clusters"; an existing
+   * path of that name is deleted before the job starts.
+   *
+   * @param points the input points directory pathname String
+   * @param canopies the input canopies directory pathname String
+   * @param output the output directory pathname String
+   * @param measureClassName the DistanceMeasure class name
+   * @param t1 the T1 distance threshold
+   * @param t2 the T2 distance threshold
+   * @param jarLocation the job jar location passed to JobConf.setJar
+   * @throws IllegalStateException if preparing the output path or running the
+   *         job fails with a checked exception
+   */
+  public static void runJob(String points, String canopies, String output,
+      String measureClassName, float t1, float t2, String jarLocation) {
+    JobClient client = new JobClient();
+    JobConf conf = new JobConf(ClusterDriver.class);
+    conf.setJar(jarLocation);
+    conf.set(Canopy.DISTANCE_MEASURE_KEY, measureClassName);
+    conf.set(Canopy.T1_KEY, "" + t1);
+    conf.set(Canopy.T2_KEY, "" + t2);
+    conf.set(Canopy.CANOPY_PATH_KEY, canopies);
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setInputPath(new Path(points));
+    Path outPath = new Path(output + "/clusters");
+    conf.setOutputPath(outPath);
+
+    conf.setMapperClass(ClusterMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    client.setConf(conf);
+    try {
+      FileSystem dfs = FileSystem.get(conf);
+      if (dfs.exists(outPath)) {
+        dfs.delete(outPath);
+      }
+      JobClient.runJob(conf);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      // report the failure to the caller instead of only printing it
+      throw new IllegalStateException("Cluster job failed for points " + points
+          + " and canopies " + canopies, e);
+    }
+  }
+
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,85 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Mapper that decodes each input value as a point and hands it to
+ * Canopy.emitPointToExistingCanopies together with the configured canopies.
+ */
+public class ClusterMapper extends MapReduceBase implements Mapper {
+
+  // The canopies each point is emitted against; set by configure(JobConf) or,
+  // in unit tests, by config(List).
+  List<Canopy> canopies;
+
+  /**
+   * Decode the point from the value's string form and emit it to the existing
+   * canopies.
+   *
+   * @param key the input key (not used by this method)
+   * @param values the Writable whose toString() is the encoded point
+   * @param output the OutputCollector passed on to Canopy
+   * @param reporter the Reporter (not used by this method)
+   */
+  public void map(WritableComparable key, Writable values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    Float[] point = Canopy.decodePoint(values.toString());
+    Canopy.emitPointToExistingCanopies(point, canopies, values, output);
+  }
+
+  /**
+   * Configure the mapper by providing its canopies. Used by unit tests.
+   *
+   * @param canopies a List<Canopy>
+   */
+  public void config(List<Canopy> canopies) {
+    this.canopies = canopies;
+  }
+
+  /**
+   * Configure Canopy from the job and load the canopies from the sequence file
+   * "part-00000" under the directory named by Canopy.CANOPY_PATH_KEY.
+   *
+   * @param job the JobConf of the running job
+   * @throws IllegalStateException if the canopy file cannot be read; running
+   *         the mapper with a missing or partial canopy list would silently
+   *         produce wrong output
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    Canopy.configure(job);
+
+    String canopyPath = job.get(Canopy.CANOPY_PATH_KEY);
+    List<Canopy> loaded = new ArrayList<Canopy>();
+
+    try {
+      FileSystem fs = FileSystem.get(job);
+      // NOTE(review): only the first part file is read; confirm the canopy
+      // job always writes a single output partition.
+      Path path = new Path(canopyPath + "/part-00000");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+      try {
+        Text key = new Text();
+        Text value = new Text();
+        while (reader.next(key, value)) {
+          loaded.add(new Canopy(Canopy.decodePoint(value.toString())));
+        }
+      } finally {
+        reader.close();
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to load canopies from "
+          + canopyPath, e);
+    }
+    canopies = loaded;
+  }
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.mapred.JobConfigurable;
+
+/**
+ * Contract for objects that can compute a distance metric between two points.
+ * Every implementation is also a JobConfigurable.
+ */
+public interface DistanceMeasure extends JobConfigurable {
+
+  /**
+   * Applies the distance metric to the two given points.
+   *
+   * @param p1 a Float[] defining a multidimensional point in some feature space
+   * @param p2 a Float[] defining a multidimensional point in some feature space
+   * @return the distance between the points as a scalar float
+   */
+  float distance(Float[] p1, Float[] p2);
+
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,46 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class implements a Euclidean distance metric: the square root of the
+ * sum of the squared differences between each coordinate.
+ */
+public class EuclideanDistanceMeasure implements DistanceMeasure {
+
+  /**
+   * Returns the square root of the summed squared coordinate differences.
+   * The loop runs over the coordinates of p1.
+   *
+   * @see org.apache.mahout.clustering.canopy.DistanceMeasure#distance(java.lang.Float[], java.lang.Float[])
+   */
+  public float distance(Float[] p1, Float[] p2) {
+    float sumOfSquares = 0;
+    int dimension = 0;
+    while (dimension < p1.length) {
+      float diff = p2[dimension] - p1[dimension];
+      sumOfSquares += diff * diff;
+      dimension++;
+    }
+    return (float) Math.sqrt(sumOfSquares);
+  }
+
+  /**
+   * This measure has no settings to read from the job.
+   *
+   * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  public void configure(JobConf job) {
+    // nothing to do
+  }
+
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,47 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class implements a "manhattan distance" metric: the sum of the absolute
+ * values of the differences between each coordinate.
+ */
+public class ManhattanDistanceMeasure implements DistanceMeasure {
+
+  /**
+   * This measure has no settings to read from the job.
+   *
+   * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  public void configure(JobConf job) {
+    // nothing to do
+  }
+
+  /**
+   * Returns the summed absolute coordinate differences. The loop runs over
+   * the coordinates of p1.
+   *
+   * @see org.apache.mahout.clustering.canopy.DistanceMeasure#distance(java.lang.Float[],
+   *      java.lang.Float[])
+   */
+  public float distance(Float[] p1, Float[] p2) {
+    float total = 0;
+    for (int dimension = 0; dimension < p1.length; dimension++) {
+      float diff = p2[dimension] - p1[dimension];
+      total += Math.abs(diff);
+    }
+    return total;
+  }
+
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java (added)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,56 @@
+package org.apache.mahout.clustering.canopy;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+
+/**
+ * In-memory OutputCollector for tests. Collected values are grouped in a
+ * sorted map keyed by the string form of the key they were collected under.
+ */
+public class DummyOutputCollector implements OutputCollector {
+
+  Map<String, List<Writable>> data = new TreeMap<String, List<Writable>>();
+
+  /**
+   * Append the value to the list stored under key.toString(), creating the
+   * list on first use of that key.
+   */
+  public void collect(WritableComparable key, Writable values)
+      throws IOException {
+    String name = key.toString();
+    List<Writable> bucket = data.get(name);
+    if (bucket != null) {
+      bucket.add(values);
+      return;
+    }
+    bucket = new ArrayList<Writable>();
+    bucket.add(values);
+    data.put(name, bucket);
+  }
+
+  /** @return the backing map of key string to collected values */
+  public Map<String, List<Writable>> getData() {
+    return data;
+  }
+
+  /** @return the values collected under the given key, or null if none */
+  public List<Writable> getValue(String key) {
+    return data.get(key);
+  }
+
+  /** @return the key strings seen so far, in the map's sorted order */
+  public Set<String> getKeys() {
+    return data.keySet();
+  }
+
+}
Propchange: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (added)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,799 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestCanopyCreation extends TestCase {
+ // The nine two-dimensional sample points shared by all tests.
+ static final float[][] raw = { { 1, 1 }, { 2, 1 }, { 1, 2 }, { 2, 2 },
+ { 3, 3 }, { 4, 4 }, { 5, 4 }, { 4, 5 }, { 5, 5 } };
+
+ // Canopies built in setUp by populateCanopies with the Manhattan measure.
+ List<Canopy> referenceManhattan;
+
+ DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
+
+ // Centroids of referenceManhattan, computed in setUp.
+ List<Float[]> manhattanCentroids;
+
+ // Canopies built in setUp by populateCanopies with the Euclidean measure.
+ List<Canopy> referenceEuclidean;
+
+ DistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();
+
+ // Centroids of referenceEuclidean, computed in setUp.
+ List<Float[]> euclideanCentroids;
+
+ /**
+ * Create the test case, passing the given name to the TestCase constructor.
+ *
+ * @param name the test name
+ */
+ public TestCanopyCreation(String name) {
+ super(name);
+ }
+
+ /**
+  * Box every float[] row of the argument into a Float[] and return the rows
+  * in a new list.
+  *
+  * @param raw the primitive point coordinates
+  * @return a new List<Float[]> with one entry per row
+  */
+ private List<Float[]> getPoints(float[][] raw) {
+   List<Float[]> boxedPoints = new ArrayList<Float[]>();
+   for (float[] row : raw) {
+     Float[] boxed = new Float[row.length];
+     for (int col = 0; col < boxed.length; col++) {
+       boxed[col] = row[col];
+     }
+     boxedPoints.add(boxed);
+   }
+   return boxedPoints;
+ }
+
+ /**
+  * Format each point with Canopy.formatPoint and wrap the result in a Text.
+  *
+  * @param points the points to format
+  * @return a new List<Text> in the same order as the input
+  */
+ private List<Text> getFormattedPoints(List<Float[]> points) {
+   List<Text> formatted = new ArrayList<Text>();
+   Iterator<Float[]> it = points.iterator();
+   while (it.hasNext()) {
+     formatted.add(new Text(Canopy.formatPoint(it.next())));
+   }
+   return formatted;
+ }
+
+ /**
+ * Verify that the given canopies are equivalent to the referenceManhattan
+ *
+ * @param canopies
+ */
+ private void verifyManhattanCanopies(List<Canopy> canopies) {
+   // compare against the Manhattan reference canopies built in setUp
+   List<Canopy> expected = referenceManhattan;
+   verifyCanopies(canopies, expected);
+ }
+
+ /**
+ * Verify that the given canopies are equivalent to the referenceEuclidean
+ *
+ * @param canopies
+ */
+ private void verifyEuclideanCanopies(List<Canopy> canopies) {
+   // compare against the Euclidean reference canopies built in setUp
+   List<Canopy> expected = referenceEuclidean;
+   verifyCanopies(canopies, expected);
+ }
+
+ /**
+ * Verify that the given canopies are equivalent to the reference. This means
+ * the number of canopies is the same, the number of points in each is the
+ * same and the centroids are the same.
+ *
+ * @param canopies the canopies under test
+ * @param reference the expected canopies to compare against
+ */
+ private void verifyCanopies(List<Canopy> canopies, List<Canopy> reference) {
+   assertEquals("number of canopies", reference.size(), canopies.size());
+   int canopyIx = 0;
+   for (Canopy testCanopy : canopies) {
+     Canopy refCanopy = reference.get(canopyIx);
+     // same number of points in both canopies
+     assertEquals("canopy points " + canopyIx, refCanopy.getNumPoints(),
+         testCanopy.getNumPoints());
+     // same centroid, compared coordinate by coordinate
+     Float[] refCentroid = refCanopy.computeCentroid();
+     Float[] testCentroid = testCanopy.computeCentroid();
+     int pointIx = 0;
+     while (pointIx < refCentroid.length) {
+       assertEquals("canopy centroid " + canopyIx + "[" + pointIx + "]",
+           refCentroid[pointIx], testCentroid[pointIx]);
+       pointIx++;
+     }
+     canopyIx++;
+   }
+ }
+
+ /**
+ * Print the canopies to the transcript
+ *
+ * @param canopies a List<Canopy>
+ */
+ private void prtCanopies(List<Canopy> canopies) {
+   // one line per canopy on standard output
+   Iterator<Canopy> it = canopies.iterator();
+   while (it.hasNext()) {
+     System.out.println(it.next().toString());
+   }
+ }
+
+ private void writePointsToFile(List<Float[]> points, String fileName)
+ throws IOException {
+ writePointsToFileWithPayload(points, fileName, "");
+ }
+
+ /**
+  * Write one line per point to the named file: the point as formatted by
+  * Canopy.formatPoint, followed by the payload and a newline.
+  *
+  * @param points the points to write
+  * @param fileName the pathname of the file to create
+  * @param payload the text appended after each formatted point
+  * @throws IOException if the file cannot be written
+  */
+ private void writePointsToFileWithPayload(List<Float[]> points,
+     String fileName, String payload) throws IOException {
+   BufferedWriter output = new BufferedWriter(new FileWriter(fileName));
+   try {
+     for (Float[] point : points) {
+       output.write(Canopy.formatPoint(point));
+       output.write(payload);
+       output.write("\n");
+     }
+     output.flush();
+   } finally {
+     // close even when a write fails so the file handle is not leaked
+     output.close();
+   }
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ referenceManhattan = populateCanopies(manhattanDistanceMeasure,
+ getPoints(raw), (float) 3.1, (float) 2.1);
+ manhattanCentroids = populateCentroids(referenceManhattan);
+ referenceEuclidean = populateCanopies(euclideanDistanceMeasure,
+ getPoints(raw), (float) 3.1, (float) 2.1);
+ euclideanCentroids = populateCentroids(referenceEuclidean);
+ }
+
+ /**
+ * Iterate through the canopies, adding their centroids to a list
+ *
+ * @param canopies a List<Canopy>
+ * @return the List<Float[]>
+ */
+ List<Float[]> populateCentroids(List<Canopy> canopies) {
+   List<Float[]> centroids = new ArrayList<Float[]>();
+   Iterator<Canopy> it = canopies.iterator();
+   while (it.hasNext()) {
+     centroids.add(it.next().computeCentroid());
+   }
+   return centroids;
+ }
+
+ /**
+ * Iterate through the points, adding new canopies. Return the canopies.
+ *
+ * @param measure a DistanceMeasure to use
+ * @param points a list<Float[]> defining the points to be clustered
+ * @param t1 the T1 distance threshold
+ * @param t2 the T2 distance threshold
+ * @return the List<Canopy> created
+ */
+ List<Canopy> populateCanopies(DistanceMeasure measure, List<Float[]> points,
+     float t1, float t2) {
+   Canopy.config(measure, t1, t2);
+   List<Canopy> result = new ArrayList<Canopy>();
+   /*
+    * Reference Implementation: Given a distance metric, one can create
+    * canopies as follows: Start with a list of the data points in any order,
+    * and with two distance thresholds, T1 and T2, where T1 > T2. (These
+    * thresholds can be set by the user, or selected by cross-validation.) Pick
+    * a point on the list and measure its distance to all other points. Put all
+    * points that are within distance threshold T1 into a canopy. Remove from
+    * the list all points that are within distance threshold T2. Repeat until
+    * the list is empty.
+    */
+   // Note: the points list is consumed; it is empty when this method returns.
+   while (!points.isEmpty()) {
+     Iterator<Float[]> remaining = points.iterator();
+     // the first remaining point seeds a new canopy and leaves the list
+     Float[] center = remaining.next();
+     remaining.remove();
+     Canopy canopy = new VisibleCanopy(center);
+     result.add(canopy);
+     while (remaining.hasNext()) {
+       Float[] candidate = remaining.next();
+       float distance = measure.distance(center, candidate);
+       // Put all points that are within distance threshold T1 into the canopy
+       if (distance < t1) {
+         canopy.addPoint(candidate);
+       }
+       // Remove from the list all points that are within distance threshold T2
+       if (distance < t2) {
+         remaining.remove();
+       }
+     }
+   }
+   return result;
+ }
+
+ /**
+ * No test-specific cleanup; only delegates to TestCase.tearDown().
+ */
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ /**
+ * Story: User can cluster points using a ManhattanDistanceMeasure and a
+ * reference implementation
+ *
+ * @throws Exception
+ */
+ public void testReferenceManhattan() throws Exception {
+   System.out.println("testReferenceManhattan");
+   // see setUp for cluster creation
+   prtCanopies(referenceManhattan);
+   assertEquals("number of canopies", 3, referenceManhattan.size());
+
+   // expected size and centroid of each canopy, in creation order
+   final int[] expectedNumPoints = { 4, 4, 3 };
+   final float[][] expectedCentroids = { { (float) 1.5, (float) 1.5 },
+       { (float) 4.0, (float) 4.0 },
+       { (float) 4.6666665, (float) 4.6666665 } };
+
+   int canopyIx = 0;
+   for (Canopy testCanopy : referenceManhattan) {
+     assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx],
+         testCanopy.getNumPoints());
+     float[] refCentroid = expectedCentroids[canopyIx];
+     Float[] testCentroid = testCanopy.computeCentroid();
+     int pointIx = 0;
+     while (pointIx < refCentroid.length) {
+       assertEquals("canopy centroid " + canopyIx + "[" + pointIx + "]",
+           refCentroid[pointIx], testCentroid[pointIx]);
+       pointIx++;
+     }
+     canopyIx++;
+   }
+ }
+
+ /**
+ * Story: User can cluster points using a EuclideanDistanceMeasure and a
+ * reference implementation
+ *
+ * @throws Exception
+ */
+ public void testReferenceEuclidean() throws Exception {
+   System.out.println("testReferenceEuclidean()");
+   // see setUp for cluster creation
+   prtCanopies(referenceEuclidean);
+   // check the Euclidean canopies themselves, not the Manhattan ones
+   assertEquals("number of canopies", 3, referenceEuclidean.size());
+
+   // expected size and centroid of each canopy, in creation order
+   final int[] expectedNumPoints = { 5, 5, 3 };
+   final float[][] expectedCentroids = { { (float) 1.8, (float) 1.8 },
+       { (float) 4.2, (float) 4.2 },
+       { (float) 4.6666665, (float) 4.6666665 } };
+
+   for (int canopyIx = 0; canopyIx < referenceEuclidean.size(); canopyIx++) {
+     Canopy testCanopy = referenceEuclidean.get(canopyIx);
+     assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx],
+         testCanopy.getNumPoints());
+     float[] refCentroid = expectedCentroids[canopyIx];
+     Float[] testCentroid = testCanopy.computeCentroid();
+     for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
+       assertEquals("canopy centroid " + canopyIx + "[" + pointIx + "]",
+           refCentroid[pointIx], testCentroid[pointIx]);
+     }
+   }
+ }
+
+ /**
+ * Story: User can cluster points without instantiating them all in memory at
+ * once
+ *
+ * @throws Exception
+ */
+ public void testIterativeManhattan() throws Exception {
+   List<Float[]> points = getPoints(raw);
+   Canopy.config(new ManhattanDistanceMeasure(), (float) 3.1, (float) 2.1);
+
+   // feed the points one at a time, letting Canopy grow the canopy list
+   List<Canopy> canopies = new ArrayList<Canopy>();
+   Iterator<Float[]> it = points.iterator();
+   while (it.hasNext()) {
+     Canopy.addPointToCanopies(it.next(), canopies);
+   }
+
+   System.out.println("testIterativeManhattan");
+   prtCanopies(canopies);
+   verifyManhattanCanopies(canopies);
+ }
+
+ /**
+ * Story: User can cluster points without instantiating them all in memory at
+ * once
+ *
+ * @throws Exception
+ */
+ public void testIterativeEuclidean() throws Exception {
+   List<Float[]> points = getPoints(raw);
+   Canopy.config(new EuclideanDistanceMeasure(), (float) 3.1, (float) 2.1);
+
+   // feed the points one at a time, letting Canopy grow the canopy list
+   List<Canopy> canopies = new ArrayList<Canopy>();
+   Iterator<Float[]> it = points.iterator();
+   while (it.hasNext()) {
+     Canopy.addPointToCanopies(it.next(), canopies);
+   }
+
+   System.out.println("testIterativeEuclidean");
+   prtCanopies(canopies);
+   verifyEuclideanCanopies(canopies);
+ }
+
+ /**
+ * Story: User can produce initial canopy centers using a
+ * ManhattanDistanceMeasure and a CanopyMapper/Combiner which clusters input
+ * points to produce an output set of canopy centroid points.
+ *
+ * @throws Exception
+ */
+ public void testCanopyMapperManhattan() throws Exception {
+ CanopyMapper mapper = new CanopyMapper();
+ CanopyCombiner combiner = new CanopyCombiner();
+ DummyOutputCollector collector = new DummyOutputCollector();
+ Canopy.config(manhattanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+ List<Float[]> points = getPoints(raw);
+ // map the data
+ for (Float[] point : points)
+ mapper.map(new Text(), new Text(Canopy.formatPoint(point)), collector,
+ null);
+ assertEquals("Number of map results", 3, collector.getData().size());
+ // now combine the mapper output
+ Canopy.config(manhattanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+ Map<String, List<Writable>> mapData = collector.getData();
+ collector = new DummyOutputCollector();
+ for (String key : mapData.keySet())
+ combiner.reduce(new Text(key), mapData.get(key).iterator(), collector,
+ null);
+ // now verify the output
+ List<Writable> data = collector.getValue("centroid");
+ assertEquals("Number of centroids", 3, data.size());
+ for (int i = 0; i < data.size(); i++)
+ assertEquals("Centroid error", Canopy.formatPoint(manhattanCentroids
+ .get(i)), Canopy.formatPoint(Canopy.decodePoint(data.get(i)
+ .toString())));
+ }
+
+ /**
+ * Story: User can produce initial canopy centers using a
+ * EuclideanDistanceMeasure and a CanopyMapper/Combiner which clusters input
+ * points to produce an output set of canopy centroid points.
+ *
+ * @throws Exception
+ */
+ public void testCanopyMapperEuclidean() throws Exception {
+ CanopyMapper mapper = new CanopyMapper();
+ CanopyCombiner combiner = new CanopyCombiner();
+ DummyOutputCollector collector = new DummyOutputCollector();
+ Canopy.config(euclideanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+ List<Float[]> points = getPoints(raw);
+ // map the data
+ for (Float[] point : points)
+ mapper.map(new Text(), new Text(Canopy.formatPoint(point)), collector,
+ null);
+ assertEquals("Number of map results", 3, collector.getData().size());
+ // now combine the mapper output
+ Canopy.config(euclideanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+ Map<String, List<Writable>> mapData = collector.getData();
+ collector = new DummyOutputCollector();
+ for (String key : mapData.keySet())
+ combiner.reduce(new Text(key), mapData.get(key).iterator(), collector,
+ null);
+ // now verify the output
+ List<Writable> data = collector.getValue("centroid");
+ assertEquals("Number of centroids", 3, data.size());
+ for (int i = 0; i < data.size(); i++)
+ assertEquals("Centroid error", Canopy.formatPoint(euclideanCentroids
+ .get(i)), Canopy.formatPoint(Canopy.decodePoint(data.get(i)
+ .toString())));
+ }
+
+ /**
+ * Story: User can produce final canopy centers using a
+ * ManhattanDistanceMeasure and a CanopyReducer which clusters input centroid
+ * points to produce an output set of final canopy centroid points.
+ *
+ * @throws Exception
+ */
+ public void testCanopyReducerManhattan() throws Exception {
+ CanopyReducer reducer = new CanopyReducer();
+ DummyOutputCollector collector = new DummyOutputCollector();
+ Canopy.config(manhattanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+ List<Float[]> points = getPoints(raw);
+ List<Text> texts = getFormattedPoints(points);
+ reducer.reduce(new Text("centroid"), texts.iterator(), collector, null);
+ reducer.close();
+ Set<String> keys = collector.getKeys();
+ assertEquals("Number of centroids", 3, keys.size());
+ int i = 0;
+ for (String key : keys) {
+ List<Writable> data = collector.getValue(key);
+ assertEquals("Centroid error", Canopy.formatPoint(manhattanCentroids
+ .get(i)), Canopy.formatPoint(Canopy.decodePoint(data.get(0)
+ .toString())));
+ i++;
+ }
+ }
+
+ /**
+ * Story: User can produce final canopy centers using a
+ * EuclideanDistanceMeasure and a CanopyReducer which clusters input centroid
+ * points to produce an output set of final canopy centroid points.
+ *
+ * @throws Exception
+ */
+ public void testCanopyReducerEuclidean() throws Exception {
+ CanopyReducer reducer = new CanopyReducer();
+ DummyOutputCollector collector = new DummyOutputCollector();
+ Canopy.config(euclideanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+ List<Float[]> points = getPoints(raw);
+ List<Text> texts = getFormattedPoints(points);
+ reducer.reduce(new Text("centroid"), texts.iterator(), collector, null);
+ reducer.close();
+ Set<String> keys = collector.getKeys();
+ assertEquals("Number of centroids", 3, keys.size());
+ int i = 0;
+ for (String key : keys) {
+ List<Writable> data = collector.getValue(key);
+ assertEquals("Centroid error", Canopy.formatPoint(euclideanCentroids
+ .get(i)), Canopy.formatPoint(Canopy.decodePoint(data.get(0)
+ .toString())));
+ i++;
+ }
+ }
+
+ /**
+ * Story: User can produce final canopy centers using a Hadoop map/reduce job
+ * and a ManhattanDistanceMeasure.
+ *
+ * @throws Exception
+ */
+ public void testCanopyGenManhattanMR() throws Exception {
+   List<Float[]> points = getPoints(raw);
+   File testData = new File("testdata");
+   if (!testData.exists()) {
+     testData.mkdir();
+   }
+   // the same points are written twice, as two separate input files
+   writePointsToFile(points, "testdata/file1");
+   writePointsToFile(points, "testdata/file2");
+   // now run the Canopy Driver
+   CanopyDriver.runJob("testdata", "output/canopies",
+       ManhattanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1,
+       "dist/apache-mahout-0.1-dev.jar");
+
+   // verify output from sequence file
+   JobConf job = new JobConf(
+       org.apache.mahout.clustering.canopy.CanopyDriver.class);
+   FileSystem fs = FileSystem.get(job);
+   Path path = new Path("output/canopies/part-00000");
+   SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+   try {
+     Text key = new Text();
+     Text value = new Text();
+     assertTrue("more to come", reader.next(key, value));
+     assertEquals("1st key", "C0", key.toString());
+     assertEquals("1st value", "[1.5, 1.5, ] ", value.toString());
+     assertTrue("more to come", reader.next(key, value));
+     assertEquals("2nd key", "C1", key.toString());
+     assertEquals("2nd value", "[4.333333, 4.333333, ] ", value.toString());
+     assertFalse("more to come", reader.next(key, value));
+   } finally {
+     // close the reader even when an assertion above fails
+     reader.close();
+   }
+ }
+
+ /**
+ * Story: User can produce final canopy centers using a Hadoop map/reduce job
+ * and a EuclideanDistanceMeasure.
+ *
+ * @throws Exception
+ */
+ public void testCanopyGenEuclideanMR() throws Exception {
+   List<Float[]> points = getPoints(raw);
+   File testData = new File("testdata");
+   if (!testData.exists()) {
+     testData.mkdir();
+   }
+   // the same points are written twice, as two separate input files
+   writePointsToFile(points, "testdata/file1");
+   writePointsToFile(points, "testdata/file2");
+   // now run the Canopy Driver
+   CanopyDriver.runJob("testdata", "output/canopies",
+       EuclideanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1,
+       "dist/apache-mahout-0.1-dev.jar");
+
+   // verify output from sequence file
+   JobConf job = new JobConf(
+       org.apache.mahout.clustering.canopy.CanopyDriver.class);
+   FileSystem fs = FileSystem.get(job);
+   Path path = new Path("output/canopies/part-00000");
+   SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+   try {
+     Text key = new Text();
+     Text value = new Text();
+     assertTrue("more to come", reader.next(key, value));
+     assertEquals("1st key", "C0", key.toString());
+     assertEquals("1st value", "[1.8, 1.8, ] ", value.toString());
+     assertTrue("more to come", reader.next(key, value));
+     assertEquals("2nd key", "C1", key.toString());
+     assertEquals("2nd value", "[4.4333334, 4.4333334, ] ", value.toString());
+     assertFalse("more to come", reader.next(key, value));
+   } finally {
+     // close the reader even when an assertion above fails
+     reader.close();
+   }
+ }
+
+ /**
+ * Story: User can cluster a subset of the points using a ClusterMapper and a
+ * ManhattanDistanceMeasure.
+ *
+ * @throws Exception
+ */
+ public void testClusterMapperManhattan() throws Exception {
+   Canopy.config(manhattanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+   ClusterMapper mapper = new ClusterMapper();
+   List<Canopy> canopies = new ArrayList<Canopy>();
+   DummyOutputCollector collector = new DummyOutputCollector();
+   // seed the mapper with one canopy per reference centroid
+   for (Float[] centroid : manhattanCentroids) {
+     canopies.add(new Canopy(centroid));
+   }
+   mapper.config(canopies);
+   List<Float[]> points = getPoints(raw);
+   // map the data
+   for (Float[] point : points) {
+     mapper.map(new Text(), new Text(Canopy.formatPoint(point)), collector,
+         null);
+   }
+   Map<String, List<Writable>> data = collector.getData();
+   assertEquals("Number of map results", canopies.size(), data.size());
+   // every emitted point must be covered by the canopy it was emitted under
+   for (Map.Entry<String, List<Writable>> entry : data.entrySet()) {
+     Canopy canopy = Canopy.decodeCanopy(entry.getKey());
+     for (Writable ptDef : entry.getValue()) {
+       assertTrue("Point not in canopy", canopy.covers(Canopy
+           .decodePoint(ptDef.toString())));
+     }
+   }
+ }
+
+ /**
+ * Story: User can cluster a subset of the points using a ClusterMapper and an
+ * EuclideanDistanceMeasure.
+ *
+ * @throws Exception
+ */
+ public void testClusterMapperEuclidean() throws Exception {
+   Canopy.config(euclideanDistanceMeasure, (float) 3.1, (float) 2.1);
+   ClusterMapper clusterMapper = new ClusterMapper();
+   List<Canopy> canopyList = new ArrayList<Canopy>();
+   DummyOutputCollector output = new DummyOutputCollector();
+   // build one canopy per reference centroid and hand them to the mapper
+   for (Float[] center : euclideanCentroids) {
+     canopyList.add(new Canopy(center));
+   }
+   clusterMapper.config(canopyList);
+
+   // map the data
+   List<Float[]> rawPoints = getPoints(raw);
+   for (Float[] rawPoint : rawPoints) {
+     Text encodedPoint = new Text(Canopy.formatPoint(rawPoint));
+     clusterMapper.map(new Text(), encodedPoint, output, null);
+   }
+
+   // one output key is expected per configured canopy
+   Map<String, List<Writable>> mapped = output.getData();
+   assertEquals("Number of map results", canopyList.size(), mapped.size());
+
+   // every value collected under a canopy key must be covered by that canopy
+   for (Map.Entry<String, List<Writable>> entry : mapped.entrySet()) {
+     Canopy decoded = Canopy.decodeCanopy(entry.getKey());
+     for (Writable encoded : entry.getValue()) {
+       Float[] pt = Canopy.decodePoint(encoded.toString());
+       assertTrue("Point not in canopy", decoded.covers(pt));
+     }
+   }
+ }
+
+ /**
+ * Story: User can cluster a subset of the points using a ClusterReducer and a
+ * ManhattanDistanceMeasure.
+ *
+ * @throws Exception
+ */
+ public void testClusterReducerManhattan() throws Exception {
+   Canopy.config(manhattanDistanceMeasure, (float) 3.1, (float) 2.1);
+   ClusterMapper clusterMapper = new ClusterMapper();
+   List<Canopy> canopyList = new ArrayList<Canopy>();
+   DummyOutputCollector mapOutput = new DummyOutputCollector();
+   // build one canopy per reference centroid and hand them to the mapper
+   for (Float[] center : manhattanCentroids) {
+     canopyList.add(new Canopy(center));
+   }
+   clusterMapper.config(canopyList);
+
+   // map the data
+   List<Float[]> rawPoints = getPoints(raw);
+   for (Float[] rawPoint : rawPoints) {
+     Text encodedPoint = new Text(Canopy.formatPoint(rawPoint));
+     clusterMapper.map(new Text(), encodedPoint, mapOutput, null);
+   }
+   Map<String, List<Writable>> mapped = mapOutput.getData();
+   assertEquals("Number of map results", canopyList.size(), mapped.size());
+
+   // reduce the data: the reduce step is Hadoop's IdentityReducer, fed one
+   // call per mapped key with that key's collected values
+   Reducer identity = new IdentityReducer();
+   DummyOutputCollector reduceOutput = new DummyOutputCollector();
+   for (Map.Entry<String, List<Writable>> group : mapped.entrySet()) {
+     identity.reduce(new Text(group.getKey()), group.getValue().iterator(),
+         reduceOutput, null);
+   }
+
+   // check the output: every reduced value must be covered by its key's canopy
+   Map<String, List<Writable>> reduced = reduceOutput.getData();
+   for (Map.Entry<String, List<Writable>> entry : reduced.entrySet()) {
+     Canopy decoded = Canopy.decodeCanopy(entry.getKey());
+     for (Writable encoded : entry.getValue()) {
+       Float[] pt = Canopy.decodePoint(encoded.toString());
+       assertTrue("Point not in canopy", decoded.covers(pt));
+     }
+   }
+ }
+
+ /**
+ * Story: User can cluster a subset of the points using a ClusterReducer and an
+ * EuclideanDistanceMeasure.
+ *
+ * @throws Exception
+ */
+ public void testClusterReducerEuclidean() throws Exception {
+   Canopy.config(euclideanDistanceMeasure, (float) 3.1, (float) 2.1);
+   ClusterMapper clusterMapper = new ClusterMapper();
+   List<Canopy> canopyList = new ArrayList<Canopy>();
+   DummyOutputCollector mapOutput = new DummyOutputCollector();
+   // build one canopy per reference centroid and hand them to the mapper
+   for (Float[] center : euclideanCentroids) {
+     canopyList.add(new Canopy(center));
+   }
+   clusterMapper.config(canopyList);
+
+   // map the data
+   List<Float[]> rawPoints = getPoints(raw);
+   for (Float[] rawPoint : rawPoints) {
+     Text encodedPoint = new Text(Canopy.formatPoint(rawPoint));
+     clusterMapper.map(new Text(), encodedPoint, mapOutput, null);
+   }
+   Map<String, List<Writable>> mapped = mapOutput.getData();
+
+   // reduce the data: the reduce step is Hadoop's IdentityReducer, fed one
+   // call per mapped key with that key's collected values
+   Reducer identity = new IdentityReducer();
+   DummyOutputCollector reduceOutput = new DummyOutputCollector();
+   for (Map.Entry<String, List<Writable>> group : mapped.entrySet()) {
+     identity.reduce(new Text(group.getKey()), group.getValue().iterator(),
+         reduceOutput, null);
+   }
+
+   // check the output: one key per canopy, and every reduced value must be
+   // covered by its key's canopy
+   Map<String, List<Writable>> reduced = reduceOutput.getData();
+   assertEquals("Number of map results", canopyList.size(), reduced.size());
+   for (Map.Entry<String, List<Writable>> entry : reduced.entrySet()) {
+     Canopy decoded = Canopy.decodeCanopy(entry.getKey());
+     for (Writable encoded : entry.getValue()) {
+       Float[] pt = Canopy.decodePoint(encoded.toString());
+       assertTrue("Point not in canopy", decoded.covers(pt));
+     }
+   }
+ }
+
+ /**
+ * Story: User can produce final point clustering using a Hadoop map/reduce
+ * job and a ManhattanDistanceMeasure.
+ *
+ * @throws Exception
+ */
+ public void testClusteringManhattanMR() throws Exception {
+   // Stage the raw points twice so the job sees two input files.
+   List<Float[]> points = getPoints(raw);
+   File testData = new File("testdata");
+   if (!testData.exists()) {
+     testData.mkdir();
+   }
+   writePointsToFile(points, "testdata/file1");
+   writePointsToFile(points, "testdata/file2");
+   // now run the Job
+   CanopyClusteringJob.runJob("testdata", "output",
+       ManhattanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1,
+       "dist/apache-mahout-0.1-dev.jar");
+
+   // count (and echo) the lines the job wrote to its first output partition
+   BufferedReader reader = new BufferedReader(new FileReader(
+       "output/clusters/part-00000"));
+   int count = 0;
+   try {
+     // readLine() returns null at end of stream; ready() only says whether a
+     // read would block, so it is not a reliable end-of-file test
+     String line;
+     while ((line = reader.readLine()) != null) {
+       System.out.println(line);
+       count++;
+     }
+   } finally {
+     // release the file handle even if reading fails
+     reader.close();
+   }
+   // the point [3.0,3.0] is covered by both canopies
+   assertEquals("number of points", 2 + 2 * points.size(), count);
+ }
+
+ /**
+ * Story: User can produce final point clustering using a Hadoop map/reduce
+ * job and an EuclideanDistanceMeasure.
+ *
+ * @throws Exception
+ */
+ public void testClusteringEuclideanMR() throws Exception {
+   // Stage the raw points twice so the job sees two input files.
+   List<Float[]> points = getPoints(raw);
+   File testData = new File("testdata");
+   if (!testData.exists()) {
+     testData.mkdir();
+   }
+   writePointsToFile(points, "testdata/file1");
+   writePointsToFile(points, "testdata/file2");
+   // now run the Job
+   CanopyClusteringJob.runJob("testdata", "output",
+       EuclideanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1,
+       "dist/apache-mahout-0.1-dev.jar");
+
+   // count (and echo) the lines the job wrote to its first output partition
+   BufferedReader reader = new BufferedReader(new FileReader(
+       "output/clusters/part-00000"));
+   int count = 0;
+   try {
+     // readLine() returns null at end of stream; ready() only says whether a
+     // read would block, so it is not a reliable end-of-file test
+     String line;
+     while ((line = reader.readLine()) != null) {
+       System.out.println(line);
+       count++;
+     }
+   } finally {
+     // release the file handle even if reading fails
+     reader.close();
+   }
+   // the point [3.0,3.0] is covered by both canopies
+   assertEquals("number of points", 2 + 2 * points.size(), count);
+ }
+
+ /**
+ * Story: User can produce final point clustering using a Hadoop map/reduce
+ * job and a ManhattanDistanceMeasure. Input points can have extra payload
+ * information following the point [...] and this information will be retained
+ * in the output.
+ *
+ * @throws Exception
+ */
+ public void testClusteringManhattanMRWithPayload() throws Exception {
+   // Stage the raw points twice; each file tags its points with its own name
+   // ("file1" / "file2") as the payload.
+   List<Float[]> points = getPoints(raw);
+   File testData = new File("testdata");
+   if (!testData.exists()) {
+     testData.mkdir();
+   }
+   writePointsToFileWithPayload(points, "testdata/file1", "file1");
+   writePointsToFileWithPayload(points, "testdata/file2", "file2");
+   // now run the Job
+   CanopyClusteringJob.runJob("testdata", "output",
+       ManhattanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1,
+       "dist/apache-mahout-0.1-dev.jar");
+
+   // every output line must still carry a payload; count the lines as we go
+   BufferedReader reader = new BufferedReader(new FileReader(
+       "output/clusters/part-00000"));
+   int count = 0;
+   try {
+     // readLine() returns null at end of stream; ready() only says whether a
+     // read would block, so it is not a reliable end-of-file test
+     String line;
+     while ((line = reader.readLine()) != null) {
+       // strictly positive index: "file" must appear after the start of the line
+       assertTrue("No payload", line.indexOf("file") > 0);
+       System.out.println(line);
+       count++;
+     }
+   } finally {
+     // release the file handle even when the payload assertion fails
+     reader.close();
+   }
+   // the point [3.0,3.0] is covered by both canopies
+   assertEquals("number of points", 2 + 2 * points.size(), count);
+ }
+
+ /**
+ * Story: User can produce final point clustering using a Hadoop map/reduce
+ * job and an EuclideanDistanceMeasure. Input points can have extra payload
+ * information following the point [...] and this information will be retained
+ * in the output.
+ *
+ * @throws Exception
+ */
+ public void testClusteringEuclideanMRWithPayload() throws Exception {
+   // Stage the raw points twice; each file tags its points with its own name
+   // ("file1" / "file2") as the payload.
+   List<Float[]> points = getPoints(raw);
+   File testData = new File("testdata");
+   if (!testData.exists()) {
+     testData.mkdir();
+   }
+   writePointsToFileWithPayload(points, "testdata/file1", "file1");
+   writePointsToFileWithPayload(points, "testdata/file2", "file2");
+   // now run the Job
+   CanopyClusteringJob.runJob("testdata", "output",
+       EuclideanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1,
+       "dist/apache-mahout-0.1-dev.jar");
+
+   // every output line must still carry a payload; count the lines as we go
+   BufferedReader reader = new BufferedReader(new FileReader(
+       "output/clusters/part-00000"));
+   int count = 0;
+   try {
+     // readLine() returns null at end of stream; ready() only says whether a
+     // read would block, so it is not a reliable end-of-file test
+     String line;
+     while ((line = reader.readLine()) != null) {
+       // strictly positive index: "file" must appear after the start of the line
+       assertTrue("No payload", line.indexOf("file") > 0);
+       System.out.println(line);
+       count++;
+     }
+   } finally {
+     // release the file handle even when the payload assertion fails
+     reader.close();
+   }
+   // the point [3.0,3.0] is covered by both canopies
+   assertEquals("number of points", 2 + 2 * points.size(), count);
+ }
+
+ /**
+ * Story: Clustering algorithm must support arbitrary user defined distance
+ * measure
+ *
+ * @throws Exception
+ */
+ public void testUserDefinedDistanceMeasure() throws Exception {
+   // Stage the raw points twice so the job sees two input files.
+   List<Float[]> points = getPoints(raw);
+   File testData = new File("testdata");
+   if (!testData.exists()) {
+     testData.mkdir();
+   }
+   writePointsToFile(points, "testdata/file1");
+   writePointsToFile(points, "testdata/file2");
+   // now run the Canopy Driver. User defined measure happens to be a Manhattan
+   // subclass so results are same.
+   CanopyDriver.runJob("testdata", "output/canopies",
+       UserDefinedDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1,
+       "dist/apache-mahout-0.1-dev.jar");
+
+   // verify output from sequence file
+   JobConf job = new JobConf(
+       org.apache.mahout.clustering.canopy.CanopyDriver.class);
+   FileSystem fs = FileSystem.get(job);
+   Path path = new Path("output/canopies/part-00000");
+   SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+   try {
+     Text key = new Text();
+     Text value = new Text();
+     // exactly two records are expected: C0 and C1, in that order
+     assertTrue("more to come", reader.next(key, value));
+     assertEquals("1st key", "C0", key.toString());
+     assertEquals("1st value", "[1.5, 1.5, ] ", value.toString());
+     assertTrue("more to come", reader.next(key, value));
+     assertEquals("2nd key", "C1", key.toString());
+     assertEquals("2nd value", "[4.333333, 4.333333, ] ", value.toString());
+     assertFalse("more to come", reader.next(key, value));
+   } finally {
+     // release the file handle even when one of the assertions above fails
+     reader.close();
+   }
+ }
+}
Propchange: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java (added)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,22 @@
+package org.apache.mahout.clustering.canopy;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class UserDefinedDistanceMeasure extends ManhattanDistanceMeasure {
+ // Intentionally empty: declares no members of its own, so all behavior is inherited from ManhattanDistanceMeasure.
+}
Propchange: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java (added)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,59 @@
+package org.apache.mahout.clustering.canopy;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This Canopy subclass maintains a list of points in the canopy so it can
+ * include them in its toString method. Useful for debugging but not practical
+ * for production use since it holds onto all its points.
+ *
+ */
+public class VisibleCanopy extends Canopy {
+  /** Every point given to this canopy, in the order it was received. */
+  private final List<Float[]> points = new ArrayList<Float[]>();
+
+  /**
+   * Create a canopy from its first point and record that point.
+   *
+   * @param point a Float[]
+   */
+  public VisibleCanopy(Float[] point) {
+    super(point);
+    points.add(point);
+  }
+
+  /**
+   * Add a point to the canopy and record it so toString can list it
+   *
+   * @param point a Float[]
+   */
+  @Override
+  public void addPoint(Float[] point) {
+    super.addPoint(point);
+    points.add(point);
+  }
+
+  /**
+   * Return a printable representation of this object that also lists the
+   * recorded points
+   *
+   * @return the superclass toString() result, then ": ", then each recorded
+   *         point appended in insertion order through the inherited ptOut
+   *         helper
+   */
+  @Override
+  public String toString() {
+    String out = super.toString() + ": ";
+    for (Float[] pt : points) {
+      out = ptOut(out, pt);
+    }
+    return out;
+  }
+
+}
Propchange: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java
------------------------------------------------------------------------------
svn:eol-style = native