Posted to commits@mahout.apache.org by je...@apache.org on 2008/04/15 03:16:55 UTC

svn commit: r648085 - in /lucene/mahout/trunk/src: main/java/org/apache/mahout/clustering/meanshift/ main/java/org/apache/mahout/matrix/ main/java/org/apache/mahout/utils/ test/java/org/apache/mahout/clustering/meanshift/

Author: jeastman
Date: Mon Apr 14 18:16:52 2008
New Revision: 648085

URL: http://svn.apache.org/viewvc?rev=648085&view=rev
Log:
Initial commit of MAHOUT-15 introduces a variant of Canopy clustering that uses an iterative approach to shift and merge canopies, one of which is created for each original input point. As the canopies shift and merge, they converge upon the modes of the input point density function and define the resulting clusters.
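
For concreteness, here is a minimal sketch of the sequential shift/merge iteration. It mirrors the reference loop in TestMeanShift.testReferenceImplementation below; "points" is a stand-in for the input data, and imports and CardinalityException handling are elided:

    // Seed one canopy per input point, merging canopies as they are added.
    List<MeanShiftCanopy> canopies = new ArrayList<MeanShiftCanopy>();
    for (Vector point : points)
      MeanShiftCanopy.mergeCanopy(new MeanShiftCanopy(point), canopies);
    // Shift each canopy to its weighted centroid and re-merge until all converge.
    boolean converged = false;
    while (!converged) {
      converged = true;
      List<MeanShiftCanopy> migrated = new ArrayList<MeanShiftCanopy>();
      for (MeanShiftCanopy canopy : canopies) {
        converged = canopy.shiftToMean() && converged;
        MeanShiftCanopy.mergeCanopy(canopy, migrated);
      }
      canopies = migrated;
    }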

- MeanShiftCanopy.java: Defined by a center and a list of bound points. During each iteration, the canopies within a canopy's T1 radius are used to determine a weighted centroid, which becomes the next canopy center. Canopies that move to within their T2 radius of each other are merged and their bound points are accumulated.
- MeanShiftCanopyMapper.java: The mapper reads input canopies and emits the information the combiner needs to shift and merge its canopies.
- MeanShiftCanopyCombiner.java: The combiner accumulates information about the mapper's canopies and outputs them at their new centroid positions.
- MeanShiftCanopyReducer.java: The reducer merges the outputs of multiple mapper/combiners and performs another shift/merge iteration.
- MeanShiftCanopyDriver.java: The driver for a single iteration of the algorithm.
- MeanShiftCanopyJob.java: Controls the iterations until all canopies have converged (see the usage sketch after this list).
- DenseVector.java
   - toString(): produces a string representation of the receiver
   - asWritableComparable(): now delegates to toString()
- DistanceMeasure.java
   - distance: the Float[] argument method is deprecated
   - distance: new method with Vector arguments
- EuclideanDistanceMeasure.java: Implements the new distance method
- ManhattanDistanceMeasure.java: Implements the new distance method
- DummyOutputCollector.java: Used for unit tests
- TestMeanShift.java: Unit tests of the algorithm
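
As a usage sketch of the job entry point (the paths are placeholders; the parameter values mirror testCanopyEuclideanMRJob below):

    // Run up to 10 shift/merge iterations over the points under testdata/,
    // writing one output/canopies-<n> directory per iteration and stopping
    // early once every canopy has converged.
    MeanShiftCanopyJob.runJob("testdata", "output",
        EuclideanDistanceMeasure.class.getName(),
        4.0, 1.0, 0.5, 10); // t1, t2, convergenceDelta, maxIterations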

Added:
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyCombiner.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyReducer.java
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/DummyOutputCollector.java
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
Modified:
    lucene/mahout/trunk/src/main/java/org/apache/mahout/matrix/DenseVector.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/EuclideanDistanceMeasure.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/ManhattanDistanceMeasure.java

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java?rev=648085&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java Mon Apr 14 18:16:52 2008
@@ -0,0 +1,457 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.meanshift;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.matrix.CardinalityException;
+import org.apache.mahout.matrix.DenseVector;
+import org.apache.mahout.matrix.PlusFunction;
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.utils.DistanceMeasure;
+import org.apache.mahout.utils.EuclideanDistanceMeasure;
+
+/**
+ * This class models a mean shift canopy as a center point, the number of
+ * points that are contained within it according to the application of some
+ * distance metric, a point total which is the sum of all the points and is
+ * used to compute the centroid when needed, and a list of the bound points
+ * the canopy has accumulated.
+ */
+public class MeanShiftCanopy {
+
+  // keys used by Driver, Mapper, Combiner & Reducer
+  public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
+
+  public static final String T1_KEY = "org.apache.mahout.clustering.canopy.t1";
+
+  public static final String T2_KEY = "org.apache.mahout.clustering.canopy.t2";
+
+  public static final String CANOPY_PATH_KEY = "org.apache.mahout.clustering.canopy.path";
+
+  public static final String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.canopy.convergence";
+
+  private static double convergenceDelta = 0;
+
+  // the next canopyId to be allocated
+  private static int nextCanopyId = 0;
+
+  // the T1 distance threshold
+  private static double t1;
+
+  // the T2 distance threshold
+  private static double t2;
+
+  // the distance measure
+  private static DistanceMeasure measure;
+
+  // this canopy's canopyId
+  private int canopyId;
+
+  // the current center
+  private Vector center = null;
+
+  // the number of points in the canopy
+  private int numPoints = 0;
+
+  // the total of all points added to the canopy
+  private Vector pointTotal = null;
+
+  private List<Vector> boundPoints = new ArrayList<Vector>();
+
+  private boolean converged = false;
+
+  /**
+   * Configure the Canopy and its distance measure
+   * 
+   * @param job the JobConf for this job
+   */
+  public static void configure(JobConf job) {
+    try {
+      Class cl = Class.forName(job.get(DISTANCE_MEASURE_KEY));
+      measure = (DistanceMeasure) cl.newInstance();
+      measure.configure(job);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    nextCanopyId = 0;
+    t1 = new Double(job.get(T1_KEY));
+    t2 = new Double(job.get(T2_KEY));
+    convergenceDelta = new Double(job.get(CLUSTER_CONVERGENCE_KEY));
+  }
+
+  /**
+   * Configure the Canopy for unit tests
+   * 
+   * @param aMeasure the DistanceMeasure to use
+   * @param aT1 the T1 distance threshold
+   * @param aT2 the T2 distance threshold
+   * @param aDelta the convergence criteria
+   */
+  public static void config(DistanceMeasure aMeasure, double aT1, double aT2,
+      double aDelta) {
+    nextCanopyId = 100; // so canopyIds will sort properly
+    measure = aMeasure;
+    t1 = aT1;
+    t2 = aT2;
+    convergenceDelta = aDelta;
+  }
+
+  /**
+   * Merge the given canopy into the canopies list. If it touches any existing
+   * canopy (norm<T1) then add the center of each to the other. If it covers
+   * any other canopies (norm<T2), then merge the given canopy with the closest
+   * covering canopy. If the given canopy does not cover any other canopies, add
+   * it to the canopies list.
+   * 
+   * @param aCanopy a MeanShiftCanopy to be merged
+   * @param canopies the List<MeanShiftCanopy> to be appended to
+   */
+  public static void mergeCanopy(MeanShiftCanopy aCanopy,
+      List<MeanShiftCanopy> canopies) throws CardinalityException {
+    MeanShiftCanopy closestCoveringCanopy = null;
+    double closestNorm = Double.MAX_VALUE;
+    for (MeanShiftCanopy canopy : canopies) {
+      double norm = measure.distance(canopy.getCenter(), aCanopy.getCenter());
+      if (norm < t1)
+        aCanopy.touch(canopy);
+      if (norm < t2)
+        if (closestCoveringCanopy == null || norm < closestNorm) {
+          closestNorm = norm;
+          closestCoveringCanopy = canopy;
+        }
+    }
+    if (closestCoveringCanopy == null)
+      canopies.add(aCanopy);
+    else
+      closestCoveringCanopy.merge(aCanopy);
+  }
+
+  /**
+   * This method is used by the MeanShiftCanopyMapper to perform canopy
+   * inclusion tests and to emit the canopy and its touching canopies to the
+   * output. The MeanShiftCanopyCombiner will then sum the canopy points and
+   * produce the centroids.
+   * 
+   * @param aCanopy a MeanShiftCanopy to be merged
+   * @param canopies the List<MeanShiftCanopy> to be appended to
+   * @param collector an OutputCollector in which to emit the canopy
+   */
+  public static void mergeCanopy(MeanShiftCanopy aCanopy,
+      List<MeanShiftCanopy> canopies,
+      OutputCollector<Text, WritableComparable> collector) throws IOException,
+      CardinalityException {
+    MeanShiftCanopy closestCoveringCanopy = null;
+    double closestNorm = Double.MAX_VALUE;
+    for (MeanShiftCanopy canopy : canopies) {
+      double dist = measure.distance(canopy.getCenter(), aCanopy.getCenter());
+      if (dist < t1)
+        aCanopy.touch(collector, canopy);
+      if (dist < t2)
+        if (closestCoveringCanopy == null || dist < closestNorm) {
+          closestCoveringCanopy = canopy;
+          closestNorm = dist;
+        }
+    }
+    if (closestCoveringCanopy == null) {
+      canopies.add(aCanopy);
+      aCanopy.emitCanopy(aCanopy, collector);
+    } else
+      closestCoveringCanopy.merge(aCanopy, collector);
+  }
+
+  /**
+   * Format the canopy for output
+   * 
+   * @param canopy the MeanShiftCanopy to format
+   * @return the formatted String
+   */
+  public static String formatCanopy(MeanShiftCanopy canopy) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(canopy.getIdentifier()).append(" - ").append(
+        canopy.getCenter().asWritableComparable().toString()).append(": ");
+    for (Vector bound : canopy.boundPoints)
+      builder.append(bound.asWritableComparable().toString());
+    return builder.toString();
+  }
+
+  /**
+   * Decodes and returns a Canopy from the formattedString
+   * 
+   * @param formattedString a String produced by formatCanopy
+   * @return a new Canopy
+   */
+  public static MeanShiftCanopy decodeCanopy(String formattedString) {
+    int beginIndex = formattedString.indexOf('[');
+    int endIndex = formattedString.indexOf(':', beginIndex);
+    String id = formattedString.substring(0, beginIndex);
+    String centroid = formattedString.substring(beginIndex, endIndex);
+    String boundPoints = formattedString.substring(endIndex + 1).trim();
+    boolean startsWithV = id.startsWith("V");
+    if (id.startsWith("C") || startsWithV) {
+      int canopyId = new Integer(formattedString.substring(1, beginIndex - 3));
+      Vector canopyCentroid = DenseVector.decodeFormat(new Text(centroid));
+      List<Vector> canopyBoundPoints = new ArrayList<Vector>();
+      while (boundPoints.length() > 0) {
+        int ix = boundPoints.indexOf(']');
+        Vector v = DenseVector.decodeFormat(new Text(boundPoints.substring(0,
+            ix + 1)));
+        canopyBoundPoints.add(v);
+        boundPoints = boundPoints.substring(ix + 1);
+      }
+      return new MeanShiftCanopy(canopyCentroid, canopyId, canopyBoundPoints,
+          startsWithV);
+    }
+    return null;
+  }
+
+  /**
+   * Create a new Canopy with the given canopyId
+   * 
+   * @param id a String identifier such as "C0" or "V0"
+   */
+  public MeanShiftCanopy(String id) {
+    super();
+    this.canopyId = new Integer(id.substring(1));
+    this.center = null;
+    this.pointTotal = null;
+    this.numPoints = 0;
+  }
+
+  /**
+   * Create a new Canopy containing the given point
+   * 
+   * @param point a Vector
+   */
+  public MeanShiftCanopy(Vector point) {
+    super();
+    this.canopyId = nextCanopyId++;
+    this.center = point;
+    this.pointTotal = point.copy();
+    this.numPoints = 1;
+    this.boundPoints.add(point);
+  }
+
+  /**
+   * Create a new Canopy containing the given point, canopyId and bound points
+   * 
+   * @param point a Vector
+   * @param canopyId an int identifying the canopy local to this process only
+   * @param boundPoints a List<Vector> containing points bound to the canopy
+   * @param converged true if the canopy has converged
+   */
+  MeanShiftCanopy(Vector point, int canopyId, List<Vector> boundPoints,
+      boolean converged) {
+    super();
+    this.canopyId = canopyId;
+    this.center = point;
+    this.pointTotal = point.copy();
+    this.numPoints = 1;
+    this.boundPoints = boundPoints;
+    this.converged = converged;
+  }
+
+  /**
+   * Add a point to the canopy some number of times
+   * 
+   * @param point a Vector to add
+   * @param nPoints the number of times to add the point
+   * @throws CardinalityException if the cardinalities disagree
+   */
+  void addPoints(Vector point, int nPoints) throws CardinalityException {
+    numPoints += nPoints;
+    Vector subTotal = (nPoints == 1) ? point.copy() : point.times(nPoints);
+    pointTotal = (pointTotal == null) ? subTotal : pointTotal.plus(subTotal);
+  }
+
+  /**
+   * Return whether the point is closely covered by this canopy
+   * 
+   * @param point a Vector point
+   * @return true if the point is within the T2 distance threshold
+   */
+  public boolean closelyBound(Vector point) throws CardinalityException {
+    return measure.distance(center, point) < t2;
+  }
+
+  /**
+   * Compute the bound centroid by averaging the bound points
+   * 
+   * @return a Vector which is the new bound centroid
+   */
+  public Vector computeBoundCentroid() throws CardinalityException {
+    Vector result = new DenseVector(center.cardinality());
+    for (Vector v : boundPoints)
+      result.assign(v, new PlusFunction());
+    return result.divide(boundPoints.size());
+  }
+
+  /**
+   * Compute the centroid by normalizing the pointTotal
+   * 
+   * @return a Vector which is the new centroid
+   */
+  public Vector computeCentroid() {
+    if (numPoints == 0)
+      return center;
+    else
+      return pointTotal.divide(numPoints);
+  }
+
+  /**
+   * Return whether the point is covered by this canopy
+   * 
+   * @param point a Vector point
+   * @return true if the point is within the T1 distance threshold
+   */
+  boolean covers(Vector point) throws CardinalityException {
+    return measure.distance(center, point) < t1;
+  }
+
+  /**
+   * Emit the new canopy to the collector, keyed by the receiver's identifier
+   * 
+   * @param canopy the MeanShiftCanopy to emit
+   * @param collector the OutputCollector
+   */
+  void emitCanopy(MeanShiftCanopy canopy,
+      OutputCollector<Text, WritableComparable> collector) throws IOException {
+    String identifier = this.getIdentifier();
+    collector.collect(new Text(identifier),
+        new Text("new " + canopy.toString()));
+  }
+
+  /**
+   * Emit the canopy centroid to the collector, keyed by the canopy's Id, once
+   * per bound point.
+   * 
+   * @param canopy a MeanShiftCanopy
+   * @param collector the OutputCollector
+   * @throws IOException if there is an IO problem with the collector
+   */
+  void emitCanopyCentroid(MeanShiftCanopy canopy,
+      OutputCollector<Text, WritableComparable> collector) throws IOException {
+    collector.collect(new Text(this.getIdentifier()), new Text(canopy
+        .computeCentroid().asWritableComparable().toString()
+        + boundPoints.size()));
+  }
+
+  public List<Vector> getBoundPoints() {
+    return boundPoints;
+  }
+
+  public int getCanopyId() {
+    return canopyId;
+  }
+
+  /**
+   * Return the center point
+   * 
+   * @return a Vector
+   */
+  public Vector getCenter() {
+    return center;
+  }
+
+  public String getIdentifier() {
+    return converged ? "V" + canopyId : "C" + canopyId;
+  }
+
+  /**
+   * Return the number of points under the Canopy
+   * 
+   * @return the number of points
+   */
+  public int getNumPoints() {
+    return numPoints;
+  }
+
+  void init(MeanShiftCanopy canopy) throws CardinalityException {
+    canopyId = canopy.canopyId;
+    center = canopy.center;
+    addPoints(center, 1);
+    boundPoints.addAll(canopy.getBoundPoints());
+  }
+
+  public boolean isConverged() {
+    return converged;
+  }
+
+  /**
+   * Merge the given canopy into the receiver by accumulating its bound points.
+   * 
+   * @param canopy an existing MeanShiftCanopy
+   */
+  void merge(MeanShiftCanopy canopy) {
+    boundPoints.addAll(canopy.boundPoints);
+  }
+
+  /**
+   * Emit a merge of the given canopy into the receiver, keyed by the
+   * receiver's identifier, so the combiner can perform the merge.
+   * 
+   * @param canopy an existing MeanShiftCanopy
+   */
+  void merge(MeanShiftCanopy canopy,
+      OutputCollector<Text, WritableComparable> collector) throws IOException {
+    collector.collect(new Text(getIdentifier()), new Text("merge "
+        + canopy.toString()));
+  }
+
+  public boolean shiftToMean() throws CardinalityException {
+    Vector centroid = computeCentroid();
+    converged = new EuclideanDistanceMeasure().distance(centroid, center) < convergenceDelta;
+    center = centroid;
+    numPoints = 1;
+    pointTotal = centroid.copy();
+    return converged;
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Object#toString()
+   */
+  public String toString() {
+    return formatCanopy(this);
+  }
+
+  /**
+   * The receiver touches the given canopy. Add each canopy's center to the
+   * other, weighted by the number of bound points of the canopy it came from.
+   * 
+   * @param canopy an existing MeanShiftCanopy
+   */
+  void touch(MeanShiftCanopy canopy) throws CardinalityException {
+    canopy.addPoints(getCenter(), boundPoints.size());
+    addPoints(canopy.center, canopy.boundPoints.size());
+  }
+
+  /**
+   * The receiver touches the given canopy. Emit the respective centers.
+   * 
+   * @param collector the OutputCollector
+   * @param canopy an existing MeanShiftCanopy
+   * @throws IOException if there is an IO problem with the collector
+   */
+  void touch(OutputCollector<Text, WritableComparable> collector,
+      MeanShiftCanopy canopy) throws IOException {
+    canopy.emitCanopyCentroid(this, collector);
+    emitCanopyCentroid(canopy, collector);
+  }
+}

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyCombiner.java?rev=648085&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyCombiner.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyCombiner.java Mon Apr 14 18:16:52 2008
@@ -0,0 +1,87 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.meanshift;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.CardinalityException;
+import org.apache.mahout.matrix.DenseVector;
+import org.apache.mahout.matrix.Vector;
+
+public class MeanShiftCanopyCombiner extends MapReduceBase implements
+    Reducer<Text, WritableComparable, Text, WritableComparable> {
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable,
+   *      java.util.Iterator, org.apache.hadoop.mapred.OutputCollector,
+   *      org.apache.hadoop.mapred.Reporter)
+   */
+  public void reduce(Text key, Iterator<WritableComparable> values,
+      OutputCollector<Text, WritableComparable> output, Reporter reporter)
+      throws IOException {
+    MeanShiftCanopy canopy = new MeanShiftCanopy(key.toString());
+    try {
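+      // mapper values are either "new <canopy>", "merge <canopy>", or a
+      // centroid string followed by a point count weight (see emitCanopyCentroid)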
+      while (values.hasNext()) {
+        Writable value = (Writable) values.next();
+        String valueStr = value.toString();
+        if (valueStr.startsWith("new"))
+          canopy.init(MeanShiftCanopy.decodeCanopy(valueStr.substring(4)));
+        else if (valueStr.startsWith("merge"))
+          canopy.merge(MeanShiftCanopy.decodeCanopy(valueStr.substring(6)));
+        else {
+          Vector centroid = DenseVector.decodeFormat(new Text(valueStr));
+          int number = Integer.parseInt(valueStr
+              .substring(valueStr.indexOf(']') + 2));
+          canopy.addPoints(centroid, number);
+        }
+      }
+      // Combiner may see situations where a canopy touched others in the mapper
+      // before it was merged. This causes points to be added to it in the
+      // combiner, but since the canopy was merged it has no center. Ignore
+      // these cases.
+      if (canopy.getCenter() != null) {
+        canopy.shiftToMean();
+        output.collect(new Text("canopy"), new Text(MeanShiftCanopy
+            .formatCanopy(canopy)));
+      }
+    } catch (CardinalityException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    MeanShiftCanopy.configure(job);
+  }
+}

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java?rev=648085&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java Mon Apr 14 18:16:52 2008
@@ -0,0 +1,82 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.meanshift;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+public class MeanShiftCanopyDriver {
+
+  public static void main(String[] args) {
+    String input = args[0];
+    String output = args[1];
+    String measureClassName = args[2];
+    double t1 = new Double(args[3]);
+    double t2 = new Double(args[4]);
+    double convergenceDelta = new Double(args[5]);
+    runJob(input, output, measureClassName, t1, t2, convergenceDelta, false);
+  }
+
+  /**
+   * Run the job
+   * 
+   * @param input the input pathname String
+   * @param output the output pathname String
+   * @param measureClassName the DistanceMeasure class name
+   * @param t1 the T1 distance threshold
+   * @param t2 the T2 distance threshold
+   * @param convergenceDelta the double convergence criteria
+   * @param inputIsSequenceFile true if input is sequence file encoded
+   */
+  public static void runJob(String input, String output,
+      String measureClassName, double t1, double t2, double convergenceDelta,
+      boolean inputIsSequenceFile) {
+
+    JobClient client = new JobClient();
+    JobConf conf = new JobConf(MeanShiftCanopyDriver.class);
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setInputPath(new Path(input));
+    Path outPath = new Path(output);
+    conf.setOutputPath(outPath);
+
+    conf.setMapperClass(MeanShiftCanopyMapper.class);
+    conf.setCombinerClass(MeanShiftCanopyCombiner.class);
+    conf.setReducerClass(MeanShiftCanopyReducer.class);
+    conf.setNumReduceTasks(1);
+    if (inputIsSequenceFile)
+      conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    conf.set(MeanShiftCanopy.DISTANCE_MEASURE_KEY, measureClassName);
+    conf.set(MeanShiftCanopy.CLUSTER_CONVERGENCE_KEY, "" + convergenceDelta);
+    conf.set(MeanShiftCanopy.T1_KEY, "" + t1);
+    conf.set(MeanShiftCanopy.T2_KEY, "" + t2);
+
+    client.setConf(conf);
+    try {
+      JobClient.runJob(conf);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java?rev=648085&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyJob.java Mon Apr 14 18:16:52 2008
@@ -0,0 +1,110 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.meanshift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+
+public class MeanShiftCanopyJob {
+
+  public static void main(String[] args) {
+    String input = args[0];
+    String output = args[1];
+    String measureClassName = args[2];
+    double t1 = new Double(args[3]);
+    double t2 = new Double(args[4]);
+    double convergenceDelta = new Double(args[5]);
+    int maxIterations = new Integer(args[6]);
+    runJob(input, output, measureClassName, t1, t2, convergenceDelta,
+        maxIterations);
+  }
+
+  /**
+   * Run the job
+   * 
+   * @param input the input pathname String
+   * @param output the output pathname String
+   * @param measureClassName the DistanceMeasure class name
+   * @param t1 the T1 distance threshold
+   * @param t2 the T2 distance threshold
+   * @param convergenceDelta the double convergence criteria
+   * @param maxIterations an int number of iterations
+   */
+  public static void runJob(String input, String output,
+      String measureClassName, double t1, double t2, double convergenceDelta,
+      int maxIterations) {
+    try {
+      // delete the output directory
+      JobConf conf = new JobConf(MeanShiftCanopyDriver.class);
+      Path outPath = new Path(output);
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(outPath)) {
+        fs.delete(outPath);
+      }
+      fs.mkdirs(outPath);
+      // iterate until the clusters converge
+      boolean converged = false;
+      boolean inputIsSequenceFile = false;
+      int iteration = 0;
+      String clustersIn = input;
+      while (!converged && iteration < maxIterations) {
+        System.out.println("Iteration " + iteration);
+        // point the output to a new directory per iteration
+        String clustersOut = output + "/canopies-" + iteration;
+        MeanShiftCanopyDriver.runJob(clustersIn, clustersOut, measureClassName,
+            t1, t2, convergenceDelta, inputIsSequenceFile);
+        converged = isConverged(clustersOut + "/part-00000", conf, fs);
+        // now point the input at the previous output directory
+        clustersIn = clustersOut;
+        iteration++;
+        inputIsSequenceFile = true;
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Return whether all of the canopies in the filePath have converged
+   * 
+   * @param filePath the file path to the single file containing the canopies
+   * @param conf the JobConf
+   * @param fs the FileSystem
+   * @return true if all canopies are converged
+   * @throws IOException if there was an IO error
+   */
+  static boolean isConverged(String filePath, JobConf conf, FileSystem fs)
+      throws IOException {
+    Path outPart = new Path(filePath);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, outPart, conf);
+    Text key = new Text();
+    Text value = new Text();
+    boolean converged = true;
+    try {
+      // converged canopies are output with a "V" identifier prefix
+      while (converged && reader.next(key, value))
+        converged = value.toString().startsWith("V");
+    } finally {
+      reader.close();
+    }
+    return converged;
+  }
+
+}

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java?rev=648085&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java Mon Apr 14 18:16:52 2008
@@ -0,0 +1,67 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.meanshift;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.CardinalityException;
+
+public class MeanShiftCanopyMapper extends MapReduceBase implements
+    Mapper<WritableComparable, Text, Text, WritableComparable> {
+
+  List<MeanShiftCanopy> canopies = new ArrayList<MeanShiftCanopy>();
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.Mapper#map(org.apache.hadoop.io.WritableComparable,
+   *      org.apache.hadoop.io.Writable,
+   *      org.apache.hadoop.mapred.OutputCollector,
+   *      org.apache.hadoop.mapred.Reporter)
+   */
+  public void map(WritableComparable key, Text values,
+      OutputCollector<Text, WritableComparable> output, Reporter reporter)
+      throws IOException {
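+    // "canopies" is a field, so canopies accumulate across map() calls and
+    // each new input canopy is merged against all of those previously seen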
+    MeanShiftCanopy canopy = MeanShiftCanopy.decodeCanopy(values.toString());
+    try {
+      MeanShiftCanopy.mergeCanopy(canopy, canopies, output);
+    } catch (CardinalityException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    MeanShiftCanopy.configure(job);
+  }
+
+}

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyReducer.java?rev=648085&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyReducer.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyReducer.java Mon Apr 14 18:16:52 2008
@@ -0,0 +1,79 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.meanshift;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.CardinalityException;
+
+public class MeanShiftCanopyReducer extends MapReduceBase implements
+    Reducer<Text, WritableComparable, Text, WritableComparable> {
+
+  List<MeanShiftCanopy> canopies = new ArrayList<MeanShiftCanopy>();
+
+  OutputCollector<Text, WritableComparable> collector;
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable,
+   *      java.util.Iterator, org.apache.hadoop.mapred.OutputCollector,
+   *      org.apache.hadoop.mapred.Reporter)
+   */
+  public void reduce(Text key, Iterator<WritableComparable> values,
+      OutputCollector<Text, WritableComparable> output, Reporter reporter)
+      throws IOException {
+    collector = output;
+    try {
+      while (values.hasNext()) {
+        Text value = (Text) values.next();
+        MeanShiftCanopy canopy = MeanShiftCanopy.decodeCanopy(value.toString());
+        MeanShiftCanopy.mergeCanopy(canopy, canopies);
+      }
+
+      for (MeanShiftCanopy canopy : canopies) {
+        canopy.shiftToMean();
+        collector.collect(new Text(canopy.getIdentifier()), new Text(
+            MeanShiftCanopy.formatCanopy(canopy)));
+      }
+    } catch (CardinalityException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    MeanShiftCanopy.configure(job);
+  }
+
+}

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/matrix/DenseVector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/matrix/DenseVector.java?rev=648085&r1=648084&r2=648085&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/matrix/DenseVector.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/matrix/DenseVector.java Mon Apr 14 18:16:52 2008
@@ -66,12 +66,17 @@
 
   @Override
   public WritableComparable asWritableComparable() {
+    return new Text(toString());
+  }
+
+  @Override
+  public String toString() {
     StringBuilder out = new StringBuilder();
     out.append("[, ");
     for (int i = 0; i < values.length; i++)
       out.append(values[i]).append(", ");
     out.append("] ");
-    return new Text(out.toString());
+    return out.toString();
   }
 
   @Override

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java?rev=648085&r1=648084&r2=648085&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java Mon Apr 14 18:16:52 2008
@@ -17,6 +17,8 @@
 package org.apache.mahout.utils;
 
 import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.mahout.matrix.CardinalityException;
+import org.apache.mahout.matrix.Vector;
 
 /**
  * This interface is used for objects which can determine a distance metric
@@ -26,11 +28,22 @@
 
   /**
    * Returns the distance metric applied to the arguments
-   *
+   * 
    * @param p1 a Float[] defining a multidimensional point in some feature space
    * @param p2 a Float[] defining a multidimensional point in some feature space
    * @return a scalar float of the distance
+   * @deprecated in favor of the Vector method
    */
   public float distance(Float[] p1, Float[] p2);
+
+  /**
+   * Returns the distance metric applied to the arguments
+   * 
+   * @param v1 a Vector defining a multidimensional point in some feature space
+   * @param v2 a Vector defining a multidimensional point in some feature space
+   * @return a scalar double of the distance
+   * @throws CardinalityException if the cardinalities disagree
+   */
+  public double distance(Vector v1, Vector v2) throws CardinalityException;
 
 }

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/EuclideanDistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/EuclideanDistanceMeasure.java?rev=648085&r1=648084&r2=648085&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/EuclideanDistanceMeasure.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/EuclideanDistanceMeasure.java Mon Apr 14 18:16:52 2008
@@ -17,6 +17,8 @@
 package org.apache.mahout.utils;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.matrix.CardinalityException;
+import org.apache.mahout.matrix.Vector;
 
 /**
  * This class implements a Euclidean distance metric by summing the square root
@@ -24,15 +26,20 @@
  */
 public class EuclideanDistanceMeasure implements DistanceMeasure {
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
    */
   public void configure(JobConf job) {
     // nothing to do
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.mahout.clustering.canopy.DistanceMeasure#distance(java.lang.Float[], java.lang.Float[])
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.mahout.clustering.canopy.DistanceMeasure#distance(java.lang.Float[],
+   *      java.lang.Float[])
    */
   public float distance(Float[] p1, Float[] p2) {
     float result = 0;
@@ -41,6 +48,23 @@
       result += delta * delta;
     }
     return (float) Math.sqrt(result);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.mahout.utils.DistanceMeasure#distance(org.apache.mahout.matrix.Vector,
+   *      org.apache.mahout.matrix.Vector)
+   */
+  public double distance(Vector v1, Vector v2) throws CardinalityException {
+    if (v1.cardinality() != v2.cardinality())
+      throw new CardinalityException();
+    double result = 0;
+    for (int i = 0; i < v1.cardinality(); i++) {
+      double delta = v2.getQuick(i) - v1.getQuick(i);
+      result += delta * delta;
+    }
+    return Math.sqrt(result);
   }
 
 }

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/ManhattanDistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/ManhattanDistanceMeasure.java?rev=648085&r1=648084&r2=648085&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/ManhattanDistanceMeasure.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/ManhattanDistanceMeasure.java Mon Apr 14 18:16:52 2008
@@ -17,6 +17,8 @@
 package org.apache.mahout.utils;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.matrix.CardinalityException;
+import org.apache.mahout.matrix.Vector;
 
 /**
  * This class implements a "manhattan distance" metric by summing the absolute
@@ -37,11 +39,28 @@
     return result;
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
    */
   public void configure(JobConf job) {
     // nothing to do
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.mahout.utils.DistanceMeasure#distance(org.apache.mahout.matrix.Vector,
+   *      org.apache.mahout.matrix.Vector)
+   */
+  public double distance(Vector v1, Vector v2) throws CardinalityException {
+    if (v1.cardinality() != v2.cardinality())
+      throw new CardinalityException();
+    double result = 0;
+    for (int i = 0; i < v1.cardinality(); i++)
+      result += Math.abs(v2.getQuick(i) - v1.getQuick(i));
+    return result;
   }
 
 }

Added: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/DummyOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/DummyOutputCollector.java?rev=648085&view=auto
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/DummyOutputCollector.java (added)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/DummyOutputCollector.java Mon Apr 14 18:16:52 2008
@@ -0,0 +1,62 @@
+package org.apache.mahout.clustering.meanshift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * A dummy OutputCollector that is parameterized for mean shift canopy testing
+ *
+ */
+public class DummyOutputCollector implements
+    OutputCollector<Text, WritableComparable> {
+
+  Map<String, List<WritableComparable>> data = new TreeMap<String, List<WritableComparable>>();
+
+  public void collect(Text key, WritableComparable values) throws IOException {
+    String keyStr = key.toString();
+    List<WritableComparable> points = data.get(keyStr);
+    if (points == null) {
+      points = new ArrayList<WritableComparable>();
+      data.put(keyStr, points);
+    }
+    points.add(values);
+  }
+
+  public Map<String, List<WritableComparable>> getData() {
+    return data;
+  }
+
+  public List<WritableComparable> getValue(String key) {
+    return data.get(key);
+  }
+
+  public Set<String> getKeys() {
+    return data.keySet();
+  }
+
+}

Added: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java?rev=648085&view=auto
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java (added)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java Mon Apr 14 18:16:52 2008
@@ -0,0 +1,347 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.meanshift;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.matrix.CardinalityException;
+import org.apache.mahout.matrix.DenseVector;
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.utils.DistanceMeasure;
+import org.apache.mahout.utils.EuclideanDistanceMeasure;
+import org.apache.mahout.utils.ManhattanDistanceMeasure;
+
+public class TestMeanShift extends TestCase {
+
+  Vector[] raw = null;
+
+  DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
+
+  DistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();
+
+  public TestMeanShift(String name) {
+    super(name);
+  }
+
+  /**
+   * Print the canopies to the console
+   * 
+   * @param canopies a List<Canopy>
+   */
+  private void prtCanopies(List<MeanShiftCanopy> canopies) {
+    for (MeanShiftCanopy canopy : canopies) {
+      System.out.println(canopy.toString());
+    }
+  }
+
+  /** 
+   * Write the given points to the file within an enclosing MeanShiftCanopy
+   * @param points a Vector[] of points
+   * @param fileName the String file name
+   * @param payload a String payload that goes with each point.
+   * TODO: handle payloads associated with points. Currently they are ignored
+   * @throws IOException
+   */
+  private void writePointsToFileWithPayload(Vector[] points, String fileName,
+      String payload) throws IOException {
+    BufferedWriter output = new BufferedWriter(new FileWriter(fileName));
+    for (Vector point : points) {
+      output.write(new MeanShiftCanopy(point).toString());
+      output.write(payload);
+      output.write("\n");
+    }
+    output.flush();
+    output.close();
+  }
+
+  /**
+   * Recursively remove the contents of a directory
+   * 
+   * @param path
+   * @throws Exception
+   */
+  private void rmr(String path) throws Exception {
+    File f = new File(path);
+    if (f.exists()) {
+      if (f.isDirectory()) {
+        String[] contents = f.list();
+        for (int i = 0; i < contents.length; i++)
+          rmr(f.toString() + File.separator + contents[i]);
+      }
+      f.delete();
+    }
+  }
+
+  /**
+   * Print a graphical representation of the clustered image points as a 10x10
+   * character mask
+   * 
+   * @param canopies
+   */
+  private void printImage(List<MeanShiftCanopy> canopies) {
+    char[][] out = new char[10][10];
+    for (int i = 0; i < out.length; i++)
+      for (int j = 0; j < out[0].length; j++)
+        out[i][j] = ' ';
+    for (MeanShiftCanopy canopy : canopies) {
+      int ch = 'A' + canopy.getCanopyId() - 100;
+      for (Vector pt : canopy.getBoundPoints())
+        out[(int) pt.getQuick(0)][(int) pt.getQuick(1)] = (char) ch;
+    }
+    for (int i = 0; i < out.length; i++)
+      System.out.println(out[i]);
+  }
+
+  private List<MeanShiftCanopy> getInitialCanopies() {
+    List<MeanShiftCanopy> canopies = new ArrayList<MeanShiftCanopy>();
+    for (int i = 0; i < raw.length; i++)
+      canopies.add(new MeanShiftCanopy(raw[i]));
+    return canopies;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see junit.framework.TestCase#setUp()
+   */
+  protected void setUp() throws Exception {
+    super.setUp();
+    rmr("output");
+    rmr("testdata");
+    raw = new Vector[100];
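+    // a 10x10 grid of 3-d points; points on the two diagonals are given a
+    // distinct third coordinate so they stand apart from the rest of the grid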
+    for (int i = 0; i < 10; i++)
+      for (int j = 0; j < 10; j++) {
+        int ix = i * 10 + j;
+        Vector v = new DenseVector(3);
+        v.setQuick(0, i);
+        v.setQuick(1, j);
+        if (i == j)
+          v.setQuick(2, 9);
+        else if (i + j == 9)
+          v.setQuick(2, 4.5);
+        raw[ix] = v;
+      }
+  }
+
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /**
+   * Story: User can exercise the reference implementation to verify that the
+   * test datapoints are clustered in a reasonable manner.
+   * 
+   * @throws CardinalityException
+   */
+  public void testReferenceImplementation() throws CardinalityException {
+    MeanShiftCanopy.config(new EuclideanDistanceMeasure(), 4.0, 1.0, 0.5);
+    List<MeanShiftCanopy> canopies = new ArrayList<MeanShiftCanopy>();
+    // add all points to the canopies
+    for (int i = 0; i < raw.length; i++)
+      MeanShiftCanopy.mergeCanopy(new MeanShiftCanopy(raw[i]), canopies);
+    boolean done = false;
+    int iter = 1;
+    while (!done) {// shift canopies to their centroids
+      done = true;
+      List<MeanShiftCanopy> migratedCanopies = new ArrayList<MeanShiftCanopy>();
+      for (MeanShiftCanopy canopy : canopies) {
+        done = canopy.shiftToMean() && done;
+        MeanShiftCanopy.mergeCanopy(canopy, migratedCanopies);
+      }
+      canopies = migratedCanopies;
+      prtCanopies(canopies);
+      printImage(canopies);
+      System.out.println(iter++);
+    }
+  }
+
+  /**
+   * Story: User can produce initial canopy centers using a
+   * EuclideanDistanceMeasure and a CanopyMapper/Combiner which clusters input
+   * points to produce an output set of canopies.
+   * 
+   * @throws Exception
+   */
+  public void testCanopyMapperEuclidean() throws Exception {
+    MeanShiftCanopyMapper mapper = new MeanShiftCanopyMapper();
+    MeanShiftCanopyCombiner combiner = new MeanShiftCanopyCombiner();
+    DummyOutputCollector collector = new DummyOutputCollector();
+    MeanShiftCanopy.config(euclideanDistanceMeasure, 4, 1, 0.5);
+    // get the initial canopies
+    List<MeanShiftCanopy> canopies = getInitialCanopies();
+    // build the reference set
+    List<MeanShiftCanopy> refCanopies = new ArrayList<MeanShiftCanopy>();
+    for (int i = 0; i < raw.length; i++)
+      MeanShiftCanopy.mergeCanopy(new MeanShiftCanopy(raw[i]), refCanopies);
+
+    // map the data
+    for (MeanShiftCanopy canopy : canopies)
+      mapper.map(new Text(), new Text(canopy.toString()), collector, null);
+    assertEquals("Number of map results", 100, collector.getData().size());
+    // now combine the mapper output
+    MeanShiftCanopy.config(euclideanDistanceMeasure, 4, 1, 0.5);
+    Map<String, List<WritableComparable>> mapData = collector.getData();
+    collector = new DummyOutputCollector();
+    for (String key : mapData.keySet())
+      combiner.reduce(new Text(key), mapData.get(key).iterator(), collector,
+          null);
+
+    // now verify the output
+    List<WritableComparable> data = collector.getValue("canopy");
+    assertEquals("Number of canopies", refCanopies.size(), data.size());
+    // add all points to the reference canopies
+    Map<String, MeanShiftCanopy> refCanopyMap = new HashMap<String, MeanShiftCanopy>();
+    for (MeanShiftCanopy canopy : refCanopies) {
+      canopy.shiftToMean();
+      refCanopyMap.put(canopy.getIdentifier(), canopy);
+    }
+    // build a map of the combiner output
+    Map<String, MeanShiftCanopy> canopyMap = new HashMap<String, MeanShiftCanopy>();
+    for (WritableComparable d : data) {
+      MeanShiftCanopy dc = MeanShiftCanopy.decodeCanopy(d.toString());
+      canopyMap.put(dc.getIdentifier(), dc);
+    }
+    // compare the maps
+    for (String id : refCanopyMap.keySet()) {
+      MeanShiftCanopy ref = refCanopyMap.get(id);
+
+      MeanShiftCanopy canopy = canopyMap.get((ref.isConverged() ? "V" : "C")
+          + (ref.getCanopyId() - raw.length));
+      assertEquals("ids", ref.getCanopyId(), canopy.getCanopyId() + 100);
+      assertEquals("centers(" + ref.getIdentifier() + ")", ref.getCenter()
+          .asWritableComparable().toString(), canopy.getCenter()
+          .asWritableComparable().toString());
+      assertEquals("bound points", ref.getBoundPoints().size(), canopy
+          .getBoundPoints().size());
+    }
+  }
+
+  /**
+   * Story: User can produce final canopy centers using a
+   * EuclideanDistanceMeasure and a CanopyReducer which clusters input centroid
+   * points to produce an output set of final canopy centroid points.
+   * 
+   * @throws Exception
+   */
+  public void testCanopyReducerEuclidean() throws Exception {
+    MeanShiftCanopyMapper mapper = new MeanShiftCanopyMapper();
+    MeanShiftCanopyCombiner combiner = new MeanShiftCanopyCombiner();
+    MeanShiftCanopyReducer reducer = new MeanShiftCanopyReducer();
+    DummyOutputCollector collector = new DummyOutputCollector();
+    MeanShiftCanopy.config(euclideanDistanceMeasure, 4, 1, 0.5);
+    // get the initial canopies
+    List<MeanShiftCanopy> canopies = getInitialCanopies();
+    // build the reference set
+    List<MeanShiftCanopy> refCanopies = new ArrayList<MeanShiftCanopy>();
+    for (int i = 0; i < raw.length; i++)
+      MeanShiftCanopy.mergeCanopy(new MeanShiftCanopy(raw[i]), refCanopies);
+    List<MeanShiftCanopy> refCanopies2 = new ArrayList<MeanShiftCanopy>();
+    for (MeanShiftCanopy canopy : refCanopies)
+      canopy.shiftToMean();
+    for (MeanShiftCanopy canopy : refCanopies)
+      MeanShiftCanopy.mergeCanopy(canopy, refCanopies2);
+    for (MeanShiftCanopy canopy : refCanopies)
+      canopy.shiftToMean();
+
+    // map the data
+    for (MeanShiftCanopy canopy : canopies)
+      mapper.map(new Text(), new Text(canopy.toString()), collector, null);
+    assertEquals("Number of map results", 100, collector.getData().size());
+    // now combine the mapper output
+    MeanShiftCanopy.config(euclideanDistanceMeasure, 4, 1, 0.5);
+    Map<String, List<WritableComparable>> mapData = collector.getData();
+    collector = new DummyOutputCollector();
+    for (String key : mapData.keySet())
+      combiner.reduce(new Text(key), mapData.get(key).iterator(), collector,
+          null);
+    // now reduce the combiner output
+    DummyOutputCollector collector2 = new DummyOutputCollector();
+    reducer.reduce(new Text("canopy"), collector.getValue("canopy").iterator(),
+        collector2, null);
+
+    // now verify the output
+    assertEquals("Number of canopies", refCanopies2.size(), collector2
+        .getKeys().size());
+    // add all points to the reference canopies
+    Map<String, MeanShiftCanopy> refCanopyMap = new HashMap<String, MeanShiftCanopy>();
+    for (MeanShiftCanopy canopy : refCanopies2) {
+      refCanopyMap.put(canopy.getIdentifier(), canopy);
+    }
+    // compare the maps
+    for (String id : refCanopyMap.keySet()) {
+      MeanShiftCanopy ref = refCanopyMap.get(id);
+
+      List<WritableComparable> values = collector2
+          .getValue((ref.isConverged() ? "V" : "C")
+              + (ref.getCanopyId() - raw.length));
+      assertEquals("values", 1, values.size());
+      MeanShiftCanopy canopy = MeanShiftCanopy.decodeCanopy(values.get(0)
+          .toString());
+      assertEquals("ids", ref.getCanopyId(), canopy.getCanopyId() + 100);
+      assertEquals("centers(" + id + ")", ref.getCenter()
+          .asWritableComparable().toString(), canopy.getCenter()
+          .asWritableComparable().toString());
+      assertEquals("bound points", ref.getBoundPoints().size(), canopy
+          .getBoundPoints().size());
+    }
+  }
+
+  /**
+   * Story: User can produce final point clustering using a Hadoop map/reduce
+   * job and a EuclideanDistanceMeasure.
+   * 
+   * @throws Exception
+   */
+  public void testCanopyEuclideanMRJob() throws Exception {
+    File testData = new File("testdata");
+    if (!testData.exists())
+      testData.mkdir();
+    writePointsToFileWithPayload(raw, "testdata/file1", "");
+    writePointsToFileWithPayload(raw, "testdata/file2", "");
+    // now run the Job
+    MeanShiftCanopyJob.runJob("testdata", "output",
+        EuclideanDistanceMeasure.class.getName(), 4, 1, 0.5, 10);
+    JobConf conf = new JobConf(MeanShiftCanopyDriver.class);
+    FileSystem fs = FileSystem.get(conf);
+    Path outPart = new Path("output/canopies-2/part-00000");
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, outPart, conf);
+    Text key = new Text();
+    Text value = new Text();
+    int count = 0;
+    while (reader.next(key, value)) {
+      MeanShiftCanopy.decodeCanopy(value.toString());
+      count++;
+    }
+    reader.close();
+    assertEquals("count", 3, count);
+  }
+}