You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by gs...@apache.org on 2008/03/01 04:33:21 UTC

svn commit: r632543 [1/2] - in /lucene/mahout/trunk: ./ src/main/java/org/apache/mahout/clustering/canopy/ src/main/java/org/apache/mahout/clustering/kmeans/ src/main/java/org/apache/mahout/utils/ src/test/java/org/apache/mahout/clustering/canopy/ src/...

Author: gsingers
Date: Fri Feb 29 19:33:13 2008
New Revision: 632543

URL: http://svn.apache.org/viewvc?rev=632543&view=rev
Log:
MAHOUT-5: Add M/R k-Means clustering implementation by Jeff Eastman, and refactor some common code from Canopy clustering

Added:
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java   (with props)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java   (with props)
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/kmeans/
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java   (with props)
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/kmeans/VisibleCluster.java   (with props)
Modified:
    lucene/mahout/trunk/   (props changed)
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
    lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
    lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java
    lucene/mahout/trunk/src/test/java/org/apache/mahout/utils/UserDefinedDistanceMeasure.java

Propchange: lucene/mahout/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Feb 29 19:33:13 2008
@@ -4,3 +4,4 @@
 build
 output
 testdata
+dist

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java Fri Feb 29 19:33:13 2008
@@ -16,21 +16,20 @@
  */
 package org.apache.mahout.clustering.canopy;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.mahout.utils.DistanceMeasure;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.List;
 
 /**
  * This class models a canopy as a center point, the number of points that are
  * contained within it according to the application of some distance metric, and
  * a point total which is the sum of all the points and is used to compute the
  * centroid when needed.
- * 
  */
 public class Canopy {
 
@@ -69,7 +68,7 @@
 
   /**
    * Create a new Canopy containing the given point
-   * 
+   *
    * @param point a Float[]
    */
   public Canopy(Float[] point) {
@@ -82,8 +81,8 @@
 
   /**
    * Create a new Canopy containing the given point and canopyId
-   * 
-   * @param point a Float[]
+   *
+   * @param point    a Float[]
    * @param canopyId an int identifying the canopy local to this process only
    */
   public Canopy(Float[] point, int canopyId) {
@@ -96,7 +95,7 @@
 
   /**
    * Configure the Canopy and its distance measure
-   * 
+   *
    * @param job the JobConf for this job
    */
   public static void configure(JobConf job) {
@@ -114,6 +113,7 @@
 
   /**
    * Configure the Canopy for unit tests
+   *
    * @param aMeasure
    * @param aT1
    * @param aT2
@@ -130,10 +130,10 @@
    * existing canopies instead of the points. Because of this it does not need
    * to actually store the points, instead storing a total points vector and the
    * number of points. From this a centroid can be computed.
-   * 
+   * <p/>
    * This method is used by the CanopyReducer.
-   * 
-   * @param point the Float[] defining the point to be added
+   *
+   * @param point    the Float[] defining the point to be added
    * @param canopies the List<Canopy> to be appended
    */
   public static void addPointToCanopies(Float[] point, List<Canopy> canopies) {
@@ -152,13 +152,13 @@
    * This method is used by the CanopyMapper to perform canopy inclusion tests
    * and to emit the point and its covering canopies to the output. The
    * CanopyCombiner will then sum the canopy points and produce the centroids.
-   * 
-   * @param point the Float[] defining the point to be added
-   * @param canopies the List<Canopy> to be appended
+   *
+   * @param point     the Float[] defining the point to be added
+   * @param canopies  the List<Canopy> to be appended
    * @param collector an OutputCollector in which to emit the point
    */
   public static void emitPointToNewCanopies(Float[] point,
-      List<Canopy> canopies, OutputCollector collector) throws IOException {
+                                            List<Canopy> canopies, OutputCollector<Text, Text> collector) throws IOException {
     boolean pointStronglyBound = false;
     for (Canopy canopy : canopies) {
       float dist = measure.distance(canopy.getCenter(), point);
@@ -178,16 +178,16 @@
    * and to emit the point keyed by its covering canopies to the output. if the
    * point is not covered by any canopies (due to canopy centroid clustering),
    * emit the point to the closest covering canopy.
-   * 
-   * @param point the Float[] defining the point to be added
-   * @param canopies the List<Canopy> to be appended
-   * @param writable the original Writable from the input, may include arbitrary
-   *        payload information after the point [...]<payload>
+   *
+   * @param point     the Float[] defining the point to be added
+   * @param canopies  the List<Canopy> to be appended
+   * @param writable  the original Writable from the input, may include arbitrary
+   *                  payload information after the point [...]<payload>
    * @param collector an OutputCollector in which to emit the point
    */
   public static void emitPointToExistingCanopies(Float[] point,
-      List<Canopy> canopies, Writable writable, OutputCollector collector)
-      throws IOException {
+                                                 List<Canopy> canopies, Text writable, OutputCollector<Text, Text> collector)
+          throws IOException {
     float minDist = Float.MAX_VALUE;
     Canopy closest = null;
     boolean isCovered = false;
@@ -208,64 +208,19 @@
   }
 
   /**
-   * Returns a print string for the point
-   * 
-   * @param out a String to append to
-   * @param pt the Float[] point
-   * @return
-   */
-  public static String ptOut(String out, Float[] pt) {
-    out += formatPoint(pt);
-    return out;
-  }
-
-  /**
-   * Format the point for input to a Mapper or Reducer
-   * 
-   * @param point a Float[]
-   * @return a String
-   */
-  public static String formatPoint(Float[] point) {
-    String out = "";
-    out += "[";
-    for (int i = 0; i < point.length; i++)
-      out += point[i] + ", ";
-    out += "] ";
-    String ptOut = out;
-    return ptOut;
-  }
-
-  /**
-   * Decodes a point from its string representation.
-   * 
-   * @param formattedString a comma-terminated String of the form
-   *        "[v1,v2,...,vn,]"
-   * @return the Float[] defining an n-dimensional point
-   */
-  public static Float[] decodePoint(String formattedString) {
-    String[] pts = formattedString.split(",");
-    Float[] point = new Float[pts.length - 1];
-    for (int i = 0; i < point.length; i++)
-      if (pts[i].startsWith("["))
-        point[i] = new Float(pts[i].substring(1));
-      else if (!pts[i].startsWith("]"))
-        point[i] = new Float(pts[i]);
-    return point;
-  }
-
-  /**
    * Format the canopy for output
-   * 
+   *
    * @param canopy
    * @return
    */
   public static String formatCanopy(Canopy canopy) {
-    return "C" + canopy.canopyId + ": " + formatPoint(canopy.computeCentroid());
+    return "C" + canopy.canopyId + ": "
+            + Point.formatPoint(canopy.computeCentroid());
   }
 
   /**
    * Decodes and returns a Canopy from the formattedString
-   * 
+   *
    * @param formattedString a String prouced by formatCanopy
    * @return a new Canopy
    */
@@ -275,7 +230,7 @@
     String centroid = formattedString.substring(beginIndex);
     if (id.startsWith("C")) {
       int canopyId = new Integer(formattedString.substring(1, beginIndex - 2));
-      Float[] canopyCentroid = decodePoint(centroid);
+      Float[] canopyCentroid = Point.decodePoint(centroid);
       return new Canopy(canopyCentroid, canopyId);
     }
     return null;
@@ -283,7 +238,7 @@
 
   /**
    * Add a point to the canopy
-   * 
+   *
    * @param point a Float[]
    */
   public void addPoint(Float[] point) {
@@ -295,22 +250,27 @@
   /**
    * Emit the point to the collector, keyed by the canopy's formatted
    * representation
-   * 
+   *
    * @param point a Float[]
    */
-  public void emitPoint(Float[] point, OutputCollector collector)
-      throws IOException {
-    collector.collect(new Text(formatCanopy(this)), new Text(ptOut("", point)));
+  public void emitPoint(Float[] point, OutputCollector<Text, Text> collector)
+          throws IOException {
+    collector.collect(new Text(formatCanopy(this)), new Text(Point.ptOut("",
+            point)));
   }
 
   /**
    * Return a printable representation of this object, using the user supplied
    * identifier
-   * 
+   *
    * @return
    */
   public String toString() {
-    return "C" + canopyId + " - " + ptOut("", getCenter());
+    return getIdentifier() + " - " + Point.ptOut("", getCenter());
+  }
+
+  public String getIdentifier() {
+    return "C" + canopyId;
   }
 
   public int getCanopyId() {
@@ -319,7 +279,7 @@
 
   /**
    * Return the center point
-   * 
+   *
    * @return a Float[]
    */
   public Float[] getCenter() {
@@ -328,7 +288,7 @@
 
   /**
    * Return the number of points in the Canopy
-   * 
+   *
    * @return
    */
   public int getNumPoints() {
@@ -337,7 +297,7 @@
 
   /**
    * Compute the centroid by averaging the pointTotals
-   * 
+   *
    * @return a Float[] which is the new centroid
    */
   public Float[] computeCentroid() {
@@ -349,7 +309,7 @@
 
   /**
    * Return if the point is covered by this canopy
-   * 
+   *
    * @param point a Float[] point
    * @return if the point is covered
    */

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java Fri Feb 29 19:33:13 2008
@@ -7,9 +7,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -30,7 +30,7 @@
     float t1 = new Float(args[3]);
     float t2 = new Float(args[4]);
     String jarLocation = "apache-mahout-0.1-dev.jar";
-    if (args.length > 5){
+    if (args.length > 5) {
       jarLocation = args[5];
     }
     runJob(input, output, measureClassName, t1, t2, jarLocation);
@@ -38,15 +38,15 @@
 
   /**
    * Run the job
-   * 
-   * @param input the input pathname String
-   * @param output the output pathname String
+   *
+   * @param input            the input pathname String
+   * @param output           the output pathname String
    * @param measureClassName the DistanceMeasure class name
-   * @param t1 the T1 distance threshold
-   * @param t2 the T2 distance threshold
+   * @param t1               the T1 distance threshold
+   * @param t2               the T2 distance threshold
    */
   public static void runJob(String input, String output,
-      String measureClassName, float t1, float t2, String jarLocation) {
+                            String measureClassName, float t1, float t2, String jarLocation) {
     CanopyDriver.runJob(input, output + "/canopies", measureClassName, t1, t2, jarLocation);
     ClusterDriver.runJob(input, output + "/canopies", output, measureClassName, t1, t2, jarLocation);
   }

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java Fri Feb 29 19:33:13 2008
@@ -1,11 +1,10 @@
-package org.apache.mahout.clustering.canopy;
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -15,32 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.mahout.clustering.canopy;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
 
 import java.io.IOException;
 import java.util.Iterator;
 
-public class CanopyCombiner extends MapReduceBase implements Reducer {
+public class CanopyCombiner extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
 
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<Text> values,
+                     OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
     Writable value = (Writable) values.next();
-    Float[] center = Canopy.decodePoint(value.toString());
+    Float[] center = Point.decodePoint(value.toString());
     Canopy canopy = new Canopy(center);
     while (values.hasNext()) {
       value = (Writable) values.next();
-      Float[] point = Canopy.decodePoint(value.toString());
+      Float[] point = Point.decodePoint(value.toString());
       canopy.addPoint(point);
     }
-    output.collect(new Text("centroid"), new Text(Canopy.formatPoint(canopy
+    output.collect(new Text("centroid"), new Text(Point.formatPoint(canopy
             .computeCentroid())));
   }
 

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Fri Feb 29 19:33:13 2008
@@ -32,7 +32,7 @@
     float t1 = new Float(args[3]);
     float t2 = new Float(args[4]);
     String jarLocation = "apache-mahout-0.1-dev.jar";
-    if (args.length > 5){
+    if (args.length > 5) {
       jarLocation = args[5];
     }
     runJob(input, output, measureClassName, t1, t2, jarLocation);
@@ -40,18 +40,18 @@
 
   /**
    * Run the job
-   * 
-   * @param input the input pathname String
-   * @param output the output pathname String
+   *
+   * @param input            the input pathname String
+   * @param output           the output pathname String
    * @param measureClassName the DistanceMeasure class name
-   * @param t1 the T1 distance threshold
-   * @param t2 the T2 distance threshold
+   * @param t1               the T1 distance threshold
+   * @param t2               the T2 distance threshold
    */
   public static void runJob(String input, String output,
-      String measureClassName, float t1, float t2, String jarLocation) {
+                            String measureClassName, float t1, float t2, String jarLocation) {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(
-        org.apache.mahout.clustering.canopy.CanopyDriver.class);
+            org.apache.mahout.clustering.canopy.CanopyDriver.class);
     conf.setJar(jarLocation);
     conf.set(Canopy.DISTANCE_MEASURE_KEY, measureClassName);
     conf.set(Canopy.T1_KEY, "" + t1);

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java Fri Feb 29 19:33:13 2008
@@ -16,19 +16,21 @@
  */
 package org.apache.mahout.clustering.canopy;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
-public class CanopyMapper extends MapReduceBase implements Mapper {
+public class CanopyMapper extends MapReduceBase implements
+        Mapper<WritableComparable, Text, Text, Text> {
 
   List<Canopy> canopies = new ArrayList<Canopy>();
 
@@ -40,9 +42,9 @@
    *      org.apache.hadoop.mapred.OutputCollector,
    *      org.apache.hadoop.mapred.Reporter)
    */
-  public void map(WritableComparable key, Writable values,
-      OutputCollector output, Reporter reporter) throws IOException {
-    Float[] point = Canopy.decodePoint(values.toString());
+  public void map(WritableComparable key, Text values,
+                  OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    Float[] point = Point.decodePoint(values.toString());
     Canopy.emitPointToNewCanopies(point, canopies, output);
   }
 

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java Fri Feb 29 19:33:13 2008
@@ -16,20 +16,21 @@
  */
 package org.apache.mahout.clustering.canopy;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
-public class CanopyReducer extends MapReduceBase implements Reducer {
+public class CanopyReducer extends MapReduceBase implements
+        Reducer<Text, Text, Text, Text> {
 
   List<Canopy> canopies = new ArrayList<Canopy>();
 
@@ -40,23 +41,18 @@
    *      java.util.Iterator, org.apache.hadoop.mapred.OutputCollector,
    *      org.apache.hadoop.mapred.Reporter)
    */
-  public void reduce(WritableComparable key, Iterator values,
-      OutputCollector output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<Text> values,
+                     OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
     while (values.hasNext()) {
       Text value = (Text) values.next();
-      Float[] point = Canopy.decodePoint(value.toString());
+      Float[] point = Point.decodePoint(value.toString());
       Canopy.addPointToCanopies(point, canopies);
     }
     for (Canopy canopy : canopies)
-      output.collect(new Text("C" + canopy.getCanopyId()), new Text(Canopy
-          .formatPoint(canopy.computeCentroid())));
+      output.collect(new Text(canopy.getIdentifier()), new Text(Canopy
+              .formatCanopy(canopy)));
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.hadoop.mapred.MapReduceBase#close()
-   */
   /*
    * (non-Javadoc)
    * 

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java Fri Feb 29 19:33:13 2008
@@ -33,7 +33,7 @@
     float t1 = new Float(args[4]);
     float t2 = new Float(args[5]);
     String jarLocation = "apache-mahout-0.1-dev.jar";
-    if (args.length > 6){
+    if (args.length > 6) {
       jarLocation = args[6];
     }
     runJob(points, canopies, output, measureClassName, t1, t2, jarLocation);
@@ -41,20 +41,20 @@
 
   /**
    * Run the job
-   * 
-   * @param points the input points directory pathname String
-   * @param canopies the input canopies directory pathname String
-   * @param output the output directory pathname String
+   *
+   * @param points           the input points directory pathname String
+   * @param canopies         the input canopies directory pathname String
+   * @param output           the output directory pathname String
    * @param measureClassName the DistanceMeasure class name
-   * @param t1 the T1 distance threshold
-   * @param t2 the T2 distance threshold
+   * @param t1               the T1 distance threshold
+   * @param t2               the T2 distance threshold
    * @param jarLocation
    */
   public static void runJob(String points, String canopies, String output,
                             String measureClassName, float t1, float t2, String jarLocation) {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(
-        org.apache.mahout.clustering.canopy.ClusterDriver.class);
+            org.apache.mahout.clustering.canopy.ClusterDriver.class);
     conf.setJar(jarLocation);
     conf.set(Canopy.DISTANCE_MEASURE_KEY, measureClassName);
     conf.set(Canopy.T1_KEY, "" + t1);

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java Fri Feb 29 19:33:13 2008
@@ -16,35 +16,36 @@
  */
 package org.apache.mahout.clustering.canopy;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
 
-public class ClusterMapper extends MapReduceBase implements Mapper {
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ClusterMapper extends MapReduceBase implements
+        Mapper<WritableComparable, Text, Text, Text> {
 
   List<Canopy> canopies;
 
-  public void map(WritableComparable key, Writable values,
-      OutputCollector output, Reporter reporter) throws IOException {
-    Float[] point = Canopy.decodePoint(values.toString());
+  public void map(WritableComparable key, Text values,
+                  OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    Float[] point = Point.decodePoint(values.toString());
     Canopy.emitPointToExistingCanopies(point, canopies, values, output);
   }
 
   /**
    * Configure the mapper by providing its canopies. Used by unit tests.
-   * 
+   *
    * @param canopies a List<Canopy>
    */
   public void config(List<Canopy> canopies) {
@@ -72,7 +73,7 @@
         Text key = new Text();
         Text value = new Text();
         while (reader.next(key, value)) {
-          Canopy canopy = new Canopy(Canopy.decodePoint(value.toString()));
+          Canopy canopy = Canopy.decodeCanopy(value.toString());
           canopies.add(canopy);
         }
       } finally {

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,272 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.utils.DistanceMeasure;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A single k-means cluster. A cluster has an integer id, a center (the point
+ * distances are measured against) and a running total of the points added to
+ * it, from which a centroid is lazily computed.
+ * <p>
+ * The distance measure, the convergence delta and the id counter are static
+ * and therefore shared by every Cluster in the JVM. They must be set through
+ * {@link #configure(JobConf)} or {@link #config(DistanceMeasure, float)}
+ * before distances or convergence are computed.
+ */
+public class Cluster {
+
+  public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.kmeans.measure";
+
+  public static final String CLUSTER_PATH_KEY = "org.apache.mahout.clustering.kmeans.path";
+
+  public static final String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.kmeans.convergence";
+
+  // id handed to the next cluster built without an explicit id; reset by
+  // configure(JobConf) and config(DistanceMeasure, float)
+  private static int nextClusterId = 0;
+
+  // this cluster's clusterId
+  private int clusterId;
+
+  // the current center
+  private Float[] center = new Float[0];
+
+  // the current centroid is lazy evaluated and may be null
+  private Float[] centroid = null;
+
+  // the number of points in the cluster
+  private int numPoints = 0;
+
+  // the total of all points added to the cluster
+  private Float[] pointTotal = null;
+
+  // has the centroid converged with the center?
+  private boolean converged = false;
+
+  // the distance measure shared by all clusters
+  private static DistanceMeasure measure;
+
+  // the largest center-to-centroid distance still regarded as converged
+  private static float convergenceDelta = 0;
+
+  /**
+   * Format the cluster for output as "identifier: [centroid]".
+   *
+   * @param cluster the Cluster
+   * @return the cluster identifier, a colon and space, then the formatted
+   *         centroid of the cluster
+   */
+  public static String formatCluster(Cluster cluster) {
+    return cluster.getIdentifier() + ": "
+            + Point.formatPoint(cluster.computeCentroid());
+  }
+
+  /**
+   * Decodes and returns a Cluster from the formattedString. The decoded point
+   * becomes the center of the new cluster, and the cluster is flagged as
+   * converged when the identifier starts with "V".
+   *
+   * @param formattedString a String produced by formatCluster
+   * @return a new Cluster, or null if the string contains no '[' or its
+   *         identifier starts with neither "C" nor "V"
+   * @throws NumberFormatException if the id or a coordinate cannot be parsed
+   */
+  public static Cluster decodeCluster(String formattedString) {
+    int beginIndex = formattedString.indexOf('[');
+    if (beginIndex < 0) {
+      // no point present: report it the same way as an unknown identifier
+      return null;
+    }
+    String id = formattedString.substring(0, beginIndex);
+    boolean isConverged = id.startsWith("V");
+    if (!isConverged && !id.startsWith("C")) {
+      return null;
+    }
+    // the numeric id sits between the one-letter prefix and the ": " separator
+    int separator = id.indexOf(':');
+    String number = separator < 0 ? id.substring(1) : id.substring(1, separator);
+    int decodedId = Integer.parseInt(number.trim());
+    Cluster cluster = new Cluster(Point.decodePoint(formattedString.substring(beginIndex)), decodedId);
+    cluster.converged = isConverged;
+    return cluster;
+  }
+
+  /**
+   * Configure the distance measure and convergence delta from the job, and
+   * reset the cluster id counter.
+   *
+   * @param job the JobConf for the job
+   * @throws IllegalStateException if the measure class cannot be loaded or
+   *                               instantiated, or the convergence delta
+   *                               cannot be parsed
+   */
+  public static void configure(JobConf job) {
+    try {
+      Class<?> cl = Class.forName(job.get(DISTANCE_MEASURE_KEY));
+      measure = (DistanceMeasure) cl.newInstance();
+      measure.configure(job);
+      convergenceDelta = Float.parseFloat(job.get(CLUSTER_CONVERGENCE_KEY));
+      nextClusterId = 0;
+    } catch (Exception e) {
+      // without a measure every later distance computation would fail with a
+      // NullPointerException, so fail here where the cause is still known
+      throw new IllegalStateException(
+              "Unable to configure k-means clustering from the job", e);
+    }
+  }
+
+  /**
+   * Configure the distance measure directly. Used by unit tests.
+   *
+   * @param aMeasure          the DistanceMeasure
+   * @param aConvergenceDelta the float delta value used to define convergence
+   */
+  public static void config(DistanceMeasure aMeasure, float aConvergenceDelta) {
+    measure = aMeasure;
+    convergenceDelta = aConvergenceDelta;
+    nextClusterId = 0;
+  }
+
+  /**
+   * Emit the point to the nearest cluster center. The output key is the
+   * formatted nearest cluster and the output value is the values argument.
+   *
+   * @param point    a Float[] representing the point
+   * @param clusters a {@code List<Cluster>} to test; must not be empty
+   * @param values   a Text containing the input point and possible other
+   *                 values of interest (payload)
+   * @param output   the OutputCollector to emit into
+   * @throws IOException if the collector fails
+   */
+  public static void emitPointToNearestCluster(Float[] point,
+                                               List<Cluster> clusters, Text values, OutputCollector<Text, Text> output)
+          throws IOException {
+    Cluster nearestCluster = null;
+    float nearestDistance = Float.MAX_VALUE;
+    for (Cluster cluster : clusters) {
+      float distance = measure.distance(point, cluster.getCenter());
+      // the first cluster always wins so that nearestCluster is set even if
+      // its distance is not below Float.MAX_VALUE
+      if (nearestCluster == null || distance < nearestDistance) {
+        nearestCluster = cluster;
+        nearestDistance = distance;
+      }
+    }
+    output.collect(new Text(formatCluster(nearestCluster)), values);
+  }
+
+  /**
+   * Compute the centroid by averaging the pointTotals
+   *
+   * @return a Float[] which is the new centroid, or the point total itself
+   *         when no points have been added
+   */
+  private Float[] computeCentroid() {
+    if (numPoints == 0) {
+      return pointTotal;
+    }
+    if (centroid == null) {
+      // lazy compute new centroid
+      centroid = new Float[pointTotal.length];
+      for (int i = 0; i < pointTotal.length; i++) {
+        centroid[i] = pointTotal[i] / numPoints;
+      }
+    }
+    return centroid;
+  }
+
+  /**
+   * Add count points, whose coordinate-wise sum is delta, to the running
+   * total and invalidate the cached centroid.
+   */
+  private void accumulate(int count, Float[] delta) {
+    centroid = null;
+    numPoints += count;
+    if (pointTotal == null) {
+      pointTotal = delta.clone();
+    } else {
+      for (int i = 0; i < delta.length; i++) {
+        pointTotal[i] = delta[i] + pointTotal[i];
+      }
+    }
+  }
+
+  /**
+   * Construct a new cluster with the given point as its center and the next
+   * unused cluster id
+   *
+   * @param center a Float[] center point
+   */
+  public Cluster(Float[] center) {
+    this(center, nextClusterId++);
+  }
+
+  /**
+   * Construct a new cluster with the given point as its center and the given
+   * id
+   *
+   * @param center    a Float[] center point
+   * @param clusterId the int id of the cluster
+   */
+  public Cluster(Float[] center, int clusterId) {
+    super();
+    this.clusterId = clusterId;
+    this.center = center;
+    this.numPoints = 0;
+    this.pointTotal = Point.origin(center.length);
+  }
+
+  /**
+   * Return a printable representation of this object: its identifier
+   * followed by its formatted center
+   *
+   * @return the printable String
+   */
+  @Override
+  public String toString() {
+    return getIdentifier() + " - " + Point.formatPoint(center);
+  }
+
+  /**
+   * Return the identifier of this cluster
+   *
+   * @return "V" followed by the id if the cluster is converged, otherwise
+   *         "C" followed by the id
+   */
+  public String getIdentifier() {
+    return (converged ? "V" : "C") + clusterId;
+  }
+
+  /**
+   * Add the point to the cluster
+   *
+   * @param point a Float[] point to add
+   */
+  public void addPoint(Float[] point) {
+    accumulate(1, point);
+  }
+
+  /**
+   * Add a pre-summed group of points to the cluster
+   *
+   * @param count the number of points in the delta
+   * @param delta a Float[] holding the sum of those points
+   */
+  public void addPoints(int count, Float[] delta) {
+    accumulate(count, delta);
+  }
+
+  public Float[] getCenter() {
+    return center;
+  }
+
+  public int getNumPoints() {
+    return numPoints;
+  }
+
+  /**
+   * Compute the centroid and set the center to it, then forget the points
+   * that were added so far.
+   */
+  public void recomputeCenter() {
+    center = computeCentroid();
+    numPoints = 0;
+    pointTotal = Point.origin(center.length);
+  }
+
+  /**
+   * Return if the cluster is converged by comparing its center and centroid.
+   * The result is also remembered and reported by isConverged().
+   *
+   * @return if the cluster is converged
+   */
+  public boolean computeConvergence() {
+    Float[] currentCentroid = computeCentroid();
+    converged = measure.distance(currentCentroid, center) <= convergenceDelta;
+    return converged;
+  }
+
+  public Float[] getPointTotal() {
+    return pointTotal;
+  }
+
+  public boolean isConverged() {
+    return converged;
+  }
+
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,49 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Combiner for the k-means iteration job. For each cluster key it sums the
+ * values it is given and emits a single partial sum of the form
+ * "count, [total]".
+ */
+public class KMeansCombiner extends MapReduceBase implements
+        Reducer<Text, Text, Text, Text> {
+
+  /**
+   * Sum the values of one cluster and emit "count, [total]" under the same
+   * key. A value may be a single formatted point or a partial sum that an
+   * earlier pass of this combiner produced, so combining more than once gives
+   * the same totals.
+   *
+   * @param key      the formatted cluster the values belong to
+   * @param values   the formatted points, or partial sums, to add up
+   * @param output   the OutputCollector to emit into
+   * @param reporter the Reporter (unused)
+   * @throws IOException if the key is not a formatted cluster or the
+   *                     collector fails
+   */
+  public void reduce(Text key, Iterator<Text> values,
+                     OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    Cluster cluster = Cluster.decodeCluster(key.toString());
+    if (cluster == null) {
+      throw new IOException("Unable to decode a cluster from key: " + key);
+    }
+    while (values.hasNext()) {
+      String value = values.next().toString();
+      int comma = value.indexOf(',');
+      if (comma > 0 && value.startsWith(", [", comma)) {
+        // "count, [total]": a partial sum written by a previous combine pass
+        int count = Integer.parseInt(value.substring(0, comma).trim());
+        cluster.addPoints(count, Point.decodePoint(value.substring(comma + 2)));
+      } else {
+        cluster.addPoint(Point.decodePoint(value));
+      }
+    }
+    output.collect(key, new Text(cluster.getNumPoints() + ", "
+            + Point.formatPoint(cluster.getPointTotal())));
+  }
+
+  /**
+   * Configure the combiner and the shared Cluster settings from the job.
+   *
+   * @param job the JobConf for the job
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    Cluster.configure(job);
+  }
+
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,196 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * Command line driver for k-means: runs iteration jobs until every cluster
+ * reports convergence or the iteration limit is reached, then runs one more
+ * job that assigns the input points to the final clusters.
+ */
+public class KMeansDriver {
+
+  /**
+   * Run k-means from the command line.
+   *
+   * @param args input, clusters, output, measureClass, convergenceDelta,
+   *             maxIterations and, optionally, the location of the Mahout jar
+   * @throws IllegalArgumentException if fewer than six arguments are given
+   */
+  public static void main(String[] args) {
+    if (args.length < 6) {
+      throw new IllegalArgumentException(
+              "Usage: KMeansDriver <input> <clusters> <output> <measureClass> "
+                      + "<convergenceDelta> <maxIterations> [<jarLocation>]");
+    }
+    String input = args[0];
+    String clusters = args[1];
+    String output = args[2];
+    String measureClass = args[3];
+    String convergenceDelta = args[4];
+    String maxIterations = args[5];
+    String jarLocation = "dist/apache-mahout-0.1-dev.jar";
+    if (args.length > 6) {
+      jarLocation = args[6];
+    }
+    runJob(input, clusters, output, measureClass, convergenceDelta,
+            maxIterations, jarLocation);
+  }
+
+  /**
+   * Run the job using supplied arguments. Any existing output directory is
+   * deleted first. Each iteration writes its clusters to
+   * output/clusters-N and the clustered points are written to output/points.
+   *
+   * @param input            the directory pathname for input points
+   * @param clustersIn       the directory pathname for initial & computed clusters
+   * @param output           the directory pathname for output points
+   * @param measureClass     the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param maxIterations    the maximum number of iterations
+   * @param jarLocation      The location of the Mahout jar
+   * @throws NumberFormatException if maxIterations is not an integer
+   */
+  public static void runJob(String input, String clustersIn, String output,
+                            String measureClass, String convergenceDelta, String maxIterations, String jarLocation) {
+    int maxIter = Integer.parseInt(maxIterations);
+    try {
+      // delete the output directory
+      JobConf conf = new JobConf(KMeansDriver.class);
+      Path outPath = new Path(output);
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(outPath)) {
+        fs.delete(outPath);
+      }
+      fs.mkdirs(outPath);
+      // iterate until the clusters converge
+      boolean converged = false;
+      int iteration = 0;
+
+      while (!converged && iteration < maxIter) {
+        System.out.println("Iteration " + iteration);
+        // point the output to a new directory per iteration
+        String clustersOut = output + "/clusters-" + iteration;
+        converged = runIteration(input, clustersIn, clustersOut, measureClass,
+                convergenceDelta, jarLocation);
+        // now point the input to the old output directory
+        clustersIn = clustersOut;
+        iteration++;
+      }
+      // now actually cluster the points
+      System.out.println("Clustering ");
+      runClustering(input, clustersIn, output + "/points", measureClass,
+              convergenceDelta, jarLocation);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Run one k-means iteration job using supplied arguments
+   *
+   * @param input            the directory pathname for input points
+   * @param clustersIn       the directory pathname for input clusters
+   * @param clustersOut      the directory pathname for output clusters
+   * @param measureClass     the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param jarLocation      The location of the mahout jar
+   * @return true if every cluster written by the iteration is converged, or
+   *         if the iteration failed
+   */
+  static boolean runIteration(String input, String clustersIn,
+                              String clustersOut, String measureClass, String convergenceDelta, String jarLocation) {
+    JobClient client = new JobClient();
+    JobConf conf = new JobConf(KMeansDriver.class);
+    conf.setJar(jarLocation);
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setInputPath(new Path(input));
+    Path outPath = new Path(clustersOut);
+    conf.setOutputPath(outPath);
+
+    conf.setMapperClass(KMeansMapper.class);
+    conf.setCombinerClass(KMeansCombiner.class);
+    conf.setReducerClass(KMeansReducer.class);
+    // a single reducer so that all clusters end up in part-00000
+    conf.setNumReduceTasks(1);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    conf.set(Cluster.CLUSTER_PATH_KEY, clustersIn);
+    conf.set(Cluster.DISTANCE_MEASURE_KEY, measureClass);
+    conf.set(Cluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
+
+    client.setConf(conf);
+    try {
+      JobClient.runJob(conf);
+      FileSystem fs = FileSystem.get(conf);
+      return isConverged(clustersOut + "/part-00000", conf, fs);
+    } catch (Exception e) {
+      e.printStackTrace();
+      // report a failed iteration as converged so the caller stops iterating
+      return true;
+    }
+  }
+
+  /**
+   * Run the job that assigns the input points to clusters. It is a map-only
+   * job.
+   *
+   * @param input            the directory pathname for input points
+   * @param clustersIn       the directory pathname for input clusters
+   * @param output           the directory pathname for output points
+   * @param measureClass     the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param jarLocation      The location of the mahout jar
+   */
+  static void runClustering(String input, String clustersIn, String output,
+                            String measureClass, String convergenceDelta, String jarLocation) {
+    JobClient client = new JobClient();
+    JobConf conf = new JobConf(KMeansDriver.class);
+    conf.setJar(jarLocation);
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setInputPath(new Path(input));
+    Path outPath = new Path(output);
+    conf.setOutputPath(outPath);
+
+    conf.setMapperClass(KMeansMapper.class);
+    conf.setNumReduceTasks(0);
+    conf.set(Cluster.CLUSTER_PATH_KEY, clustersIn);
+    conf.set(Cluster.DISTANCE_MEASURE_KEY, measureClass);
+    conf.set(Cluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
+
+    client.setConf(conf);
+    try {
+      JobClient.runJob(conf);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Return if all of the Clusters in the filePath have converged or not
+   *
+   * @param filePath the file path to the single file containing the clusters
+   * @param conf     the JobConf
+   * @param fs       the FileSystem
+   * @return true if all Clusters are converged
+   * @throws IOException if there was an IO error or an entry of the file is
+   *                     not a formatted cluster
+   */
+  static boolean isConverged(String filePath, JobConf conf, FileSystem fs)
+          throws IOException {
+    Path outPart = new Path(filePath);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, outPart, conf);
+    try {
+      Text key = new Text();
+      Text value = new Text();
+      boolean converged = true;
+      // the first unconverged cluster settles the answer
+      while (converged && reader.next(key, value)) {
+        Cluster cluster = Cluster.decodeCluster(value.toString());
+        if (cluster == null) {
+          throw new IOException("Unable to decode a cluster from: " + value);
+        }
+        converged = cluster.isConverged();
+      }
+      return converged;
+    } finally {
+      // release the file even if reading or decoding fails
+      reader.close();
+    }
+  }
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,88 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mapper for k-means: assigns each input point to the nearest of the clusters
+ * loaded when the mapper is configured.
+ */
+public class KMeansMapper extends MapReduceBase implements
+        Mapper<WritableComparable, Text, Text, Text> {
+
+  // the clusters every point is compared against
+  List<Cluster> clusters;
+
+  /**
+   * Decode the point from the input value and emit the value under the key
+   * of its nearest cluster.
+   *
+   * @param key      the input key (unused)
+   * @param values   the Text holding the formatted point
+   * @param output   the OutputCollector to emit into
+   * @param reporter the Reporter (unused)
+   * @throws IOException if the collector fails
+   */
+  public void map(WritableComparable key, Text values,
+                  OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    Float[] point = Point.decodePoint(values.toString());
+    Cluster.emitPointToNearestCluster(point, clusters, values, output);
+  }
+
+  /**
+   * Configure the mapper by providing its clusters. Used by unit tests.
+   *
+   * @param clusters a {@code List<Cluster>}
+   */
+  void config(List<Cluster> clusters) {
+    this.clusters = clusters;
+  }
+
+  /**
+   * Configure the shared Cluster settings from the job and load the clusters
+   * from the part-00000 file of the configured cluster path.
+   *
+   * @param job the JobConf for the job
+   * @throws IllegalStateException if the clusters cannot be read or decoded
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    Cluster.configure(job);
+
+    String clusterPath = job.get(Cluster.CLUSTER_PATH_KEY);
+    clusters = new ArrayList<Cluster>();
+
+    try {
+      FileSystem fs = FileSystem.get(job);
+      Path path = new Path(clusterPath + "/part-00000");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+      try {
+        Text key = new Text();
+        Text value = new Text();
+        while (reader.next(key, value)) {
+          Cluster cluster = Cluster.decodeCluster(value.toString());
+          if (cluster == null) {
+            throw new IllegalStateException(
+                    "Unable to decode a cluster from: " + value);
+          }
+          // add the center so the centroid will be correct on output formatting
+          cluster.addPoint(cluster.getCenter());
+          clusters.add(cluster);
+        }
+      } finally {
+        reader.close();
+      }
+    } catch (IOException e) {
+      // map() cannot work without clusters, so do not continue with an
+      // empty or partial list
+      throw new IllegalStateException(
+              "Unable to load the clusters from " + clusterPath, e);
+    }
+  }
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,57 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Reducer for the k-means iteration job. For each cluster key it adds up the
+ * values it receives, computes convergence and emits the updated cluster.
+ */
+public class KMeansReducer extends MapReduceBase implements
+        Reducer<Text, Text, Text, Text> {
+
+  // not read or written anywhere in this class
+  float delta = 0;
+
+  /**
+   * Add the values of one cluster together and emit the cluster, keyed by its
+   * identifier, with the new centroid. A value is normally a partial sum of
+   * the form "count, [total]"; a value that starts with '[' is taken to be a
+   * single point that was not combined.
+   *
+   * @param key      the formatted cluster the values belong to
+   * @param values   the partial sums, or points, to add up
+   * @param output   the OutputCollector to emit into
+   * @param reporter the Reporter (unused)
+   * @throws IOException if the key is not a formatted cluster or the
+   *                     collector fails
+   */
+  public void reduce(Text key, Iterator<Text> values,
+                     OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    Cluster cluster = Cluster.decodeCluster(key.toString());
+    if (cluster == null) {
+      throw new IOException("Unable to decode a cluster from key: " + key);
+    }
+    while (values.hasNext()) {
+      String value = values.next().toString();
+      if (value.startsWith("[")) {
+        // a point with no count in front of it counts as one point
+        cluster.addPoint(Point.decodePoint(value));
+      } else {
+        int ix = value.indexOf(',');
+        int count = Integer.parseInt(value.substring(0, ix));
+        // skip the ", " between the count and the total
+        Float[] total = Point.decodePoint(value.substring(ix + 2));
+        cluster.addPoints(count, total);
+      }
+    }
+    // force convergence calculation, the identifier emitted below depends on it
+    cluster.computeConvergence();
+    output.collect(new Text(cluster.getIdentifier()), new Text(Cluster
+            .formatCluster(cluster)));
+  }
+
+  /**
+   * Configure the reducer and the shared Cluster settings from the job.
+   *
+   * @param job the JobConf for the job
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    Cluster.configure(job);
+  }
+
+}

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java Fri Feb 29 19:33:13 2008
@@ -26,11 +26,11 @@
 
   /**
    * Returns the distance metric applied to the arguments
-   * 
+   *
    * @param p1 a Float[] defining a multidimensional point in some feature space
    * @param p2 a Float[] defining a multidimensional point in some feature space
    * @return a scalar float of the distance
    */
   public float distance(Float[] p1, Float[] p2);
-  
+
 }

Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,116 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.utils;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Static helpers for points held as Float[] arrays: conversion to and from
+ * their text form, simple arithmetic and writing points to a file.
+ */
+public class Point {
+
+  /**
+   * Format the point for input to a Mapper or Reducer
+   *
+   * @param point a Float[]
+   * @return a String of the form "[v1, v2, ..., vn, ] ", including the
+   *         trailing space
+   */
+  public static String formatPoint(Float[] point) {
+    StringBuilder out = new StringBuilder("[");
+    for (int i = 0; i < point.length; i++) {
+      out.append(point[i]).append(", ");
+    }
+    out.append("] ");
+    return out.toString();
+  }
+
+  /**
+   * Decodes a point from its string representation. The string is split on
+   * commas and the text after the last comma is not part of the point.
+   *
+   * @param formattedString a comma-terminated String of the form
+   *                        "[v1,v2,...,vn,]"
+   * @return the Float[] defining an n-dimensional point
+   * @throws NumberFormatException if a value is not a valid float
+   */
+  public static Float[] decodePoint(String formattedString) {
+    String[] pts = formattedString.split(",");
+    Float[] point = new Float[pts.length - 1];
+    for (int i = 0; i < point.length; i++) {
+      String token = pts[i];
+      if (token.startsWith("[")) {
+        // drop the opening bracket in front of the value
+        point[i] = Float.valueOf(token.substring(1));
+      } else if (!token.startsWith("]")) {
+        point[i] = Float.valueOf(token);
+      }
+    }
+    return point;
+  }
+
+  /**
+   * Returns a print string for the point
+   *
+   * @param out a String to append to
+   * @param pt  the Float[] point
+   * @return out followed by the formatted point
+   */
+  public static String ptOut(String out, Float[] pt) {
+    return out + formatPoint(pt);
+  }
+
+  /**
+   * Return a point with length dimensions and zero values
+   *
+   * @param length the number of dimensions
+   * @return a Float[] representing [0,0,0,...,0]
+   */
+  public static Float[] origin(int length) {
+    Float[] result = new Float[length];
+    for (int i = 0; i < length; i++) {
+      result[i] = Float.valueOf(0f);
+    }
+    return result;
+  }
+
+  /**
+   * Return the sum of the two points. Neither argument is modified.
+   *
+   * @param pt1 a Float[] point
+   * @param pt2 a Float[] point with at least as many dimensions as pt1
+   * @return a new Float[] with the dimensions of pt1
+   */
+  public static Float[] sum(Float[] pt1, Float[] pt2) {
+    Float[] result = pt1.clone();
+    for (int i = 0; i < pt1.length; i++) {
+      result[i] = result[i] + pt2[i];
+    }
+    return result;
+  }
+
+  /**
+   * Write the points to the file, one formatted point per line
+   *
+   * @param points   the points to write
+   * @param fileName the name of the file to create or overwrite
+   * @throws IOException if the file cannot be written
+   */
+  public static void writePointsToFile(List<Float[]> points, String fileName)
+          throws IOException {
+    writePointsToFileWithPayload(points, fileName, "");
+  }
+
+  /**
+   * Write the points to the file, one per line, each formatted point being
+   * followed by the payload
+   *
+   * @param points   the points to write
+   * @param fileName the name of the file to create or overwrite
+   * @param payload  the text written after every point
+   * @throws IOException if the file cannot be written
+   */
+  public static void writePointsToFileWithPayload(List<Float[]> points,
+                                                  String fileName, String payload) throws IOException {
+    BufferedWriter output = new BufferedWriter(new FileWriter(fileName));
+    try {
+      for (Float[] point : points) {
+        output.write(formatPoint(point));
+        output.write(payload);
+        output.write("\n");
+      }
+      output.flush();
+    } finally {
+      // close the file even if a write fails
+      output.close();
+    }
+  }
+
+}
\ No newline at end of file

Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java (original)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java Fri Feb 29 19:33:13 2008
@@ -16,6 +16,9 @@
  * limitations under the License.
  */
 
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -23,32 +26,28 @@
 import java.util.Set;
 import java.util.TreeMap;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.OutputCollector;
+public class DummyOutputCollector implements OutputCollector<Text, Text> {
 
-public class DummyOutputCollector implements OutputCollector {
+  Map<String, List<Text>> data = new TreeMap<String, List<Text>>();
 
-  Map<String, List<Writable>> data = new TreeMap<String, List<Writable>>();
-
-  public void collect(WritableComparable key, Writable values)
-      throws IOException {
-    List<Writable> points = data.get(key.toString());
+  public void collect(Text key, Text values)
+          throws IOException {
+    List<Text> points = data.get(key.toString());
     if (points == null) {
-      points = new ArrayList<Writable>();
+      points = new ArrayList<Text>();
       data.put(key.toString(), points);
     }
     points.add(values);
   }
 
-  public Map<String, List<Writable>> getData() {
+  public Map<String, List<Text>> getData() {
     return data;
   }
 
-  public List<Writable> getValue(String key) {
+  public List<Text> getValue(String key) {
     return data.get(key);
   }
-  
+
   public Set<String> getKeys() {
     return data.keySet();
   }