You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by gs...@apache.org on 2008/03/01 04:33:21 UTC
svn commit: r632543 [1/2] - in /lucene/mahout/trunk: ./
src/main/java/org/apache/mahout/clustering/canopy/
src/main/java/org/apache/mahout/clustering/kmeans/
src/main/java/org/apache/mahout/utils/
src/test/java/org/apache/mahout/clustering/canopy/ src/...
Author: gsingers
Date: Fri Feb 29 19:33:13 2008
New Revision: 632543
URL: http://svn.apache.org/viewvc?rev=632543&view=rev
Log:
MAHOUT-5: Add M/R k-Means clustering implementation by Jeff Eastman, and refactor some common code from Canopy clustering
Added:
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java (with props)
lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java (with props)
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/kmeans/
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java (with props)
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/kmeans/VisibleCluster.java (with props)
Modified:
lucene/mahout/trunk/ (props changed)
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/VisibleCanopy.java
lucene/mahout/trunk/src/test/java/org/apache/mahout/utils/UserDefinedDistanceMeasure.java
Propchange: lucene/mahout/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Feb 29 19:33:13 2008
@@ -4,3 +4,4 @@
build
output
testdata
+dist
Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java Fri Feb 29 19:33:13 2008
@@ -16,21 +16,20 @@
*/
package org.apache.mahout.clustering.canopy;
-import java.io.IOException;
-import java.util.List;
-
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.mahout.utils.DistanceMeasure;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.List;
/**
* This class models a canopy as a center point, the number of points that are
* contained within it according to the application of some distance metric, and
* a point total which is the sum of all the points and is used to compute the
* centroid when needed.
- *
*/
public class Canopy {
@@ -69,7 +68,7 @@
/**
* Create a new Canopy containing the given point
- *
+ *
* @param point a Float[]
*/
public Canopy(Float[] point) {
@@ -82,8 +81,8 @@
/**
* Create a new Canopy containing the given point and canopyId
- *
- * @param point a Float[]
+ *
+ * @param point a Float[]
* @param canopyId an int identifying the canopy local to this process only
*/
public Canopy(Float[] point, int canopyId) {
@@ -96,7 +95,7 @@
/**
* Configure the Canopy and its distance measure
- *
+ *
* @param job the JobConf for this job
*/
public static void configure(JobConf job) {
@@ -114,6 +113,7 @@
/**
* Configure the Canopy for unit tests
+ *
* @param aMeasure
* @param aT1
* @param aT2
@@ -130,10 +130,10 @@
* existing canopies instead of the points. Because of this it does not need
* to actually store the points, instead storing a total points vector and the
* number of points. From this a centroid can be computed.
- *
+ * <p/>
* This method is used by the CanopyReducer.
- *
- * @param point the Float[] defining the point to be added
+ *
+ * @param point the Float[] defining the point to be added
* @param canopies the List<Canopy> to be appended
*/
public static void addPointToCanopies(Float[] point, List<Canopy> canopies) {
@@ -152,13 +152,13 @@
* This method is used by the CanopyMapper to perform canopy inclusion tests
* and to emit the point and its covering canopies to the output. The
* CanopyCombiner will then sum the canopy points and produce the centroids.
- *
- * @param point the Float[] defining the point to be added
- * @param canopies the List<Canopy> to be appended
+ *
+ * @param point the Float[] defining the point to be added
+ * @param canopies the List<Canopy> to be appended
* @param collector an OutputCollector in which to emit the point
*/
public static void emitPointToNewCanopies(Float[] point,
- List<Canopy> canopies, OutputCollector collector) throws IOException {
+ List<Canopy> canopies, OutputCollector<Text, Text> collector) throws IOException {
boolean pointStronglyBound = false;
for (Canopy canopy : canopies) {
float dist = measure.distance(canopy.getCenter(), point);
@@ -178,16 +178,16 @@
* and to emit the point keyed by its covering canopies to the output. if the
* point is not covered by any canopies (due to canopy centroid clustering),
* emit the point to the closest covering canopy.
- *
- * @param point the Float[] defining the point to be added
- * @param canopies the List<Canopy> to be appended
- * @param writable the original Writable from the input, may include arbitrary
- * payload information after the point [...]<payload>
+ *
+ * @param point the Float[] defining the point to be added
+ * @param canopies the List<Canopy> to be appended
+ * @param writable the original Writable from the input, may include arbitrary
+ * payload information after the point [...]<payload>
* @param collector an OutputCollector in which to emit the point
*/
public static void emitPointToExistingCanopies(Float[] point,
- List<Canopy> canopies, Writable writable, OutputCollector collector)
- throws IOException {
+ List<Canopy> canopies, Text writable, OutputCollector<Text, Text> collector)
+ throws IOException {
float minDist = Float.MAX_VALUE;
Canopy closest = null;
boolean isCovered = false;
@@ -208,64 +208,19 @@
}
/**
- * Returns a print string for the point
- *
- * @param out a String to append to
- * @param pt the Float[] point
- * @return
- */
- public static String ptOut(String out, Float[] pt) {
- out += formatPoint(pt);
- return out;
- }
-
- /**
- * Format the point for input to a Mapper or Reducer
- *
- * @param point a Float[]
- * @return a String
- */
- public static String formatPoint(Float[] point) {
- String out = "";
- out += "[";
- for (int i = 0; i < point.length; i++)
- out += point[i] + ", ";
- out += "] ";
- String ptOut = out;
- return ptOut;
- }
-
- /**
- * Decodes a point from its string representation.
- *
- * @param formattedString a comma-terminated String of the form
- * "[v1,v2,...,vn,]"
- * @return the Float[] defining an n-dimensional point
- */
- public static Float[] decodePoint(String formattedString) {
- String[] pts = formattedString.split(",");
- Float[] point = new Float[pts.length - 1];
- for (int i = 0; i < point.length; i++)
- if (pts[i].startsWith("["))
- point[i] = new Float(pts[i].substring(1));
- else if (!pts[i].startsWith("]"))
- point[i] = new Float(pts[i]);
- return point;
- }
-
- /**
* Format the canopy for output
- *
+ *
* @param canopy
* @return
*/
public static String formatCanopy(Canopy canopy) {
- return "C" + canopy.canopyId + ": " + formatPoint(canopy.computeCentroid());
+ return "C" + canopy.canopyId + ": "
+ + Point.formatPoint(canopy.computeCentroid());
}
/**
* Decodes and returns a Canopy from the formattedString
- *
+ *
* @param formattedString a String prouced by formatCanopy
* @return a new Canopy
*/
@@ -275,7 +230,7 @@
String centroid = formattedString.substring(beginIndex);
if (id.startsWith("C")) {
int canopyId = new Integer(formattedString.substring(1, beginIndex - 2));
- Float[] canopyCentroid = decodePoint(centroid);
+ Float[] canopyCentroid = Point.decodePoint(centroid);
return new Canopy(canopyCentroid, canopyId);
}
return null;
@@ -283,7 +238,7 @@
/**
* Add a point to the canopy
- *
+ *
* @param point a Float[]
*/
public void addPoint(Float[] point) {
@@ -295,22 +250,27 @@
/**
* Emit the point to the collector, keyed by the canopy's formatted
* representation
- *
+ *
* @param point a Float[]
*/
- public void emitPoint(Float[] point, OutputCollector collector)
- throws IOException {
- collector.collect(new Text(formatCanopy(this)), new Text(ptOut("", point)));
+ public void emitPoint(Float[] point, OutputCollector<Text, Text> collector)
+ throws IOException {
+ collector.collect(new Text(formatCanopy(this)), new Text(Point.ptOut("",
+ point)));
}
/**
* Return a printable representation of this object, using the user supplied
* identifier
- *
+ *
* @return
*/
public String toString() {
- return "C" + canopyId + " - " + ptOut("", getCenter());
+ return getIdentifier() + " - " + Point.ptOut("", getCenter());
+ }
+
+ public String getIdentifier() {
+ return "C" + canopyId;
}
public int getCanopyId() {
@@ -319,7 +279,7 @@
/**
* Return the center point
- *
+ *
* @return a Float[]
*/
public Float[] getCenter() {
@@ -328,7 +288,7 @@
/**
* Return the number of points in the Canopy
- *
+ *
* @return
*/
public int getNumPoints() {
@@ -337,7 +297,7 @@
/**
* Compute the centroid by averaging the pointTotals
- *
+ *
* @return a Float[] which is the new centroid
*/
public Float[] computeCentroid() {
@@ -349,7 +309,7 @@
/**
* Return if the point is covered by this canopy
- *
+ *
* @param point a Float[] point
* @return if the point is covered
*/
Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java Fri Feb 29 19:33:13 2008
@@ -7,9 +7,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -30,7 +30,7 @@
float t1 = new Float(args[3]);
float t2 = new Float(args[4]);
String jarLocation = "apache-mahout-0.1-dev.jar";
- if (args.length > 5){
+ if (args.length > 5) {
jarLocation = args[5];
}
runJob(input, output, measureClassName, t1, t2, jarLocation);
@@ -38,15 +38,15 @@
/**
* Run the job
- *
- * @param input the input pathname String
- * @param output the output pathname String
+ *
+ * @param input the input pathname String
+ * @param output the output pathname String
* @param measureClassName the DistanceMeasure class name
- * @param t1 the T1 distance threshold
- * @param t2 the T2 distance threshold
+ * @param t1 the T1 distance threshold
+ * @param t2 the T2 distance threshold
*/
public static void runJob(String input, String output,
- String measureClassName, float t1, float t2, String jarLocation) {
+ String measureClassName, float t1, float t2, String jarLocation) {
CanopyDriver.runJob(input, output + "/canopies", measureClassName, t1, t2, jarLocation);
ClusterDriver.runJob(input, output + "/canopies", output, measureClassName, t1, t2, jarLocation);
}
Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyCombiner.java Fri Feb 29 19:33:13 2008
@@ -1,11 +1,10 @@
-package org.apache.mahout.clustering.canopy;
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -15,32 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.mahout.clustering.canopy;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
import java.io.IOException;
import java.util.Iterator;
-public class CanopyCombiner extends MapReduceBase implements Reducer {
+public class CanopyCombiner extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<Text> values,
+ OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
Writable value = (Writable) values.next();
- Float[] center = Canopy.decodePoint(value.toString());
+ Float[] center = Point.decodePoint(value.toString());
Canopy canopy = new Canopy(center);
while (values.hasNext()) {
value = (Writable) values.next();
- Float[] point = Canopy.decodePoint(value.toString());
+ Float[] point = Point.decodePoint(value.toString());
canopy.addPoint(point);
}
- output.collect(new Text("centroid"), new Text(Canopy.formatPoint(canopy
+ output.collect(new Text("centroid"), new Text(Point.formatPoint(canopy
.computeCentroid())));
}
Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Fri Feb 29 19:33:13 2008
@@ -32,7 +32,7 @@
float t1 = new Float(args[3]);
float t2 = new Float(args[4]);
String jarLocation = "apache-mahout-0.1-dev.jar";
- if (args.length > 5){
+ if (args.length > 5) {
jarLocation = args[5];
}
runJob(input, output, measureClassName, t1, t2, jarLocation);
@@ -40,18 +40,18 @@
/**
* Run the job
- *
- * @param input the input pathname String
- * @param output the output pathname String
+ *
+ * @param input the input pathname String
+ * @param output the output pathname String
* @param measureClassName the DistanceMeasure class name
- * @param t1 the T1 distance threshold
- * @param t2 the T2 distance threshold
+ * @param t1 the T1 distance threshold
+ * @param t2 the T2 distance threshold
*/
public static void runJob(String input, String output,
- String measureClassName, float t1, float t2, String jarLocation) {
+ String measureClassName, float t1, float t2, String jarLocation) {
JobClient client = new JobClient();
JobConf conf = new JobConf(
- org.apache.mahout.clustering.canopy.CanopyDriver.class);
+ org.apache.mahout.clustering.canopy.CanopyDriver.class);
conf.setJar(jarLocation);
conf.set(Canopy.DISTANCE_MEASURE_KEY, measureClassName);
conf.set(Canopy.T1_KEY, "" + t1);
Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java Fri Feb 29 19:33:13 2008
@@ -16,19 +16,21 @@
*/
package org.apache.mahout.clustering.canopy;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
-public class CanopyMapper extends MapReduceBase implements Mapper {
+public class CanopyMapper extends MapReduceBase implements
+ Mapper<WritableComparable, Text, Text, Text> {
List<Canopy> canopies = new ArrayList<Canopy>();
@@ -40,9 +42,9 @@
* org.apache.hadoop.mapred.OutputCollector,
* org.apache.hadoop.mapred.Reporter)
*/
- public void map(WritableComparable key, Writable values,
- OutputCollector output, Reporter reporter) throws IOException {
- Float[] point = Canopy.decodePoint(values.toString());
+ public void map(WritableComparable key, Text values,
+ OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ Float[] point = Point.decodePoint(values.toString());
Canopy.emitPointToNewCanopies(point, canopies, output);
}
Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java Fri Feb 29 19:33:13 2008
@@ -16,20 +16,21 @@
*/
package org.apache.mahout.clustering.canopy;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
-public class CanopyReducer extends MapReduceBase implements Reducer {
+public class CanopyReducer extends MapReduceBase implements
+ Reducer<Text, Text, Text, Text> {
List<Canopy> canopies = new ArrayList<Canopy>();
@@ -40,23 +41,18 @@
* java.util.Iterator, org.apache.hadoop.mapred.OutputCollector,
* org.apache.hadoop.mapred.Reporter)
*/
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<Text> values,
+ OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
while (values.hasNext()) {
Text value = (Text) values.next();
- Float[] point = Canopy.decodePoint(value.toString());
+ Float[] point = Point.decodePoint(value.toString());
Canopy.addPointToCanopies(point, canopies);
}
for (Canopy canopy : canopies)
- output.collect(new Text("C" + canopy.getCanopyId()), new Text(Canopy
- .formatPoint(canopy.computeCentroid())));
+ output.collect(new Text(canopy.getIdentifier()), new Text(Canopy
+ .formatCanopy(canopy)));
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.hadoop.mapred.MapReduceBase#close()
- */
/*
* (non-Javadoc)
*
Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java Fri Feb 29 19:33:13 2008
@@ -33,7 +33,7 @@
float t1 = new Float(args[4]);
float t2 = new Float(args[5]);
String jarLocation = "apache-mahout-0.1-dev.jar";
- if (args.length > 6){
+ if (args.length > 6) {
jarLocation = args[6];
}
runJob(points, canopies, output, measureClassName, t1, t2, jarLocation);
@@ -41,20 +41,20 @@
/**
* Run the job
- *
- * @param points the input points directory pathname String
- * @param canopies the input canopies directory pathname String
- * @param output the output directory pathname String
+ *
+ * @param points the input points directory pathname String
+ * @param canopies the input canopies directory pathname String
+ * @param output the output directory pathname String
* @param measureClassName the DistanceMeasure class name
- * @param t1 the T1 distance threshold
- * @param t2 the T2 distance threshold
+ * @param t1 the T1 distance threshold
+ * @param t2 the T2 distance threshold
* @param jarLocation
*/
public static void runJob(String points, String canopies, String output,
String measureClassName, float t1, float t2, String jarLocation) {
JobClient client = new JobClient();
JobConf conf = new JobConf(
- org.apache.mahout.clustering.canopy.ClusterDriver.class);
+ org.apache.mahout.clustering.canopy.ClusterDriver.class);
conf.setJar(jarLocation);
conf.set(Canopy.DISTANCE_MEASURE_KEY, measureClassName);
conf.set(Canopy.T1_KEY, "" + t1);
Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java Fri Feb 29 19:33:13 2008
@@ -16,35 +16,36 @@
*/
package org.apache.mahout.clustering.canopy;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
-public class ClusterMapper extends MapReduceBase implements Mapper {
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ClusterMapper extends MapReduceBase implements
+ Mapper<WritableComparable, Text, Text, Text> {
List<Canopy> canopies;
- public void map(WritableComparable key, Writable values,
- OutputCollector output, Reporter reporter) throws IOException {
- Float[] point = Canopy.decodePoint(values.toString());
+ public void map(WritableComparable key, Text values,
+ OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ Float[] point = Point.decodePoint(values.toString());
Canopy.emitPointToExistingCanopies(point, canopies, values, output);
}
/**
* Configure the mapper by providing its canopies. Used by unit tests.
- *
+ *
* @param canopies a List<Canopy>
*/
public void config(List<Canopy> canopies) {
@@ -72,7 +73,7 @@
Text key = new Text();
Text value = new Text();
while (reader.next(key, value)) {
- Canopy canopy = new Canopy(Canopy.decodePoint(value.toString()));
+ Canopy canopy = Canopy.decodeCanopy(value.toString());
canopies.add(canopy);
}
} finally {
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,272 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.utils.DistanceMeasure;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.List;
+
+public class Cluster {
+
+ public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.kmeans.measure";
+
+ public static final String CLUSTER_PATH_KEY = "org.apache.mahout.clustering.kmeans.path";
+
+ public static final String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.kmeans.convergence";
+
+ private static int nextClusterId = 0;
+
+ // this cluster's clusterId
+ private int clusterId;
+
+ // the current center
+ private Float[] center = new Float[0];
+
+ // the current centroid is lazily evaluated and may be null
+ private Float[] centroid = null;
+
+ // the number of points in the cluster
+ private int numPoints = 0;
+
+ // the total of all points added to the cluster
+ private Float[] pointTotal = null;
+
+ // has the centroid converged with the center?
+ private boolean converged = false;
+
+ private static DistanceMeasure measure;
+
+ private static float convergenceDelta = 0;
+
+ /**
+ * Format the cluster for output
+ *
+ * @param cluster the Cluster
+ * @return
+ */
+ public static String formatCluster(Cluster cluster) {
+ return cluster.getIdentifier() + ": "
+ + Point.formatPoint(cluster.computeCentroid());
+ }
+
+ /**
+ * Decodes and returns a Cluster from the formattedString
+ *
+ * @param formattedString a String produced by formatCluster
+ * @return a new Cluster
+ */
+ public static Cluster decodeCluster(String formattedString) {
+ int beginIndex = formattedString.indexOf('[');
+ String id = formattedString.substring(0, beginIndex);
+ String center = formattedString.substring(beginIndex);
+ if (id.startsWith("C") || id.startsWith("V")) {
+ int clusterId = new Integer(formattedString.substring(1, beginIndex - 2));
+ Float[] clusterCenter = Point.decodePoint(center);
+ Cluster cluster = new Cluster(clusterCenter, clusterId);
+ cluster.converged = id.startsWith("V");
+ return cluster;
+ }
+ return null;
+ }
+
+ /**
+ * Configure the distance measure from the job
+ *
+ * @param job the JobConf for the job
+ */
+ public static void configure(JobConf job) {
+ try {
+ Class cl = Class.forName(job.get(DISTANCE_MEASURE_KEY));
+ measure = (DistanceMeasure) cl.newInstance();
+ measure.configure(job);
+ convergenceDelta = new Float(job.get(CLUSTER_CONVERGENCE_KEY));
+ nextClusterId = 0;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Configure the distance measure directly. Used by unit tests.
+ *
+ * @param aMeasure the DistanceMeasure
+ * @param aConvergenceDelta the float delta value used to define convergence
+ */
+ public static void config(DistanceMeasure aMeasure, float aConvergenceDelta) {
+ measure = aMeasure;
+ convergenceDelta = aConvergenceDelta;
+ nextClusterId = 0;
+ }
+
+ /**
+ * Emit the point to the nearest cluster center
+ *
+ * @param point a Float[] representing the point
+ * @param clusters a List<Cluster> to test
+ * @param values a Writable containing the input point and possible other
+ * values of interest (payload)
+ * @param output the OutputCollector to emit into
+ * @throws IOException
+ */
+ public static void emitPointToNearestCluster(Float[] point,
+ List<Cluster> clusters, Text values, OutputCollector<Text, Text> output)
+ throws IOException {
+ Cluster nearestCluster = null;
+ float nearestDistance = Float.MAX_VALUE;
+ for (Cluster cluster : clusters) {
+ float distance = measure.distance(point, cluster.getCenter());
+ if (nearestCluster == null || distance < nearestDistance) {
+ nearestCluster = cluster;
+ nearestDistance = distance;
+ }
+ }
+ output.collect(new Text(formatCluster(nearestCluster)), values);
+ }
+
+ /**
+ * Compute the centroid by averaging the pointTotals
+ *
+ * @return a Float[] which is the new centroid
+ */
+ private Float[] computeCentroid() {
+ if (numPoints == 0)
+ return pointTotal;
+ else if (centroid == null) {
+ // lazy compute new centroid
+ centroid = new Float[pointTotal.length];
+ for (int i = 0; i < pointTotal.length; i++)
+ centroid[i] = new Float(pointTotal[i] / numPoints);
+ }
+ return centroid;
+ }
+
+ /**
+ * Construct a new cluster with the given point as its center
+ *
+ * @param center a Float[] center point
+ */
+ public Cluster(Float[] center) {
+ super();
+ this.clusterId = nextClusterId++;
+ this.center = center;
+ this.numPoints = 0;
+ this.pointTotal = Point.origin(center.length);
+ }
+
+ /**
+ * Construct a new cluster with the given point as its center and the given clusterId
+ *
+ * @param center a Float[] center point
+ */
+ public Cluster(Float[] center, int clusterId) {
+ super();
+ this.clusterId = clusterId;
+ this.center = center;
+ this.numPoints = 0;
+ this.pointTotal = Point.origin(center.length);
+ }
+
+ /**
+ * Return a printable representation of this object, using the user supplied
+ * identifier
+ *
+ * @return
+ */
+ public String toString() {
+ return getIdentifier() + " - " + Point.formatPoint(center);
+ }
+
+ public String getIdentifier() {
+ if (converged)
+ return "V" + clusterId;
+ else
+ return "C" + clusterId;
+ }
+
+ /**
+ * Add the point to the cluster
+ *
+ * @param point a Float[] point to add
+ */
+ public void addPoint(Float[] point) {
+ centroid = null;
+ numPoints++;
+ if (pointTotal == null)
+ pointTotal = point.clone();
+ else
+ for (int i = 0; i < point.length; i++)
+ pointTotal[i] = new Float(point[i] + pointTotal[i]);
+ }
+
+ /**
+ * Add a pre-summed batch of points to the cluster
+ *
+ * @param count the number of points in the delta
+ * @param delta a Float[] point to add
+ */
+ public void addPoints(int count, Float[] delta) {
+ centroid = null;
+ numPoints += count;
+ if (pointTotal == null)
+ pointTotal = delta.clone();
+ else
+ for (int i = 0; i < delta.length; i++)
+ pointTotal[i] = new Float(delta[i] + pointTotal[i]);
+ }
+
+ public Float[] getCenter() {
+ return center;
+ }
+
+ public int getNumPoints() {
+ return numPoints;
+ }
+
+ /**
+ * Compute the centroid and set the center to it.
+ */
+ public void recomputeCenter() {
+ center = computeCentroid();
+ numPoints = 0;
+ pointTotal = Point.origin(center.length);
+ }
+
+ /**
+ * Compute, record and return whether the cluster has converged by comparing its center and centroid.
+ *
+ * @return if the cluster is converged
+ */
+ public boolean computeConvergence() {
+ Float[] centroid = computeCentroid();
+ converged = measure.distance(centroid, center) <= convergenceDelta;
+ return converged;
+ }
+
+ public Float[] getPointTotal() {
+ return pointTotal;
+ }
+
+ public boolean isConverged() {
+ return converged;
+ }
+
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,49 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+ /**
+  * Combiner for a k-means iteration: collapses the values emitted for one
+  * cluster key into a single partial total.
+  */
+ public class KMeansCombiner extends MapReduceBase implements
+     Reducer<Text, Text, Text, Text> {
+
+   /**
+    * Sums the values for one cluster key and emits one record of the form
+    * "count, [v1, v2, ..., vn, ] " under the same key.
+    * <p>
+    * Hadoop may apply a combiner more than once to the same map output, so a
+    * value may be either a raw point or a partial total previously emitted by
+    * this method. Both forms are accepted.
+    *
+    * @param key the formatted cluster the values were assigned to
+    * @param values the raw points and/or partial totals for that cluster
+    * @param output the collector receiving the single partial total
+    * @param reporter unused
+    * @throws IOException if the collector fails
+    */
+   public void reduce(Text key, Iterator<Text> values,
+       OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+     Cluster cluster = Cluster.decodeCluster(key.toString());
+     while (values.hasNext()) {
+       String value = values.next().toString();
+       if (isPartialTotal(value)) {
+         // "count, [total]" from an earlier combine pass
+         int ix = value.indexOf(',');
+         int count = Integer.parseInt(value.substring(0, ix));
+         cluster.addPoints(count, Point.decodePoint(value.substring(ix + 2)));
+       } else {
+         // a single raw point
+         cluster.addPoint(Point.decodePoint(value));
+       }
+     }
+     output.collect(key, new Text(cluster.getNumPoints() + ", "
+         + Point.formatPoint(cluster.getPointTotal())));
+   }
+
+   /**
+    * Tells whether the value has the "count, [..." shape emitted by reduce():
+    * a run of digits, then ", [".
+    */
+   private static boolean isPartialTotal(String value) {
+     int ix = value.indexOf(',');
+     if (ix <= 0 || !value.startsWith(", [", ix)) {
+       return false;
+     }
+     for (int i = 0; i < ix; i++) {
+       if (!Character.isDigit(value.charAt(i))) {
+         return false;
+       }
+     }
+     return true;
+   }
+
+   /**
+    * Passes the job configuration on to the static Cluster configuration.
+    *
+    * @param job the JobConf of the running job
+    */
+   @Override
+   public void configure(JobConf job) {
+     super.configure(job);
+     Cluster.configure(job);
+   }
+
+ }
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,196 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+import java.io.IOException;
+
+public class KMeansDriver {
+
+ public static void main(String[] args) {
+ String input = args[0];
+ String clusters = args[1];
+ String output = args[2];
+ String measureClass = args[3];
+ String convergenceDelta = args[4];
+ String maxIterations = args[5];
+ String jarLocation = "dist/apache-mahout-0.1-dev.jar";
+ if (args.length > 6) {
+ jarLocation = args[6];
+ }
+ runJob(input, clusters, output, measureClass, convergenceDelta,
+ maxIterations, jarLocation);
+ }
+
+ /**
+ * Run the job using supplied arguments
+ *
+ * @param input the directory pathname for input points
+ * @param clustersIn the directory pathname for initial & computed clusters
+ * @param output the directory pathname for output points
+ * @param measureClass the classname of the DistanceMeasure
+ * @param convergenceDelta the convergence delta value
+ * @param maxIterations the maximum number of iterations
+ * @param jarLocation The location of the Mahout jar
+ */
+ public static void runJob(String input, String clustersIn, String output,
+ String measureClass, String convergenceDelta, String maxIterations, String jarLocation) {
+ int maxIter = new Integer(maxIterations);
+ try {
+ // delete the output directory
+ JobConf conf = new JobConf(KMeansDriver.class);
+ Path outPath = new Path(output);
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(outPath)) {
+ fs.delete(outPath);
+ }
+ fs.mkdirs(outPath);
+ // iterate until the clusters converge
+ boolean converged = false;
+ int iteration = 0;
+
+ while (!converged && iteration < maxIter) {
+ System.out.println("Iteration " + iteration);
+ // point the output to a new directory per iteration
+ String clustersOut = output + "/clusters-" + iteration;
+ converged = runIteration(input, clustersIn, clustersOut, measureClass,
+ convergenceDelta, jarLocation);
+ // now point the input to the old output directory
+ clustersIn = output + "/clusters-" + iteration;
+ iteration++;
+ }
+ // now actually cluster the points
+ System.out.println("Clustering ");
+ runClustering(input, clustersIn, output + "/points", measureClass,
+ convergenceDelta, jarLocation);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Run the job using supplied arguments
+ *
+ * @param input the directory pathname for input points
+ * @param clustersIn the directory pathname for iniput clusters
+ * @param clustersOut the directory pathname for output clusters
+ * @param measureClass the classname of the DistanceMeasure
+ * @param convergenceDelta the convergence delta value
+ * @param jarLocation The location of the mahout jar
+ * @return true if the iteration successfully runs
+ */
+ static boolean runIteration(String input, String clustersIn,
+ String clustersOut, String measureClass, String convergenceDelta, String jarLocation) {
+ JobClient client = new JobClient();
+ JobConf conf = new JobConf(KMeansDriver.class);
+ conf.setJar(jarLocation);
+
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(Text.class);
+
+ conf.setInputPath(new Path(input));
+ Path outPath = new Path(clustersOut);
+ conf.setOutputPath(outPath);
+
+ conf.setMapperClass(KMeansMapper.class);
+ conf.setCombinerClass(KMeansCombiner.class);
+ conf.setReducerClass(KMeansReducer.class);
+ conf.setNumReduceTasks(1);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.set(Cluster.CLUSTER_PATH_KEY, clustersIn);
+ conf.set(Cluster.DISTANCE_MEASURE_KEY, measureClass);
+ conf.set(Cluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
+
+ client.setConf(conf);
+ try {
+ JobClient.runJob(conf);
+ FileSystem fs = FileSystem.get(conf);
+ return isConverged(clustersOut + "/part-00000", conf, fs);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return true;
+ }
+ }
+
+ /**
+ * Run the job using supplied arguments
+ *
+ * @param input the directory pathname for input points
+ * @param clustersIn the directory pathname for input clusters
+ * @param output the directory pathname for output points
+ * @param measureClass the classname of the DistanceMeasure
+ * @param convergenceDelta the convergence delta value
+ * @param jarLocation The location of the mahout jar
+ */
+ static void runClustering(String input, String clustersIn, String output,
+ String measureClass, String convergenceDelta, String jarLocation) {
+ JobClient client = new JobClient();
+ JobConf conf = new JobConf(KMeansDriver.class);
+ conf.setJar(jarLocation);
+
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(Text.class);
+
+ conf.setInputPath(new Path(input));
+ Path outPath = new Path(output);
+ conf.setOutputPath(outPath);
+
+ conf.setMapperClass(KMeansMapper.class);
+ conf.setNumReduceTasks(0);
+ conf.set(Cluster.CLUSTER_PATH_KEY, clustersIn);
+ conf.set(Cluster.DISTANCE_MEASURE_KEY, measureClass);
+ conf.set(Cluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
+
+ client.setConf(conf);
+ try {
+ JobClient.runJob(conf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Return if all of the Clusters in the filePath have converged or not
+ *
+ * @param filePath the file path to the single file containing the clusters
+ * @param conf the JobConf
+ * @param fs the FileSystem
+ * @return true if all Clusters are converged
+ * @throws IOException if there was an IO error
+ */
+ static boolean isConverged(String filePath, JobConf conf, FileSystem fs)
+ throws IOException {
+ boolean converged;
+ Path outPart = new Path(filePath);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, outPart, conf);
+ Text key = new Text();
+ Text value = new Text();
+ converged = true;
+ while (reader.next(key, value)) {
+ Cluster cluster = Cluster.decodeCluster(value.toString());
+ converged = converged && cluster.isConverged();
+ }
+ return converged;
+ }
+}
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,88 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+ /**
+  * Mapper for the k-means jobs: decodes each input point and hands it, with
+  * the list of current clusters, to Cluster.emitPointToNearestCluster.
+  */
+ public class KMeansMapper extends MapReduceBase implements
+     Mapper<WritableComparable, Text, Text, Text> {
+
+   // the clusters read in configure(JobConf) or injected through config(List)
+   List<Cluster> clusters;
+
+   /**
+    * Decodes the point held in the input value and emits it against the
+    * current clusters.
+    *
+    * @param key the input key; not used
+    * @param values the Text holding one formatted point
+    * @param output the collector handed to Cluster.emitPointToNearestCluster
+    * @param reporter unused
+    * @throws IOException if emitting the point fails
+    */
+   public void map(WritableComparable key, Text values,
+       OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+     Float[] point = Point.decodePoint(values.toString());
+     Cluster.emitPointToNearestCluster(point, clusters, values, output);
+   }
+
+   /**
+    * Configure the mapper by providing its clusters. Used by unit tests.
+    *
+    * @param clusters a List<Cluster>
+    */
+   void config(List<Cluster> clusters) {
+     this.clusters = clusters;
+   }
+
+   /**
+    * Configures Cluster from the job and loads the current clusters from
+    * part-00000 under the directory named by Cluster.CLUSTER_PATH_KEY.
+    *
+    * @param job the JobConf of the running job
+    * @throws IllegalStateException if the cluster file cannot be read; the
+    *         mapper has no clusters to work with in that case, so the task is
+    *         failed instead of continuing with an empty list
+    * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+    */
+   @Override
+   public void configure(JobConf job) {
+     super.configure(job);
+     Cluster.configure(job);
+
+     String clusterPath = job.get(Cluster.CLUSTER_PATH_KEY);
+     clusters = new ArrayList<Cluster>();
+
+     try {
+       FileSystem fs = FileSystem.get(job);
+       Path path = new Path(clusterPath + "/part-00000");
+       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+       try {
+         Text key = new Text();
+         Text value = new Text();
+         while (reader.next(key, value)) {
+           Cluster cluster = Cluster.decodeCluster(value.toString());
+           // add the center so the centroid will be correct on output formatting
+           cluster.addPoint(cluster.getCenter());
+           clusters.add(cluster);
+         }
+       } finally {
+         reader.close();
+       }
+     } catch (IOException e) {
+       throw new IllegalStateException(
+           "Unable to read clusters from " + clusterPath + "/part-00000", e);
+     }
+   }
+ }
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,57 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.utils.Point;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+ /**
+  * Reducer for a k-means iteration: totals the points assigned to one cluster,
+  * computes its convergence and emits the formatted cluster.
+  */
+ public class KMeansReducer extends MapReduceBase implements
+     Reducer<Text, Text, Text, Text> {
+
+   // not read or written anywhere in this class
+   float delta = 0;
+
+   /**
+    * Accumulates every value for the cluster key, forces the convergence
+    * calculation and emits the cluster under its identifier.
+    * <p>
+    * Hadoop does not guarantee that the combiner is applied to every map
+    * output, so a value may be a "count, [total]" record produced by the
+    * combiner or a raw point produced by the mapper. Both forms are accepted.
+    *
+    * @param key the formatted cluster the values were assigned to
+    * @param values the partial totals and/or raw points for that cluster
+    * @param output the collector receiving the updated cluster
+    * @param reporter unused
+    * @throws IOException if the collector fails
+    */
+   public void reduce(Text key, Iterator<Text> values,
+       OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+     Cluster cluster = Cluster.decodeCluster(key.toString());
+     while (values.hasNext()) {
+       String value = values.next().toString();
+       if (isPartialTotal(value)) {
+         int ix = value.indexOf(',');
+         int count = Integer.parseInt(value.substring(0, ix));
+         // skip the ", " that separates the count from the point total
+         Float[] total = Point.decodePoint(value.substring(ix + 2));
+         cluster.addPoints(count, total);
+       } else {
+         // a single raw point that was never combined
+         cluster.addPoint(Point.decodePoint(value));
+       }
+     }
+     // force convergence calculation
+     cluster.computeConvergence();
+     output.collect(new Text(cluster.getIdentifier()), new Text(Cluster
+         .formatCluster(cluster)));
+   }
+
+   /**
+    * Tells whether the value has the "count, [..." shape produced by the
+    * combiner: a run of digits, then ", [".
+    */
+   private static boolean isPartialTotal(String value) {
+     int ix = value.indexOf(',');
+     if (ix <= 0 || !value.startsWith(", [", ix)) {
+       return false;
+     }
+     for (int i = 0; i < ix; i++) {
+       if (!Character.isDigit(value.charAt(i))) {
+         return false;
+       }
+     }
+     return true;
+   }
+
+   /**
+    * Passes the job configuration on to the static Cluster configuration.
+    *
+    * @param job the JobConf of the running job
+    */
+   @Override
+   public void configure(JobConf job) {
+     super.configure(job);
+     Cluster.configure(job);
+   }
+
+ }
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java (original)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/DistanceMeasure.java Fri Feb 29 19:33:13 2008
@@ -26,11 +26,11 @@
/**
* Returns the distance metric applied to the arguments
- *
+ *
* @param p1 a Float[] defining a multidimensional point in some feature space
* @param p2 a Float[] defining a multidimensional point in some feature space
* @return a scalar float of the distance
*/
public float distance(Float[] p1, Float[] p2);
-
+
}
Added: lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java?rev=632543&view=auto
==============================================================================
--- lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java (added)
+++ lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java Fri Feb 29 19:33:13 2008
@@ -0,0 +1,116 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.utils;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+
+ /**
+  * Static helpers for points represented as Float[]: formatting to and
+  * decoding from text, simple arithmetic, and writing points to a file.
+  */
+ public class Point {
+
+   /**
+    * Format the point for input to a Mapper or Reducer. Every value,
+    * including the last one, is followed by ", " and the closing bracket is
+    * followed by one space, e.g. "[1.0, 2.0, ] ".
+    *
+    * @param point a Float[]
+    * @return a String
+    */
+   public static String formatPoint(Float[] point) {
+     StringBuilder out = new StringBuilder("[");
+     for (Float value : point) {
+       out.append(value).append(", ");
+     }
+     return out.append("] ").toString();
+   }
+
+   /**
+    * Decodes a point from its string representation. Whatever follows the
+    * last comma is treated as the terminator and is not decoded; whitespace
+    * around a value is ignored, so the output of formatPoint is accepted.
+    *
+    * @param formattedString a comma-terminated String of the form
+    *        "[v1,v2,...,vn,]"
+    * @return the Float[] defining an n-dimensional point; empty if the string
+    *         holds no values
+    */
+   public static Float[] decodePoint(String formattedString) {
+     String[] pts = formattedString.split(",");
+     // split() drops trailing empty tokens and can return an empty array
+     // (e.g. for ","), so never let the size go negative
+     Float[] point = new Float[Math.max(0, pts.length - 1)];
+     for (int i = 0; i < point.length; i++) {
+       if (pts[i].startsWith("[")) {
+         point[i] = Float.valueOf(pts[i].substring(1));
+       } else if (!pts[i].startsWith("]")) {
+         point[i] = Float.valueOf(pts[i]);
+       }
+     }
+     return point;
+   }
+
+   /**
+    * Returns a print string for the point
+    *
+    * @param out a String to append to
+    * @param pt the Float[] point
+    * @return out followed by the formatted point
+    */
+   public static String ptOut(String out, Float[] pt) {
+     return out + formatPoint(pt);
+   }
+
+   /**
+    * Return a point with length dimensions and zero values
+    *
+    * @param length the number of dimensions
+    * @return a Float[] representing [0,0,0,...,0]
+    */
+   public static Float[] origin(int length) {
+     Float[] result = new Float[length];
+     for (int i = 0; i < length; i++) {
+       result[i] = Float.valueOf(0f);
+     }
+     return result;
+   }
+
+   /**
+    * Return the sum of the two points. Neither argument is modified.
+    *
+    * @param pt1 a Float[] point
+    * @param pt2 a Float[] point with at least as many dimensions as pt1
+    * @return a new Float[] holding pt1[i] + pt2[i] for every dimension of pt1
+    */
+   public static Float[] sum(Float[] pt1, Float[] pt2) {
+     Float[] result = pt1.clone();
+     for (int i = 0; i < pt1.length; i++) {
+       result[i] += pt2[i];
+     }
+     return result;
+   }
+
+   /**
+    * Writes the points to the file, one formatted point per line.
+    *
+    * @param points the points to write
+    * @param fileName the name of the file to create or overwrite
+    * @throws IOException if the file cannot be written
+    */
+   public static void writePointsToFile(List<Float[]> points, String fileName)
+       throws IOException {
+     writePointsToFileWithPayload(points, fileName, "");
+   }
+
+   /**
+    * Writes the points to the file, one per line, each formatted point
+    * followed by the payload. The file is closed even if a write fails.
+    *
+    * @param points the points to write
+    * @param fileName the name of the file to create or overwrite
+    * @param payload the text appended after every formatted point
+    * @throws IOException if the file cannot be written
+    */
+   public static void writePointsToFileWithPayload(List<Float[]> points,
+       String fileName, String payload) throws IOException {
+     BufferedWriter output = new BufferedWriter(new FileWriter(fileName));
+     try {
+       for (Float[] point : points) {
+         output.write(formatPoint(point));
+         output.write(payload);
+         output.write("\n");
+       }
+       output.flush();
+     } finally {
+       output.close();
+     }
+   }
+
+ }
\ No newline at end of file
Propchange: lucene/mahout/trunk/src/main/java/org/apache/mahout/utils/Point.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java?rev=632543&r1=632542&r2=632543&view=diff
==============================================================================
--- lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java (original)
+++ lucene/mahout/trunk/src/test/java/org/apache/mahout/clustering/canopy/DummyOutputCollector.java Fri Feb 29 19:33:13 2008
@@ -16,6 +16,9 @@
* limitations under the License.
*/
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -23,32 +26,28 @@
import java.util.Set;
import java.util.TreeMap;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.OutputCollector;
+public class DummyOutputCollector implements OutputCollector<Text, Text> {
-public class DummyOutputCollector implements OutputCollector {
+ Map<String, List<Text>> data = new TreeMap<String, List<Text>>();
- Map<String, List<Writable>> data = new TreeMap<String, List<Writable>>();
-
- public void collect(WritableComparable key, Writable values)
- throws IOException {
- List<Writable> points = data.get(key.toString());
+ public void collect(Text key, Text values)
+ throws IOException {
+ List<Text> points = data.get(key.toString());
if (points == null) {
- points = new ArrayList<Writable>();
+ points = new ArrayList<Text>();
data.put(key.toString(), points);
}
points.add(values);
}
- public Map<String, List<Writable>> getData() {
+ public Map<String, List<Text>> getData() {
return data;
}
- public List<Writable> getValue(String key) {
+ public List<Text> getValue(String key) {
return data.get(key);
}
-
+
public Set<String> getKeys() {
return data.keySet();
}