You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by gs...@apache.org on 2008/02/20 05:28:00 UTC

svn commit: r629348 - in /lucene/mahout/trunk: ./ src/main/java/org/apache/mahout/clustering/ src/main/java/org/apache/mahout/clustering/canopy/ src/test/java/org/apache/mahout/clustering/ src/test/java/org/apache/mahout/clustering/canopy/

Author: gsingers
Date: Tue Feb 19 20:27:57 2008
New Revision: 629348

URL: http://svn.apache.org/viewvc?rev=629348&view=rev
Log:
MAHOUT-3: Added Canopy clustering.  Mahout's first M/R code\!  Woo hoo\!  Nice work Jeff\!

Added:
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java   (with props)
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java   (with props)
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java   (with props)
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java   (with props)
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java   (with props)
Modified:
    lucene/mahout/trunk/build.xml

Modified: lucene/mahout/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/build.xml?rev=629348&r1=629347&r2=629348&view=diff
==============================================================================
--- lucene/mahout/trunk/build.xml (original)
+++ lucene/mahout/trunk/build.xml Tue Feb 19 20:27:57 2008
@@ -17,7 +17,7 @@
     limitations under the License.
  -->
 
-<project name="Mahout" default="jar" basedir=".">
+<project name="mahout" default="dist-jar" basedir=".">
   <property file="build.properties" />
   <property name="Name" value="Mahout" />
 
@@ -266,7 +266,7 @@
 
 
 
-  <target name="test" depends="compile-test" description="Runs unit tests">
+  <target name="test" depends="compile-test, dist-jar" description="Runs unit tests">
     <mkdir dir="${junit.output.dir}"/>
     <junit printsummary="off" haltonfailure="no"
       errorProperty="tests.failed" failureProperty="tests.failed">

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,358 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+
+/**
+ * This class models a canopy as a center point, the number of points that are
+ * contained within it according to the application of some distance metric, and
+ * a point total which is the sum of all the points and is used to compute the
+ * centroid when needed.
+ * 
+ */
+public class Canopy {
+
+  // keys used by Driver, Mapper, Combiner & Reducer
+  public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
+
+  public static final String T1_KEY = "org.apache.mahout.clustering.canopy.t1";
+
+  public static final String T2_KEY = "org.apache.mahout.clustering.canopy.t2";
+
+  public static final String CANOPY_PATH_KEY = "org.apache.mahout.clustering.canopy.path";
+
+  // the next canopyId to be allocated
+  private static int nextCanopyId = 0;
+
+  // the T1 distance threshold
+  private static float t1;
+
+  // the T2 distance threshold
+  private static float t2;
+
+  // the distance measure
+  private static DistanceMeasure measure;
+
+  // this canopy's canopyId
+  private int canopyId;
+
+  // the current center
+  private Float[] center = new Float[0];
+
+  // the number of points in the canopy
+  private int numPoints = 0;
+
+  // the total of all points added to the canopy
+  private Float[] pointTotal = null;
+
+  /**
+   * Create a new Canopy containing the given point
+   * 
+   * @param point a Float[]
+   */
+  public Canopy(Float[] point) {
+    super();
+    this.canopyId = nextCanopyId++;
+    this.center = point;
+    this.pointTotal = point.clone();
+    this.numPoints = 1;
+  }
+
+  /**
+   * Create a new Canopy containing the given point and canopyId
+   * 
+   * @param point a Float[]
+   * @param canopyId an int identifying the canopy local to this process only
+   */
+  public Canopy(Float[] point, int canopyId) {
+    super();
+    this.canopyId = canopyId;
+    this.center = point;
+    this.pointTotal = point.clone();
+    this.numPoints = 1;
+  }
+
+  /**
+   * Configure the Canopy and its distance measure
+   * 
+   * @param job the JobConf for this job
+   */
+  public static void configure(JobConf job) {
+    try {
+      Class cl = Class.forName(job.get(DISTANCE_MEASURE_KEY));
+      measure = (DistanceMeasure) cl.newInstance();
+      measure.configure(job);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    nextCanopyId = 0;
+    t1 = new Float(job.get(T1_KEY));
+    t2 = new Float(job.get(T2_KEY));
+  }
+
+  /**
+   * Configure the Canopy for unit tests
+   * @param aMeasure
+   * @param aT1
+   * @param aT2
+   */
+  public static void config(DistanceMeasure aMeasure, float aT1, float aT2) {
+    nextCanopyId = 0;
+    measure = aMeasure;
+    t1 = aT1;
+    t2 = aT2;
+  }
+
+  /**
+   * This is the same algorithm as the reference but inverted to iterate over
+   * existing canopies instead of the points. Because of this it does not need
+   * to actually store the points, instead storing a total points vector and the
+   * number of points. From this a centroid can be computed.
+   * 
+   * This method is used by the CanopyReducer.
+   * 
+   * @param point the Float[] defining the point to be added
+   * @param canopies the List<Canopy> to be appended
+   */
+  public static void addPointToCanopies(Float[] point, List<Canopy> canopies) {
+    boolean pointStronglyBound = false;
+    for (Canopy canopy : canopies) {
+      float dist = measure.distance(canopy.getCenter(), point);
+      if (dist < t1)
+        canopy.addPoint(point);
+      pointStronglyBound = pointStronglyBound | (dist < t2);
+    }
+    if (!pointStronglyBound)
+      canopies.add(new Canopy(point));
+  }
+
+  /**
+   * This method is used by the CanopyMapper to perform canopy inclusion tests
+   * and to emit the point and its covering canopies to the output. The
+   * CanopyCombiner will then sum the canopy points and produce the centroids.
+   * 
+   * @param point the Float[] defining the point to be added
+   * @param canopies the List<Canopy> to be appended
+   * @param collector an OutputCollector in which to emit the point
+   */
+  public static void emitPointToNewCanopies(Float[] point,
+      List<Canopy> canopies, OutputCollector collector) throws IOException {
+    boolean pointStronglyBound = false;
+    for (Canopy canopy : canopies) {
+      float dist = measure.distance(canopy.getCenter(), point);
+      if (dist < t1)
+        canopy.emitPoint(point, collector);
+      pointStronglyBound = pointStronglyBound | (dist < t2);
+    }
+    if (!pointStronglyBound) {
+      Canopy canopy = new Canopy(point);
+      canopies.add(canopy);
+      canopy.emitPoint(point, collector);
+    }
+  }
+
+  /**
+   * This method is used by the CanopyMapper to perform canopy inclusion tests
+   * and to emit the point keyed by its covering canopies to the output. if the
+   * point is not covered by any canopies (due to canopy centroid clustering),
+   * emit the point to the closest covering canopy.
+   * 
+   * @param point the Float[] defining the point to be added
+   * @param canopies the List<Canopy> to be appended
+   * @param writable the original Writable from the input, may include arbitrary
+   *        payload information after the point [...]<payload>
+   * @param collector an OutputCollector in which to emit the point
+   */
+  public static void emitPointToExistingCanopies(Float[] point,
+      List<Canopy> canopies, Writable writable, OutputCollector collector)
+      throws IOException {
+    float minDist = Float.MAX_VALUE;
+    Canopy closest = null;
+    boolean isCovered = false;
+    for (Canopy canopy : canopies) {
+      float dist = measure.distance(canopy.getCenter(), point);
+      if (dist < t1) {
+        isCovered = true;
+        collector.collect(new Text(Canopy.formatCanopy(canopy)), writable);
+      } else if (dist < minDist) {
+        minDist = dist;
+        closest = canopy;
+      }
+    }
+    // if the point is not contained in any canopies (due to canopy centroid
+    // clustering), emit the point to the closest covering canopy.
+    if (!isCovered)
+      collector.collect(new Text(Canopy.formatCanopy(closest)), writable);
+  }
+
+  /**
+   * Returns a print string for the point
+   * 
+   * @param out a String to append to
+   * @param pt the Float[] point
+   * @return
+   */
+  public static String ptOut(String out, Float[] pt) {
+    out += formatPoint(pt);
+    return out;
+  }
+
+  /**
+   * Format the point for input to a Mapper or Reducer
+   * 
+   * @param point a Float[]
+   * @return a String
+   */
+  public static String formatPoint(Float[] point) {
+    String out = "";
+    out += "[";
+    for (int i = 0; i < point.length; i++)
+      out += point[i] + ", ";
+    out += "] ";
+    String ptOut = out;
+    return ptOut;
+  }
+
+  /**
+   * Decodes a point from its string representation.
+   * 
+   * @param formattedString a comma-terminated String of the form
+   *        "[v1,v2,...,vn,]"
+   * @return the Float[] defining an n-dimensional point
+   */
+  public static Float[] decodePoint(String formattedString) {
+    String[] pts = formattedString.split(",");
+    Float[] point = new Float[pts.length - 1];
+    for (int i = 0; i < point.length; i++)
+      if (pts[i].startsWith("["))
+        point[i] = new Float(pts[i].substring(1));
+      else if (!pts[i].startsWith("]"))
+        point[i] = new Float(pts[i]);
+    return point;
+  }
+
+  /**
+   * Format the canopy for output
+   * 
+   * @param canopy
+   * @return
+   */
+  public static String formatCanopy(Canopy canopy) {
+    return "C" + canopy.canopyId + ": " + formatPoint(canopy.computeCentroid());
+  }
+
+  /**
+   * Decodes and returns a Canopy from the formattedString
+   * 
+   * @param formattedString a String prouced by formatCanopy
+   * @return a new Canopy
+   */
+  public static Canopy decodeCanopy(String formattedString) {
+    int beginIndex = formattedString.indexOf('[');
+    String id = formattedString.substring(0, beginIndex);
+    String centroid = formattedString.substring(beginIndex);
+    if (id.startsWith("C")) {
+      int canopyId = new Integer(formattedString.substring(1, beginIndex - 2));
+      Float[] canopyCentroid = decodePoint(centroid);
+      return new Canopy(canopyCentroid, canopyId);
+    }
+    return null;
+  }
+
+  /**
+   * Add a point to the canopy
+   * 
+   * @param point a Float[]
+   */
+  public void addPoint(Float[] point) {
+    numPoints++;
+    for (int i = 0; i < point.length; i++)
+      pointTotal[i] = new Float(point[i] + pointTotal[i]);
+  }
+
+  /**
+   * Emit the point to the collector, keyed by the canopy's formatted
+   * representation
+   * 
+   * @param point a Float[]
+   */
+  public void emitPoint(Float[] point, OutputCollector collector)
+      throws IOException {
+    collector.collect(new Text(formatCanopy(this)), new Text(ptOut("", point)));
+  }
+
+  /**
+   * Return a printable representation of this object, using the user supplied
+   * identifier
+   * 
+   * @return
+   */
+  public String toString() {
+    return "C" + canopyId + " - " + ptOut("", getCenter());
+  }
+
+  public int getCanopyId() {
+    return canopyId;
+  }
+
+  /**
+   * Return the center point
+   * 
+   * @return a Float[]
+   */
+  public Float[] getCenter() {
+    return center;
+  }
+
+  /**
+   * Return the number of points in the Canopy
+   * 
+   * @return
+   */
+  public int getNumPoints() {
+    return numPoints;
+  }
+
+  /**
+   * Compute the centroid by averaging the pointTotals
+   * 
+   * @return a Float[] which is the new centroid
+   */
+  public Float[] computeCentroid() {
+    Float[] result = new Float[pointTotal.length];
+    for (int i = 0; i < pointTotal.length; i++)
+      result[i] = new Float(pointTotal[i] / numPoints);
+    return result;
+  }
+
+  /**
+   * Return if the point is covered by this canopy
+   * 
+   * @param point a Float[] point
+   * @return if the point is covered
+   */
+  public boolean covers(Float[] point) {
+    return measure.distance(center, point) < t1;
+  }
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,54 @@
+package org.apache.mahout.clustering.canopy;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+public class CanopyClusteringJob {
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) {
+    String input = args[0];
+    String output = args[1];
+    String measureClassName = args[2];
+    float t1 = new Float(args[3]);
+    float t2 = new Float(args[4]);
+    String jarLocation = "apache-mahout-0.1-dev.jar";
+    if (args.length > 5){
+      jarLocation = args[5];
+    }
+    runJob(input, output, measureClassName, t1, t2, jarLocation);
+  }
+
+  /**
+   * Run the job
+   * 
+   * @param input the input pathname String
+   * @param output the output pathname String
+   * @param measureClassName the DistanceMeasure class name
+   * @param t1 the T1 distance threshold
+   * @param t2 the T2 distance threshold
+   */
+  public static void runJob(String input, String output,
+      String measureClassName, float t1, float t2, String jarLocation) {
+    CanopyDriver.runJob(input, output + "/canopies", measureClassName, t1, t2, jarLocation);
+    ClusterDriver.runJob(input, output + "/canopies", output, measureClassName, t1, t2, jarLocation);
+  }
+
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,57 @@
+package org.apache.mahout.clustering.canopy;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class CanopyCombiner extends MapReduceBase implements Reducer {
+
+  public void reduce(WritableComparable key, Iterator values,
+                     OutputCollector output, Reporter reporter) throws IOException {
+    Writable value = (Writable) values.next();
+    Float[] center = Canopy.decodePoint(value.toString());
+    Canopy canopy = new Canopy(center);
+    while (values.hasNext()) {
+      value = (Writable) values.next();
+      Float[] point = Canopy.decodePoint(value.toString());
+      canopy.addPoint(point);
+    }
+    output.collect(new Text("centroid"), new Text(Canopy.formatPoint(canopy
+            .computeCentroid())));
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    Canopy.configure(job);
+  }
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,84 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+public class CanopyDriver {
+
+  public static void main(String[] args) {
+    String input = args[0];
+    String output = args[1];
+    String measureClassName = args[2];
+    float t1 = new Float(args[3]);
+    float t2 = new Float(args[4]);
+    String jarLocation = "apache-mahout-0.1-dev.jar";
+    if (args.length > 5){
+      jarLocation = args[5];
+    }
+    runJob(input, output, measureClassName, t1, t2, jarLocation);
+  }
+
+  /**
+   * Run the job
+   * 
+   * @param input the input pathname String
+   * @param output the output pathname String
+   * @param measureClassName the DistanceMeasure class name
+   * @param t1 the T1 distance threshold
+   * @param t2 the T2 distance threshold
+   */
+  public static void runJob(String input, String output,
+      String measureClassName, float t1, float t2, String jarLocation) {
+    JobClient client = new JobClient();
+    JobConf conf = new JobConf(
+        org.apache.mahout.clustering.canopy.CanopyDriver.class);
+    conf.setJar(jarLocation);
+    conf.set(Canopy.DISTANCE_MEASURE_KEY, measureClassName);
+    conf.set(Canopy.T1_KEY, "" + t1);
+    conf.set(Canopy.T2_KEY, "" + t2);
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setInputPath(new Path(input));
+    Path outPath = new Path(output);
+    conf.setOutputPath(outPath);
+
+    conf.setMapperClass(CanopyMapper.class);
+    conf.setCombinerClass(CanopyCombiner.class);
+    conf.setReducerClass(CanopyReducer.class);
+    conf.setNumReduceTasks(1);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    client.setConf(conf);
+    try {
+      FileSystem dfs = FileSystem.get(conf);
+      if (dfs.exists(outPath))
+        dfs.delete(outPath);
+      JobClient.runJob(conf);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,60 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class CanopyMapper extends MapReduceBase implements Mapper {
+
+  List<Canopy> canopies = new ArrayList<Canopy>();
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.Mapper#map(org.apache.hadoop.io.WritableComparable,
+   *      org.apache.hadoop.io.Writable,
+   *      org.apache.hadoop.mapred.OutputCollector,
+   *      org.apache.hadoop.mapred.Reporter)
+   */
+  public void map(WritableComparable key, Writable values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    Float[] point = Canopy.decodePoint(values.toString());
+    Canopy.emitPointToNewCanopies(point, canopies, output);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    Canopy.configure(job);
+  }
+
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,71 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+public class CanopyReducer extends MapReduceBase implements Reducer {
+
+  List<Canopy> canopies = new ArrayList<Canopy>();
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable,
+   *      java.util.Iterator, org.apache.hadoop.mapred.OutputCollector,
+   *      org.apache.hadoop.mapred.Reporter)
+   */
+  public void reduce(WritableComparable key, Iterator values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    while (values.hasNext()) {
+      Text value = (Text) values.next();
+      Float[] point = Canopy.decodePoint(value.toString());
+      Canopy.addPointToCanopies(point, canopies);
+    }
+    for (Canopy canopy : canopies)
+      output.collect(new Text("C" + canopy.getCanopyId()), new Text(Canopy
+          .formatPoint(canopy.computeCentroid())));
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.MapReduceBase#close()
+   */
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    Canopy.configure(job);
+  }
+
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,85 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class ClusterDriver {
+
+  public static void main(String[] args) {
+    String points = args[0];
+    String canopies = args[1];
+    String output = args[2];
+    String measureClassName = args[3];
+    float t1 = new Float(args[4]);
+    float t2 = new Float(args[5]);
+    String jarLocation = "apache-mahout-0.1-dev.jar";
+    if (args.length > 6){
+      jarLocation = args[6];
+    }
+    runJob(points, canopies, output, measureClassName, t1, t2, jarLocation);
+  }
+
+  /**
+   * Run the job
+   * 
+   * @param points the input points directory pathname String
+   * @param canopies the input canopies directory pathname String
+   * @param output the output directory pathname String
+   * @param measureClassName the DistanceMeasure class name
+   * @param t1 the T1 distance threshold
+   * @param t2 the T2 distance threshold
+   * @param jarLocation
+   */
+  public static void runJob(String points, String canopies, String output,
+                            String measureClassName, float t1, float t2, String jarLocation) {
+    JobClient client = new JobClient();
+    JobConf conf = new JobConf(
+        org.apache.mahout.clustering.canopy.ClusterDriver.class);
+    conf.setJar(jarLocation);
+    conf.set(Canopy.DISTANCE_MEASURE_KEY, measureClassName);
+    conf.set(Canopy.T1_KEY, "" + t1);
+    conf.set(Canopy.T2_KEY, "" + t2);
+    conf.set(Canopy.CANOPY_PATH_KEY, canopies);
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setInputPath(new Path(points));
+    Path outPath = new Path(output + "/clusters");
+    conf.setOutputPath(outPath);
+
+    conf.setMapperClass(ClusterMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    client.setConf(conf);
+    try {
+      FileSystem dfs = FileSystem.get(conf);
+      if (dfs.exists(outPath))
+        dfs.delete(outPath);
+      JobClient.runJob(conf);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,85 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class ClusterMapper extends MapReduceBase implements Mapper {
+
+  List<Canopy> canopies;
+
+  public void map(WritableComparable key, Writable values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    Float[] point = Canopy.decodePoint(values.toString());
+    Canopy.emitPointToExistingCanopies(point, canopies, values, output);
+  }
+
+  /**
+   * Configure the mapper by providing its canopies. Used by unit tests.
+   * 
+   * @param canopies a List<Canopy>
+   */
+  public void config(List<Canopy> canopies) {
+    this.canopies = canopies;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    Canopy.configure(job);
+
+    String canopyPath = job.get(Canopy.CANOPY_PATH_KEY);
+    canopies = new ArrayList<Canopy>();
+
+    try {
+      FileSystem fs = FileSystem.get(job);
+      Path path = new Path(canopyPath + "/part-00000");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+      try {
+        Text key = new Text();
+        Text value = new Text();
+        while (reader.next(key, value)) {
+          Canopy canopy = new Canopy(Canopy.decodePoint(value.toString()));
+          canopies.add(canopy);
+        }
+      } finally {
+        reader.close();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.mapred.JobConfigurable;
+
+/**
+ * This interface is used for objects which can determine a distance metric
+ * between two points
+ */
+public interface DistanceMeasure extends JobConfigurable {
+
+  /**
+   * Returns the distance metric applied to the arguments
+   * 
+   * @param p1 a Float[] defining a multidimensional point in some feature space
+   * @param p2 a Float[] defining a multidimensional point in some feature space
+   * @return a scalar float of the distance
+   */
+  public float distance(Float[] p1, Float[] p2);
+  
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/DistanceMeasure.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,46 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class implements a Euclidian distance metric by summing the square root
+ * of the squared differences between each coordinate
+ */
+public class EuclideanDistanceMeasure implements DistanceMeasure {
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  public void configure(JobConf job) {
+    // nothing to do
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.mahout.clustering.canopy.DistanceMeasure#distance(java.lang.Float[], java.lang.Float[])
+   */
+  public float distance(Float[] p1, Float[] p2) {
+    float result = 0;
+    for (int i = 0; i < p1.length; i++) {
+      float delta = p2[i] - p1[i];
+      result += delta * delta;
+    }
+    return (float) Math.sqrt(result);
+  }
+
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/EuclideanDistanceMeasure.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,47 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * This class implements a "manhattan distance" metric by summing the absolute
+ * values of the difference between each coordinate
+ */
+public class ManhattanDistanceMeasure implements DistanceMeasure {
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.mahout.clustering.canopy.DistanceMeasure#distance(java.lang.Float[],
+   *      java.lang.Float[])
+   */
+  public float distance(Float[] p1, Float[] p2) {
+    float result = 0;
+    for (int i = 0; i < p1.length; i++)
+      result += Math.abs(p2[i] - p1[i]);
+    return result;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  public void configure(JobConf job) {
+    // nothing to do
+  }
+
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ManhattanDistanceMeasure.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java (added)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,56 @@
+package org.apache.mahout.clustering.canopy;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+
+public class DummyOutputCollector implements OutputCollector {
+
+  Map<String, List<Writable>> data = new TreeMap<String, List<Writable>>();
+
+  public void collect(WritableComparable key, Writable values)
+      throws IOException {
+    List<Writable> points = data.get(key.toString());
+    if (points == null) {
+      points = new ArrayList<Writable>();
+      data.put(key.toString(), points);
+    }
+    points.add(values);
+  }
+
+  public Map<String, List<Writable>> getData() {
+    return data;
+  }
+
+  public List<Writable> getValue(String key) {
+    return data.get(key);
+  }
+  
+  public Set<String> getKeys() {
+    return data.keySet();
+  }
+
+}

Propchange: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (added)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,799 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.canopy;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestCanopyCreation extends TestCase {
+  static final float[][] raw = { { 1, 1 }, { 2, 1 }, { 1, 2 }, { 2, 2 },
+      { 3, 3 }, { 4, 4 }, { 5, 4 }, { 4, 5 }, { 5, 5 } };
+
+  List<Canopy> referenceManhattan;
+
+  DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
+
+  List<Float[]> manhattanCentroids;
+
+  List<Canopy> referenceEuclidean;
+
+  DistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();
+
+  List<Float[]> euclideanCentroids;
+
+  public TestCanopyCreation(String name) {
+    super(name);
+  }
+
+  private List<Float[]> getPoints(float[][] raw) {
+    List<Float[]> points = new ArrayList<Float[]>();
+    for (int i = 0; i < raw.length; i++) {
+      float[] fr = raw[i];
+      Float[] fs = new Float[fr.length];
+      for (int j = 0; j < fs.length; j++)
+        fs[j] = fr[j];
+      points.add(fs);
+    }
+    return points;
+  }
+
+  private List<Text> getFormattedPoints(List<Float[]> points) {
+    List<Text> result = new ArrayList<Text>();
+    for (Float[] point : points)
+      result.add(new Text(Canopy.formatPoint(point)));
+    return result;
+  }
+
+  /**
+   * Verify that the given canopies are equivalent to the referenceManhattan
+   * 
+   * @param canopies
+   */
+  private void verifyManhattanCanopies(List<Canopy> canopies) {
+    verifyCanopies(canopies, referenceManhattan);
+  }
+
+  /**
+   * Verify that the given canopies are equivalent to the referenceEuclidean
+   * 
+   * @param canopies
+   */
+  private void verifyEuclideanCanopies(List<Canopy> canopies) {
+    verifyCanopies(canopies, referenceEuclidean);
+  }
+
+  /**
+   * Verify that the given canopies are equivalent to the reference. This means
+   * the number of canopies is the same, the number of points in each is the
+   * same and the centroids are the same.
+   * 
+   * @param canopies
+   */
+  private void verifyCanopies(List<Canopy> canopies, List<Canopy> reference) {
+    assertEquals("number of canopies", reference.size(), canopies.size());
+    for (int canopyIx = 0; canopyIx < canopies.size(); canopyIx++) {
+      Canopy refCanopy = reference.get(canopyIx);
+      Canopy testCanopy = canopies.get(canopyIx);
+      assertEquals("canopy points " + canopyIx, refCanopy.getNumPoints(),
+          testCanopy.getNumPoints());
+      Float[] refCentroid = refCanopy.computeCentroid();
+      Float[] testCentroid = testCanopy.computeCentroid();
+      for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
+        assertEquals("canopy centroid " + canopyIx + "[" + pointIx + "]",
+            refCentroid[pointIx], testCentroid[pointIx]);
+      }
+    }
+  }
+
+  /**
+   * Print the canopies to the transcript
+   * 
+   * @param canopies a List<Canopy>
+   */
+  private void prtCanopies(List<Canopy> canopies) {
+    for (Canopy canopy : canopies) {
+      System.out.println(canopy.toString());
+    }
+  }
+
+  private void writePointsToFile(List<Float[]> points, String fileName)
+      throws IOException {
+    writePointsToFileWithPayload(points, fileName, "");
+  }
+
+  private void writePointsToFileWithPayload(List<Float[]> points,
+      String fileName, String payload) throws IOException {
+    BufferedWriter output = new BufferedWriter(new FileWriter(fileName));
+    for (Float[] point : points) {
+      output.write(Canopy.formatPoint(point));
+      output.write(payload);
+      output.write("\n");
+    }
+    output.flush();
+    output.close();
+  }
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    referenceManhattan = populateCanopies(manhattanDistanceMeasure,
+        getPoints(raw), (float) 3.1, (float) 2.1);
+    manhattanCentroids = populateCentroids(referenceManhattan);
+    referenceEuclidean = populateCanopies(euclideanDistanceMeasure,
+        getPoints(raw), (float) 3.1, (float) 2.1);
+    euclideanCentroids = populateCentroids(referenceEuclidean);
+  }
+
+  /**
+   * Iterate through the canopies, adding their centroids to a list
+   * 
+   * @param canopies a List<Canopy>
+   * @return the List<Float[]>
+   */
+  List<Float[]> populateCentroids(List<Canopy> canopies) {
+    List<Float[]> result = new ArrayList<Float[]>();
+    for (Canopy canopy : canopies)
+      result.add(canopy.computeCentroid());
+    return result;
+  }
+
+  /**
+   * Iterate through the points, adding new canopies. Return the canopies.
+   * 
+   * @param measure a DistanceMeasure to use
+   * @param points a list<Float[]> defining the points to be clustered
+   * @param t1 the T1 distance threshold
+   * @param t2 the T2 distance threshold
+   * @return the List<Canopy> created
+   */
+  List<Canopy> populateCanopies(DistanceMeasure measure, List<Float[]> points,
+      float t1, float t2) {
+    List<Canopy> canopies = new ArrayList<Canopy>();
+    Canopy.config(measure, t1, t2);
+    /**
+     * Reference Implementation: Given a distance metric, one can create
+     * canopies as follows: Start with a list of the data points in any order,
+     * and with two distance thresholds, T1 and T2, where T1 > T2. (These
+     * thresholds can be set by the user, or selected by cross-validation.) Pick
+     * a point on the list and measure its distance to all other points. Put all
+     * points that are within distance threshold T1 into a canopy. Remove from
+     * the list all points that are within distance threshold T2. Repeat until
+     * the list is empty.
+     */
+    while (!points.isEmpty()) {
+      Iterator<Float[]> ptIter = points.iterator();
+      Float[] p1 = ptIter.next();
+      ptIter.remove();
+      Canopy canopy = new VisibleCanopy(p1);
+      canopies.add(canopy);
+      while (ptIter.hasNext()) {
+        Float[] p2 = ptIter.next();
+        float dist = measure.distance(p1, p2);
+        // Put all points that are within distance threshold T1 into the canopy
+        if (dist < t1)
+          canopy.addPoint(p2);
+        // Remove from the list all points that are within distance threshold T2
+        if (dist < t2)
+          ptIter.remove();
+      }
+    }
+    return canopies;
+  }
+
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /**
+   * Story: User can cluster points using a ManhattanDistanceMeasure and a
+   * reference implementation
+   * 
+   * @throws Exception
+   */
+  public void testReferenceManhattan() throws Exception {
+    System.out.println("testReferenceManhattan");
+    // see setUp for cluster creation
+    prtCanopies(referenceManhattan);
+    assertEquals("number of canopies", 3, referenceManhattan.size());
+    for (int canopyIx = 0; canopyIx < referenceManhattan.size(); canopyIx++) {
+      Canopy testCanopy = referenceManhattan.get(canopyIx);
+      int[] expectedNumPoints = { 4, 4, 3 };
+      float[][] expectedCentroids = { { (float) 1.5, (float) 1.5 },
+          { (float) 4.0, (float) 4.0 },
+          { (float) 4.6666665, (float) 4.6666665 } };
+      assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx],
+          testCanopy.getNumPoints());
+      float[] refCentroid = expectedCentroids[canopyIx];
+      Float[] testCentroid = testCanopy.computeCentroid();
+      for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
+        assertEquals("canopy centroid " + canopyIx + "[" + pointIx + "]",
+            refCentroid[pointIx], testCentroid[pointIx]);
+      }
+    }
+  }
+
+  /**
+   * Story: User can cluster points using a EuclideanDistanceMeasure and a
+   * reference implementation
+   * 
+   * @throws Exception
+   */
+  public void testReferenceEuclidean() throws Exception {
+    System.out.println("testReferenceEuclidean()");
+    // see setUp for cluster creation
+    prtCanopies(referenceEuclidean);
+    assertEquals("number of canopies", 3, referenceManhattan.size());
+    for (int canopyIx = 0; canopyIx < referenceManhattan.size(); canopyIx++) {
+      Canopy testCanopy = referenceEuclidean.get(canopyIx);
+      int[] expectedNumPoints = { 5, 5, 3 };
+      float[][] expectedCentroids = { { (float) 1.8, (float) 1.8 },
+          { (float) 4.2, (float) 4.2 },
+          { (float) 4.6666665, (float) 4.6666665 } };
+      assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx],
+          testCanopy.getNumPoints());
+      float[] refCentroid = expectedCentroids[canopyIx];
+      Float[] testCentroid = testCanopy.computeCentroid();
+      for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
+        assertEquals("canopy centroid " + canopyIx + "[" + pointIx + "]",
+            refCentroid[pointIx], testCentroid[pointIx]);
+      }
+    }
+  }
+
+  /**
+   * Story: User can cluster points without instantiating them all in memory at
+   * once
+   * 
+   * @throws Exception
+   */
+  public void testIterativeManhattan() throws Exception {
+    List<Float[]> points = getPoints(raw);
+    Canopy.config(new ManhattanDistanceMeasure(), (float) 3.1, (float) 2.1);
+
+    List<Canopy> canopies = new ArrayList<Canopy>();
+    for (Float[] point : points)
+      Canopy.addPointToCanopies(point, canopies);
+
+    System.out.println("testIterativeManhattan");
+    prtCanopies(canopies);
+    verifyManhattanCanopies(canopies);
+  }
+
+  /**
+   * Story: User can cluster points without instantiating them all in memory at
+   * once
+   * 
+   * @throws Exception
+   */
+  public void testIterativeEuclidean() throws Exception {
+    List<Float[]> points = getPoints(raw);
+    Canopy.config(new EuclideanDistanceMeasure(), (float) 3.1, (float) 2.1);
+
+    List<Canopy> canopies = new ArrayList<Canopy>();
+    for (Float[] point : points)
+      Canopy.addPointToCanopies(point, canopies);
+
+    System.out.println("testIterativeEuclidean");
+    prtCanopies(canopies);
+    verifyEuclideanCanopies(canopies);
+  }
+
+  /**
+   * Story: User can produce initial canopy centers using a
+   * ManhattanDistanceMeasure and a CanopyMapper/Combiner which clusters input
+   * points to produce an output set of canopy centroid points.
+   * 
+   * @throws Exception
+   */
+  public void testCanopyMapperManhattan() throws Exception {
+    CanopyMapper mapper = new CanopyMapper();
+    CanopyCombiner combiner = new CanopyCombiner();
+    DummyOutputCollector collector = new DummyOutputCollector();
+    Canopy.config(manhattanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+    List<Float[]> points = getPoints(raw);
+    // map the data
+    for (Float[] point : points)
+      mapper.map(new Text(), new Text(Canopy.formatPoint(point)), collector,
+          null);
+    assertEquals("Number of map results", 3, collector.getData().size());
+    // now combine the mapper output
+    Canopy.config(manhattanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+    Map<String, List<Writable>> mapData = collector.getData();
+    collector = new DummyOutputCollector();
+    for (String key : mapData.keySet())
+      combiner.reduce(new Text(key), mapData.get(key).iterator(), collector,
+          null);
+    // now verify the output
+    List<Writable> data = collector.getValue("centroid");
+    assertEquals("Number of centroids", 3, data.size());
+    for (int i = 0; i < data.size(); i++)
+      assertEquals("Centroid error", Canopy.formatPoint(manhattanCentroids
+          .get(i)), Canopy.formatPoint(Canopy.decodePoint(data.get(i)
+          .toString())));
+  }
+
+  /**
+   * Story: User can produce initial canopy centers using a
+   * EuclideanDistanceMeasure and a CanopyMapper/Combiner which clusters input
+   * points to produce an output set of canopy centroid points.
+   * 
+   * @throws Exception
+   */
+  public void testCanopyMapperEuclidean() throws Exception {
+    CanopyMapper mapper = new CanopyMapper();
+    CanopyCombiner combiner = new CanopyCombiner();
+    DummyOutputCollector collector = new DummyOutputCollector();
+    Canopy.config(euclideanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+    List<Float[]> points = getPoints(raw);
+    // map the data
+    for (Float[] point : points)
+      mapper.map(new Text(), new Text(Canopy.formatPoint(point)), collector,
+          null);
+    assertEquals("Number of map results", 3, collector.getData().size());
+    // now combine the mapper output
+    Canopy.config(euclideanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+    Map<String, List<Writable>> mapData = collector.getData();
+    collector = new DummyOutputCollector();
+    for (String key : mapData.keySet())
+      combiner.reduce(new Text(key), mapData.get(key).iterator(), collector,
+          null);
+    // now verify the output
+    List<Writable> data = collector.getValue("centroid");
+    assertEquals("Number of centroids", 3, data.size());
+    for (int i = 0; i < data.size(); i++)
+      assertEquals("Centroid error", Canopy.formatPoint(euclideanCentroids
+          .get(i)), Canopy.formatPoint(Canopy.decodePoint(data.get(i)
+          .toString())));
+  }
+
+  /**
+   * Story: User can produce final canopy centers using a
+   * ManhattanDistanceMeasure and a CanopyReducer which clusters input centroid
+   * points to produce an output set of final canopy centroid points.
+   * 
+   * @throws Exception
+   */
+  public void testCanopyReducerManhattan() throws Exception {
+    CanopyReducer reducer = new CanopyReducer();
+    DummyOutputCollector collector = new DummyOutputCollector();
+    Canopy.config(manhattanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+    List<Float[]> points = getPoints(raw);
+    List<Text> texts = getFormattedPoints(points);
+    reducer.reduce(new Text("centroid"), texts.iterator(), collector, null);
+    reducer.close();
+    Set<String> keys = collector.getKeys();
+    assertEquals("Number of centroids", 3, keys.size());
+    int i = 0;
+    for (String key : keys) {
+      List<Writable> data = collector.getValue(key);
+      assertEquals("Centroid error", Canopy.formatPoint(manhattanCentroids
+          .get(i)), Canopy.formatPoint(Canopy.decodePoint(data.get(0)
+          .toString())));
+      i++;
+    }
+  }
+
+  /**
+   * Story: User can produce final canopy centers using a
+   * EuclideanDistanceMeasure and a CanopyReducer which clusters input centroid
+   * points to produce an output set of final canopy centroid points.
+   * 
+   * @throws Exception
+   */
+  public void testCanopyReducerEuclidean() throws Exception {
+    CanopyReducer reducer = new CanopyReducer();
+    DummyOutputCollector collector = new DummyOutputCollector();
+    Canopy.config(euclideanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+    List<Float[]> points = getPoints(raw);
+    List<Text> texts = getFormattedPoints(points);
+    reducer.reduce(new Text("centroid"), texts.iterator(), collector, null);
+    reducer.close();
+    Set<String> keys = collector.getKeys();
+    assertEquals("Number of centroids", 3, keys.size());
+    int i = 0;
+    for (String key : keys) {
+      List<Writable> data = collector.getValue(key);
+      assertEquals("Centroid error", Canopy.formatPoint(euclideanCentroids
+          .get(i)), Canopy.formatPoint(Canopy.decodePoint(data.get(0)
+          .toString())));
+      i++;
+    }
+  }
+
+  /**
+   * Story: User can produce final canopy centers using a Hadoop map/reduce job
+   * and a ManhattanDistanceMeasure.
+   * 
+   * @throws Exception
+   */
+  public void testCanopyGenManhattanMR() throws Exception {
+    List<Float[]> points = getPoints(raw);
+    File testData = new File("testdata");
+    if (!testData.exists())
+      testData.mkdir();
+    writePointsToFile(points, "testdata/file1");
+    writePointsToFile(points, "testdata/file2");
+    // now run the Canopy Driver
+    CanopyDriver.runJob("testdata", "output/canopies",
+        ManhattanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1, "dist/apache-mahout-0.1-dev.jar");
+
+    // verify output from sequence file
+    JobConf job = new JobConf(
+        org.apache.mahout.clustering.canopy.CanopyDriver.class);
+    FileSystem fs = FileSystem.get(job);
+    Path path = new Path("output/canopies/part-00000");
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+    Text key = new Text();
+    Text value = new Text();
+    assertTrue("more to come", reader.next(key, value));
+    assertEquals("1st key", "C0", key.toString());
+    assertEquals("1st value", "[1.5, 1.5, ] ", value.toString());
+    assertTrue("more to come", reader.next(key, value));
+    assertEquals("2nd key", "C1", key.toString());
+    assertEquals("2nd value", "[4.333333, 4.333333, ] ", value.toString());
+    assertFalse("more to come", reader.next(key, value));
+    reader.close();
+  }
+
+  /**
+   * Story: User can produce final canopy centers using a Hadoop map/reduce job
+   * and a EuclideanDistanceMeasure.
+   * 
+   * @throws Exception
+   */
+  public void testCanopyGenEuclideanMR() throws Exception {
+    List<Float[]> points = getPoints(raw);
+    File testData = new File("testdata");
+    if (!testData.exists())
+      testData.mkdir();
+    writePointsToFile(points, "testdata/file1");
+    writePointsToFile(points, "testdata/file2");
+    // now run the Canopy Driver
+    CanopyDriver.runJob("testdata", "output/canopies",
+        EuclideanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1, "dist/apache-mahout-0.1-dev.jar");
+
+    // verify output from sequence file
+    JobConf job = new JobConf(
+        org.apache.mahout.clustering.canopy.CanopyDriver.class);
+    FileSystem fs = FileSystem.get(job);
+    Path path = new Path("output/canopies/part-00000");
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+    Text key = new Text();
+    Text value = new Text();
+    assertTrue("more to come", reader.next(key, value));
+    assertEquals("1st key", "C0", key.toString());
+    assertEquals("1st value", "[1.8, 1.8, ] ", value.toString());
+    assertTrue("more to come", reader.next(key, value));
+    assertEquals("2nd key", "C1", key.toString());
+    assertEquals("2nd value", "[4.4333334, 4.4333334, ] ", value.toString());
+    assertFalse("more to come", reader.next(key, value));
+    reader.close();
+  }
+
+  /**
+   * Story: User can cluster a subset of the points using a ClusterMapper and a
+   * ManhattanDistanceMeasure.
+   * 
+   * @throws Exception
+   */
+  public void testClusterMapperManhattan() throws Exception {
+    Canopy.config(manhattanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+    ClusterMapper mapper = new ClusterMapper();
+    List<Canopy> canopies = new ArrayList<Canopy>();
+    DummyOutputCollector collector = new DummyOutputCollector();
+    for (Float[] centroid : manhattanCentroids)
+      canopies.add(new Canopy(centroid));
+    mapper.config(canopies);
+    List<Float[]> points = getPoints(raw);
+    // map the data
+    for (Float[] point : points)
+      mapper.map(new Text(), new Text(Canopy.formatPoint(point)), collector,
+          null);
+    Map<String, List<Writable>> data = collector.getData();
+    assertEquals("Number of map results", canopies.size(), data.size());
+    for (String canopyDef : data.keySet()) {
+      Canopy canopy = Canopy.decodeCanopy(canopyDef);
+      List<Writable> pts = data.get(canopyDef);
+      for (Writable ptDef : pts)
+        assertTrue("Point not in canopy", canopy.covers(Canopy
+            .decodePoint(ptDef.toString())));
+    }
+  }
+
+  /**
+   * Story: User can cluster a subset of the points using a ClusterMapper and a
+   * EuclideanDistanceMeasure.
+   * 
+   * @throws Exception
+   */
+  public void testClusterMapperEuclidean() throws Exception {
+    Canopy.config(euclideanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+    ClusterMapper mapper = new ClusterMapper();
+    List<Canopy> canopies = new ArrayList<Canopy>();
+    DummyOutputCollector collector = new DummyOutputCollector();
+    for (Float[] centroid : euclideanCentroids)
+      canopies.add(new Canopy(centroid));
+    mapper.config(canopies);
+    List<Float[]> points = getPoints(raw);
+    // map the data
+    for (Float[] point : points)
+      mapper.map(new Text(), new Text(Canopy.formatPoint(point)), collector,
+          null);
+    Map<String, List<Writable>> data = collector.getData();
+    assertEquals("Number of map results", canopies.size(), data.size());
+    for (String canopyDef : data.keySet()) {
+      Canopy canopy = Canopy.decodeCanopy(canopyDef);
+      List<Writable> pts = data.get(canopyDef);
+      for (Writable ptDef : pts)
+        assertTrue("Point not in canopy", canopy.covers(Canopy
+            .decodePoint(ptDef.toString())));
+    }
+  }
+
+  /**
+   * Story: User can cluster a subset of the points using a ClusterReducer and a
+   * ManhattanDistanceMeasure.
+   * 
+   * @throws Exception
+   */
+  public void testClusterReducerManhattan() throws Exception {
+    Canopy.config(manhattanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+    ClusterMapper mapper = new ClusterMapper();
+    List<Canopy> canopies = new ArrayList<Canopy>();
+    DummyOutputCollector collector = new DummyOutputCollector();
+    for (Float[] centroid : manhattanCentroids)
+      canopies.add(new Canopy(centroid));
+    mapper.config(canopies);
+    List<Float[]> points = getPoints(raw);
+    // map the data
+    for (Float[] point : points)
+      mapper.map(new Text(), new Text(Canopy.formatPoint(point)), collector,
+          null);
+    Map<String, List<Writable>> data = collector.getData();
+    assertEquals("Number of map results", canopies.size(), data.size());
+
+    // reduce the data
+    Reducer reducer = new IdentityReducer();
+    collector = new DummyOutputCollector();
+    for (String key : data.keySet())
+      reducer.reduce(new Text(key), data.get(key).iterator(), collector, null);
+
+    // check the output
+    data = collector.getData();
+    for (String canopyDef : data.keySet()) {
+      Canopy canopy = Canopy.decodeCanopy(canopyDef);
+      List<Writable> pts = data.get(canopyDef);
+      for (Writable ptDef : pts)
+        assertTrue("Point not in canopy", canopy.covers(Canopy
+            .decodePoint(ptDef.toString())));
+    }
+  }
+
+  /**
+   * Story: User can cluster a subset of the points using a ClusterReducer and a
+   * EuclideanDistanceMeasure.
+   * 
+   * @throws Exception
+   */
+  public void testClusterReducerEuclidean() throws Exception {
+    Canopy.config(euclideanDistanceMeasure, ((float) 3.1), ((float) 2.1));
+    ClusterMapper mapper = new ClusterMapper();
+    List<Canopy> canopies = new ArrayList<Canopy>();
+    DummyOutputCollector collector = new DummyOutputCollector();
+    for (Float[] centroid : euclideanCentroids)
+      canopies.add(new Canopy(centroid));
+    mapper.config(canopies);
+    List<Float[]> points = getPoints(raw);
+    // map the data
+    for (Float[] point : points)
+      mapper.map(new Text(), new Text(Canopy.formatPoint(point)), collector,
+          null);
+    Map<String, List<Writable>> data = collector.getData();
+
+    // reduce the data
+    Reducer reducer = new IdentityReducer();
+    collector = new DummyOutputCollector();
+    for (String key : data.keySet())
+      reducer.reduce(new Text(key), data.get(key).iterator(), collector, null);
+
+    // check the output
+    data = collector.getData();
+    assertEquals("Number of map results", canopies.size(), data.size());
+    for (String canopyDef : data.keySet()) {
+      Canopy canopy = Canopy.decodeCanopy(canopyDef);
+      List<Writable> pts = data.get(canopyDef);
+      for (Writable ptDef : pts)
+        assertTrue("Point not in canopy", canopy.covers(Canopy
+            .decodePoint(ptDef.toString())));
+    }
+  }
+
+  /**
+   * Story: User can produce final point clustering using a Hadoop map/reduce
+   * job and a ManhattanDistanceMeasure.
+   * 
+   * @throws Exception
+   */
+  public void testClusteringManhattanMR() throws Exception {
+    List<Float[]> points = getPoints(raw);
+    File testData = new File("testdata");
+    if (!testData.exists())
+      testData.mkdir();
+    writePointsToFile(points, "testdata/file1");
+    writePointsToFile(points, "testdata/file2");
+    // now run the Job
+    CanopyClusteringJob.runJob("testdata", "output",
+        ManhattanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1, "dist/apache-mahout-0.1-dev.jar");
+    BufferedReader reader = new BufferedReader(new FileReader(
+        "output/clusters/part-00000"));
+    int count = 0;
+    while (reader.ready()) {
+      System.out.println(reader.readLine());
+      count++;
+    }
+    // the point [3.0,3.0] is covered by both canopies
+    assertEquals("number of points", 2 + 2 * points.size(), count);
+    reader.close();
+  }
+
+  /**
+   * Story: User can produce final point clustering using a Hadoop map/reduce
+   * job and a EuclideanDistanceMeasure.
+   * 
+   * @throws Exception
+   */
+  public void testClusteringEuclideanMR() throws Exception {
+    List<Float[]> points = getPoints(raw);
+    File testData = new File("testdata");
+    if (!testData.exists())
+      testData.mkdir();
+    writePointsToFile(points, "testdata/file1");
+    writePointsToFile(points, "testdata/file2");
+    // now run the Job
+    CanopyClusteringJob.runJob("testdata", "output",
+        EuclideanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1, "dist/apache-mahout-0.1-dev.jar");
+    BufferedReader reader = new BufferedReader(new FileReader(
+        "output/clusters/part-00000"));
+    int count = 0;
+    while (reader.ready()) {
+      System.out.println(reader.readLine());
+      count++;
+    }
+    // the point [3.0,3.0] is covered by both canopies
+    assertEquals("number of points", 2 + 2 * points.size(), count);
+    reader.close();
+  }
+
+  /**
+   * Story: User can produce final point clustering using a Hadoop map/reduce
+   * job and a ManhattanDistanceMeasure. Input points can have extra payload
+   * information following the point [...] and this information will be retained
+   * in the output.
+   * 
+   * @throws Exception
+   */
+  public void testClusteringManhattanMRWithPayload() throws Exception {
+    List<Float[]> points = getPoints(raw);
+    File testData = new File("testdata");
+    if (!testData.exists())
+      testData.mkdir();
+    writePointsToFileWithPayload(points, "testdata/file1", "file1");
+    writePointsToFileWithPayload(points, "testdata/file2", "file2");
+    // now run the Job
+    CanopyClusteringJob.runJob("testdata", "output",
+        ManhattanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1, "dist/apache-mahout-0.1-dev.jar");
+    BufferedReader reader = new BufferedReader(new FileReader(
+        "output/clusters/part-00000"));
+    int count = 0;
+    while (reader.ready()) {
+      String line = reader.readLine();
+      assertTrue("No payload", line.indexOf("file") > 0);
+      System.out.println(line);
+      count++;
+    }
+    // the point [3.0,3.0] is covered by both canopies
+    assertEquals("number of points", 2 + 2 * points.size(), count);
+    reader.close();
+  }
+
+  /**
+   * Story: User can produce final point clustering using a Hadoop map/reduce
+   * job and a EuclideanDistanceMeasure. Input points can have extra payload
+   * information following the point [...] and this information will be retained
+   * in the output.
+   * 
+   * @throws Exception
+   */
+  public void testClusteringEuclideanMRWithPayload() throws Exception {
+    List<Float[]> points = getPoints(raw);
+    File testData = new File("testdata");
+    if (!testData.exists())
+      testData.mkdir();
+    writePointsToFileWithPayload(points, "testdata/file1", "file1");
+    writePointsToFileWithPayload(points, "testdata/file2", "file2");
+    // now run the Job
+    CanopyClusteringJob.runJob("testdata", "output",
+        EuclideanDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1, "dist/apache-mahout-0.1-dev.jar");
+    BufferedReader reader = new BufferedReader(new FileReader(
+        "output/clusters/part-00000"));
+    int count = 0;
+    while (reader.ready()) {
+      String line = reader.readLine();
+      assertTrue("No payload", line.indexOf("file") > 0);
+      System.out.println(line);
+      count++;
+    }
+    // the point [3.0,3.0] is covered by both canopies
+    assertEquals("number of points", 2 + 2 * points.size(), count);
+    reader.close();
+  }
+
+  /**
+   * Story: Clustering algorithm must support arbitrary user defined distance
+   * measure
+   * 
+   * @throws Exception
+   */
+  public void testUserDefinedDistanceMeasure() throws Exception {
+    List<Float[]> points = getPoints(raw);
+    File testData = new File("testdata");
+    if (!testData.exists())
+      testData.mkdir();
+    writePointsToFile(points, "testdata/file1");
+    writePointsToFile(points, "testdata/file2");
+    // now run the Canopy Driver. User defined measure happens to be a Manhattan
+    // subclass so results are same.
+    CanopyDriver.runJob("testdata", "output/canopies",
+        UserDefinedDistanceMeasure.class.getName(), (float) 3.1, (float) 2.1, "dist/apache-mahout-0.1-dev.jar");
+
+    // verify output from sequence file
+    JobConf job = new JobConf(
+        org.apache.mahout.clustering.canopy.CanopyDriver.class);
+    FileSystem fs = FileSystem.get(job);
+    Path path = new Path("output/canopies/part-00000");
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+    Text key = new Text();
+    Text value = new Text();
+    assertTrue("more to come", reader.next(key, value));
+    assertEquals("1st key", "C0", key.toString());
+    assertEquals("1st value", "[1.5, 1.5, ] ", value.toString());
+    assertTrue("more to come", reader.next(key, value));
+    assertEquals("2nd key", "C1", key.toString());
+    assertEquals("2nd value", "[4.333333, 4.333333, ] ", value.toString());
+    assertFalse("more to come", reader.next(key, value));
+    reader.close();
+  }
+}

Propchange: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java (added)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,22 @@
+package org.apache.mahout.clustering.canopy;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class UserDefinedDistanceMeasure extends ManhattanDistanceMeasure {
+
+}

Propchange: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/UserDefinedDistanceMeasure.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java?rev=629348&view=auto
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java (added)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java Tue Feb 19 20:27:57 2008
@@ -0,0 +1,59 @@
+package org.apache.mahout.clustering.canopy;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This Canopy subclass maintains a list of points in the canopy so it can
+ * include them in its toString method. Useful for debugging but not practical
+ * for production use since it holds onto all its points.
+ * 
+ */
+public class VisibleCanopy extends Canopy {
+  private List<Float[]> points = new ArrayList<Float[]>();
+
+  public VisibleCanopy(Float[] point) {
+    super(point);
+    points.add(point);
+  }
+
+  /**
+   * Add a point to the canopy
+   * 
+   * @param point a Float[]
+   */
+  public void addPoint(Float[] point) {
+    super.addPoint(point);
+    points.add(point);
+  }
+
+  /**
+   * Return a printable representation of this object, using the user supplied
+   * identifier
+   * 
+   * @return
+   */
+  public String toString() {
+    String out = super.toString() + ": ";
+    for (Float[] pt : points)
+      out = ptOut(out, pt);
+    return out;
+  }
+
+}

Propchange: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java
------------------------------------------------------------------------------
    svn:eol-style = native