Posted to commits@mahout.apache.org by gs...@apache.org on 2009/06/23 20:23:20 UTC

svn commit: r787776 [2/3] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/clustering/canopy/ core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/ core/src/main/java/org/apache/mahout/clustering/kmeans/ core/src/main/java/org/apa...

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Tue Jun 23 18:23:18 2009
@@ -17,32 +17,15 @@
 
 package org.apache.mahout.clustering.canopy;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import junit.framework.TestCase;
-
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.mahout.matrix.AbstractVector;
 import org.apache.mahout.matrix.SparseVector;
 import org.apache.mahout.matrix.Vector;
 import org.apache.mahout.utils.DistanceMeasure;
@@ -50,10 +33,18 @@
 import org.apache.mahout.utils.EuclideanDistanceMeasure;
 import org.apache.mahout.utils.ManhattanDistanceMeasure;
 import org.apache.mahout.utils.UserDefinedDistanceMeasure;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class TestCanopyCreation extends TestCase {
-  static final double[][] raw = { { 1, 1 }, { 2, 1 }, { 1, 2 }, { 2, 2 },
-      { 3, 3 }, { 4, 4 }, { 5, 4 }, { 4, 5 }, { 5, 5 } };
+  static final double[][] raw = {{1, 1}, {2, 1}, {1, 2}, {2, 2},
+          {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
 
   List<Canopy> referenceManhattan;
 
@@ -67,6 +58,8 @@
 
   List<Vector> euclideanCentroids;
 
+  FileSystem fs;
+
   public TestCanopyCreation(String name) {
     super(name);
   }
@@ -84,7 +77,7 @@
 
   /**
    * Verify that the given canopies are equivalent to the referenceManhattan
-   * 
+   *
    * @param canopies
    */
   private void verifyManhattanCanopies(List<Canopy> canopies) {
@@ -93,7 +86,7 @@
 
   /**
    * Verify that the given canopies are equivalent to the referenceEuclidean
-   * 
+   *
    * @param canopies
    */
   private void verifyEuclideanCanopies(List<Canopy> canopies) {
@@ -104,29 +97,29 @@
    * Verify that the given canopies are equivalent to the reference. This means
    * the number of canopies is the same, the number of points in each is the
    * same and the centroids are the same.
-   * 
+   *
    * @param canopies
    */
   private static void verifyCanopies(List<Canopy> canopies,
-      List<Canopy> reference) {
+                                     List<Canopy> reference) {
     assertEquals("number of canopies", reference.size(), canopies.size());
     for (int canopyIx = 0; canopyIx < canopies.size(); canopyIx++) {
       Canopy refCanopy = reference.get(canopyIx);
       Canopy testCanopy = canopies.get(canopyIx);
       assertEquals("canopy points " + canopyIx, refCanopy.getNumPoints(),
-          testCanopy.getNumPoints());
+              testCanopy.getNumPoints());
       Vector refCentroid = refCanopy.computeCentroid();
       Vector testCentroid = testCanopy.computeCentroid();
       for (int pointIx = 0; pointIx < refCentroid.size(); pointIx++) {
         assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
-            refCentroid.get(pointIx), testCentroid.get(pointIx));
+                refCentroid.get(pointIx), testCentroid.get(pointIx));
       }
     }
   }
 
   /**
    * Print the canopies to the transcript
-   * 
+   *
    * @param canopies a List<Canopy>
    */
   private static void printCanopies(List<Canopy> canopies) {
@@ -135,23 +128,6 @@
     }
   }
 
-  private static void writePointsToFile(List<Vector> points, String fileName)
-      throws IOException {
-    writePointsToFileWithPayload(points, fileName, "");
-  }
-
-  private static void writePointsToFileWithPayload(List<Vector> points,
-      String fileName, String payload) throws IOException {
-    BufferedWriter output = new BufferedWriter(new OutputStreamWriter(
-        new FileOutputStream(fileName), Charset.forName("UTF-8")));
-    for (Vector point : points) {
-      output.write(point.asFormatString());
-      output.write(payload);
-      output.write('\n');
-    }
-    output.flush();
-    output.close();
-  }
 
   private static void rmr(String path) throws Exception {
     File f = new File(path);
@@ -169,19 +145,21 @@
   @Override
   protected void setUp() throws Exception {
     super.setUp();
+    Configuration conf = new Configuration();
+    fs = FileSystem.get(conf);
     rmr("output");
     rmr("testdata");
     referenceManhattan = populateCanopies(manhattanDistanceMeasure,
-        getPoints(raw), 3.1, 2.1);
+            getPoints(raw), 3.1, 2.1);
     manhattanCentroids = populateCentroids(referenceManhattan);
     referenceEuclidean = populateCanopies(euclideanDistanceMeasure,
-        getPoints(raw), 3.1, 2.1);
+            getPoints(raw), 3.1, 2.1);
     euclideanCentroids = populateCentroids(referenceEuclidean);
   }
 
   /**
    * Iterate through the canopies, adding their centroids to a list
-   * 
+   *
    * @param canopies a List<Canopy>
    * @return the List<Vector>
    */
@@ -194,15 +172,15 @@
 
   /**
    * Iterate through the points, adding new canopies. Return the canopies.
-   * 
+   *
    * @param measure a DistanceMeasure to use
-   * @param points a list<Vector> defining the points to be clustered
-   * @param t1 the T1 distance threshold
-   * @param t2 the T2 distance threshold
+   * @param points  a list<Vector> defining the points to be clustered
+   * @param t1      the T1 distance threshold
+   * @param t2      the T2 distance threshold
    * @return the List<Canopy> created
    */
   static List<Canopy> populateCanopies(DistanceMeasure measure,
-      List<Vector> points, double t1, double t2) {
+                                       List<Vector> points, double t1, double t2) {
     List<Canopy> canopies = new ArrayList<Canopy>();
     Canopy.config(measure, t1, t2);
     /**
@@ -238,7 +216,7 @@
   /**
    * Story: User can cluster points using a ManhattanDistanceMeasure and a
    * reference implementation
-   * 
+   *
    * @throws Exception
    */
   public void testReferenceManhattan() throws Exception {
@@ -248,16 +226,16 @@
     assertEquals("number of canopies", 3, referenceManhattan.size());
     for (int canopyIx = 0; canopyIx < referenceManhattan.size(); canopyIx++) {
       Canopy testCanopy = referenceManhattan.get(canopyIx);
-      int[] expectedNumPoints = { 4, 4, 3 };
-      double[][] expectedCentroids = { { 1.5, 1.5 }, { 4.0, 4.0 },
-          { 4.666666666666667, 4.6666666666666667 } };
+      int[] expectedNumPoints = {4, 4, 3};
+      double[][] expectedCentroids = {{1.5, 1.5}, {4.0, 4.0},
+              {4.666666666666667, 4.6666666666666667}};
       assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx],
-          testCanopy.getNumPoints());
+              testCanopy.getNumPoints());
       double[] refCentroid = expectedCentroids[canopyIx];
       Vector testCentroid = testCanopy.computeCentroid();
       for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
         assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
-            refCentroid[pointIx], testCentroid.get(pointIx));
+                refCentroid[pointIx], testCentroid.get(pointIx));
       }
     }
   }
@@ -265,7 +243,7 @@
   /**
    * Story: User can cluster points using a EuclideanDistanceMeasure and a
    * reference implementation
-   * 
+   *
    * @throws Exception
    */
   public void testReferenceEuclidean() throws Exception {
@@ -275,16 +253,16 @@
     assertEquals("number of canopies", 3, referenceManhattan.size());
     for (int canopyIx = 0; canopyIx < referenceManhattan.size(); canopyIx++) {
       Canopy testCanopy = referenceEuclidean.get(canopyIx);
-      int[] expectedNumPoints = { 5, 5, 3 };
-      double[][] expectedCentroids = { { 1.8, 1.8 }, { 4.2, 4.2 },
-          { 4.666666666666667, 4.666666666666667 } };
+      int[] expectedNumPoints = {5, 5, 3};
+      double[][] expectedCentroids = {{1.8, 1.8}, {4.2, 4.2},
+              {4.666666666666667, 4.666666666666667}};
       assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx],
-          testCanopy.getNumPoints());
+              testCanopy.getNumPoints());
       double[] refCentroid = expectedCentroids[canopyIx];
       Vector testCentroid = testCanopy.computeCentroid();
       for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
         assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
-            refCentroid[pointIx], testCentroid.get(pointIx));
+                refCentroid[pointIx], testCentroid.get(pointIx));
       }
     }
   }
@@ -292,7 +270,7 @@
   /**
    * Story: User can cluster points without instantiating them all in memory at
    * once
-   * 
+   *
    * @throws Exception
    */
   public void testIterativeManhattan() throws Exception {
@@ -311,7 +289,7 @@
   /**
    * Story: User can cluster points without instantiating them all in memory at
    * once
-   * 
+   *
    * @throws Exception
    */
   public void testIterativeEuclidean() throws Exception {
@@ -331,7 +309,7 @@
    * Story: User can produce initial canopy centers using a
    * ManhattanDistanceMeasure and a CanopyMapper/Combiner which clusters input
    * points to produce an output set of canopy centroid points.
-   * 
+   *
    * @throws Exception
    */
   public void testCanopyMapperManhattan() throws Exception {
@@ -341,7 +319,7 @@
     List<Vector> points = getPoints(raw);
     // map the data
     for (Vector point : points)
-      mapper.map(new Text(), new Text(point.asFormatString()), collector, null);
+      mapper.map(new Text(), point, collector, null);
     mapper.close();
     assertEquals("Number of map results", 1, collector.getData().size());
     // now verify the output
@@ -349,15 +327,15 @@
     assertEquals("Number of centroids", 3, data.size());
     for (int i = 0; i < data.size(); i++)
       assertEquals("Centroid error",
-          manhattanCentroids.get(i).asFormatString(), data.get(i)
-              .asFormatString());
+              manhattanCentroids.get(i).asFormatString(), data.get(i)
+                      .asFormatString());
   }
 
   /**
    * Story: User can produce initial canopy centers using a
    * EuclideanDistanceMeasure and a CanopyMapper/Combiner which clusters input
    * points to produce an output set of canopy centroid points.
-   * 
+   *
    * @throws Exception
    */
   public void testCanopyMapperEuclidean() throws Exception {
@@ -367,7 +345,7 @@
     List<Vector> points = getPoints(raw);
     // map the data
     for (Vector point : points)
-      mapper.map(new Text(), new Text(point.asFormatString()), collector, null);
+      mapper.map(new Text(), point, collector, null);
     mapper.close();
     assertEquals("Number of map results", 1, collector.getData().size());
     // now verify the output
@@ -375,20 +353,20 @@
     assertEquals("Number of centroids", 3, data.size());
     for (int i = 0; i < data.size(); i++)
       assertEquals("Centroid error",
-          euclideanCentroids.get(i).asFormatString(), data.get(i)
-              .asFormatString());
+              euclideanCentroids.get(i).asFormatString(), data.get(i)
+                      .asFormatString());
   }
 
   /**
    * Story: User can produce final canopy centers using a
    * ManhattanDistanceMeasure and a CanopyReducer which clusters input centroid
    * points to produce an output set of final canopy centroid points.
-   * 
+   *
    * @throws Exception
    */
   public void testCanopyReducerManhattan() throws Exception {
     CanopyReducer reducer = new CanopyReducer();
-    DummyOutputCollector<Text, Text> collector = new DummyOutputCollector<Text, Text>();
+    DummyOutputCollector<Text, Canopy> collector = new DummyOutputCollector<Text, Canopy>();
     Canopy.config(manhattanDistanceMeasure, (3.1), (2.1));
     List<Vector> points = getPoints(raw);
     reducer.reduce(new Text("centroid"), points.iterator(), collector, null);
@@ -397,10 +375,9 @@
     assertEquals("Number of centroids", 3, keys.size());
     int i = 0;
     for (String key : keys) {
-      List<Text> data = collector.getValue(key);
-      assertEquals("Centroid error",
-          manhattanCentroids.get(i).asFormatString(), Canopy.decodeCanopy(
-              data.get(0).toString()).getCenter().asFormatString());
+      List<Canopy> data = collector.getValue(key);
+      assertTrue(manhattanCentroids.get(i).asFormatString() + " is not equal to " + data.get(0).computeCentroid().asFormatString(),
+              manhattanCentroids.get(i).equals(data.get(0).computeCentroid()));
       i++;
     }
   }
@@ -409,12 +386,12 @@
    * Story: User can produce final canopy centers using a
    * EuclideanDistanceMeasure and a CanopyReducer which clusters input centroid
    * points to produce an output set of final canopy centroid points.
-   * 
+   *
    * @throws Exception
    */
   public void testCanopyReducerEuclidean() throws Exception {
     CanopyReducer reducer = new CanopyReducer();
-    DummyOutputCollector<Text, Text> collector = new DummyOutputCollector<Text, Text>();
+    DummyOutputCollector<Text, Canopy> collector = new DummyOutputCollector<Text, Canopy>();
     Canopy.config(euclideanDistanceMeasure, (3.1), (2.1));
     List<Vector> points = getPoints(raw);
     reducer.reduce(new Text("centroid"), points.iterator(), collector, null);
@@ -423,10 +400,9 @@
     assertEquals("Number of centroids", 3, keys.size());
     int i = 0;
     for (String key : keys) {
-      List<Text> data = collector.getValue(key);
-      assertEquals("Centroid error",
-          euclideanCentroids.get(i).asFormatString(), Canopy.decodeCanopy(
-              data.get(0).toString()).getCenter().asFormatString());
+      List<Canopy> data = collector.getValue(key);
+      assertTrue(euclideanCentroids.get(i).asFormatString() + " is not equal to " + data.get(0).computeCentroid().asFormatString(),
+              euclideanCentroids.get(i).equals(data.get(0).computeCentroid()));
       i++;
     }
   }
@@ -434,7 +410,7 @@
   /**
    * Story: User can produce final canopy centers using a Hadoop map/reduce job
    * and a ManhattanDistanceMeasure.
-   * 
+   *
    * @throws Exception
    */
   public void testCanopyGenManhattanMR() throws Exception {
@@ -442,38 +418,38 @@
     File testData = new File("testdata");
     if (!testData.exists())
       testData.mkdir();
-    writePointsToFile(points, "testdata/file1");
-    writePointsToFile(points, "testdata/file2");
+    JobConf job = new JobConf(
+            CanopyDriver.class);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/file1", fs, job);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/file2", fs, job);
     // now run the Canopy Driver
     CanopyDriver.runJob("testdata", "output/canopies",
-        ManhattanDistanceMeasure.class.getName(), 3.1, 2.1);
+            ManhattanDistanceMeasure.class.getName(), 3.1, 2.1, SparseVector.class);
 
     // verify output from sequence file
-    JobConf job = new JobConf(
-        org.apache.mahout.clustering.canopy.CanopyDriver.class);
     Path path = new Path("output/canopies/part-00000");
     FileSystem fs = FileSystem.get(path.toUri(), job);
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
     Text key = new Text();
-    Text value = new Text();
-    assertTrue("more to come", reader.next(key, value));
+    Canopy canopy = new Canopy();
+    assertTrue("more to come", reader.next(key, canopy));
     assertEquals("1st key", "C0", key.toString());
-    Canopy canopy = Canopy.decodeCanopy(value.toString());
+    //Canopy canopy = new Canopy(value);//Canopy.decodeCanopy(value.toString());
     assertEquals("1st x value", 1.5, canopy.getCenter().get(0));
     assertEquals("1st y value", 1.5, canopy.getCenter().get(1));
-    assertTrue("more to come", reader.next(key, value));
+    assertTrue("more to come", reader.next(key, canopy));
     assertEquals("2nd key", "C1", key.toString());
-    canopy = Canopy.decodeCanopy(value.toString());
+    //canopy = Canopy.decodeCanopy(canopy.toString());
     assertEquals("1st x value", 4.333333333333334, canopy.getCenter().get(0));
     assertEquals("1st y value", 4.333333333333334, canopy.getCenter().get(1));
-    assertFalse("more to come", reader.next(key, value));
+    assertFalse("more to come", reader.next(key, canopy));
     reader.close();
   }
 
   /**
    * Story: User can produce final canopy centers using a Hadoop map/reduce job
    * and a EuclideanDistanceMeasure.
-   * 
+   *
    * @throws Exception
    */
   public void testCanopyGenEuclideanMR() throws Exception {
@@ -481,30 +457,28 @@
     File testData = new File("testdata");
     if (!testData.exists())
       testData.mkdir();
-    writePointsToFile(points, "testdata/file1");
-    writePointsToFile(points, "testdata/file2");
+    JobConf job = new JobConf(
+            CanopyDriver.class);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/file1", fs, job);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/file2", fs, job);
     // now run the Canopy Driver
     CanopyDriver.runJob("testdata", "output/canopies",
-        EuclideanDistanceMeasure.class.getName(), 3.1, 2.1);
+            EuclideanDistanceMeasure.class.getName(), 3.1, 2.1, SparseVector.class);
 
     // verify output from sequence file
-    JobConf job = new JobConf(
-        org.apache.mahout.clustering.canopy.CanopyDriver.class);
     Path path = new Path("output/canopies/part-00000");
     FileSystem fs = FileSystem.get(path.toUri(), job);
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
     Text key = new Text();
-    Text value = new Text();
+    Canopy value = new Canopy();
     assertTrue("more to come", reader.next(key, value));
     assertEquals("1st key", "C0", key.toString());
-    Canopy canopy = Canopy.decodeCanopy(value.toString());
-    assertEquals("1st x value", 1.8, canopy.getCenter().get(0));
-    assertEquals("1st y value", 1.8, canopy.getCenter().get(1));
+    assertEquals("1st x value", 1.8, value.getCenter().get(0));
+    assertEquals("1st y value", 1.8, value.getCenter().get(1));
     assertTrue("more to come", reader.next(key, value));
     assertEquals("2nd key", "C1", key.toString());
-    canopy = Canopy.decodeCanopy(value.toString());
-    assertEquals("1st x value", 4.433333333333334, canopy.getCenter().get(0));
-    assertEquals("1st y value", 4.433333333333334, canopy.getCenter().get(1));
+    assertEquals("1st x value", 4.433333333333334, value.getCenter().get(0));
+    assertEquals("1st y value", 4.433333333333334, value.getCenter().get(1));
     assertFalse("more to come", reader.next(key, value));
     reader.close();
   }
@@ -512,30 +486,29 @@
   /**
    * Story: User can cluster a subset of the points using a ClusterMapper and a
    * ManhattanDistanceMeasure.
-   * 
+   *
    * @throws Exception
    */
   public void testClusterMapperManhattan() throws Exception {
     Canopy.config(manhattanDistanceMeasure, (3.1), (2.1));
     ClusterMapper mapper = new ClusterMapper();
     List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text, Text> collector = new DummyOutputCollector<Text, Text>();
+    DummyOutputCollector<Text, Vector> collector = new DummyOutputCollector<Text, Vector>();
     for (Vector centroid : manhattanCentroids)
       canopies.add(new Canopy(centroid));
     mapper.config(canopies);
     List<Vector> points = getPoints(raw);
     // map the data
     for (Vector point : points)
-      mapper.map(new Text(), new Text(point.asFormatString()), collector, null);
-    Map<String, List<Text>> data = collector.getData();
+      mapper.map(new Text(), point, collector, null);
+    Map<String, List<Vector>> data = collector.getData();
     assertEquals("Number of map results", canopies.size(), data.size());
-    for (Map.Entry<String, List<Text>> stringListEntry : data.entrySet()) {
+    for (Map.Entry<String, List<Vector>> stringListEntry : data.entrySet()) {
       String key = stringListEntry.getKey();
       Canopy canopy = findCanopy(key, canopies);
-      List<Text> pts = stringListEntry.getValue();
-      for (Writable ptDef : pts)
-        assertTrue("Point not in canopy", canopy.covers(AbstractVector
-            .decodeVector(ptDef.toString())));
+      List<Vector> pts = stringListEntry.getValue();
+      for (Vector ptDef : pts)
+        assertTrue("Point not in canopy", canopy.covers(ptDef));
     }
   }
 
@@ -549,117 +522,114 @@
   /**
    * Story: User can cluster a subset of the points using a ClusterMapper and a
    * EuclideanDistanceMeasure.
-   * 
+   *
    * @throws Exception
    */
   public void testClusterMapperEuclidean() throws Exception {
     Canopy.config(euclideanDistanceMeasure, (3.1), (2.1));
     ClusterMapper mapper = new ClusterMapper();
     List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text, Text> collector = new DummyOutputCollector<Text, Text>();
+    DummyOutputCollector<Text, Vector> collector = new DummyOutputCollector<Text, Vector>();
     for (Vector centroid : euclideanCentroids)
       canopies.add(new Canopy(centroid));
     mapper.config(canopies);
     List<Vector> points = getPoints(raw);
     // map the data
     for (Vector point : points)
-      mapper.map(new Text(), new Text(point.asFormatString()), collector, null);
-    Map<String, List<Text>> data = collector.getData();
+      mapper.map(new Text(), point, collector, null);
+    Map<String, List<Vector>> data = collector.getData();
     assertEquals("Number of map results", canopies.size(), data.size());
-    for (Map.Entry<String, List<Text>> stringListEntry : data.entrySet()) {
+    for (Map.Entry<String, List<Vector>> stringListEntry : data.entrySet()) {
       String key = stringListEntry.getKey();
       Canopy canopy = findCanopy(key, canopies);
-      List<Text> pts = stringListEntry.getValue();
-      for (Writable ptDef : pts)
-        assertTrue("Point not in canopy", canopy.covers(AbstractVector
-            .decodeVector(ptDef.toString())));
+      List<Vector> pts = stringListEntry.getValue();
+      for (Vector ptDef : pts)
+        assertTrue("Point not in canopy", canopy.covers(ptDef));
     }
   }
 
   /**
    * Story: User can cluster a subset of the points using a ClusterReducer and a
    * ManhattanDistanceMeasure.
-   * 
+   *
    * @throws Exception
    */
   public void testClusterReducerManhattan() throws Exception {
     Canopy.config(manhattanDistanceMeasure, (3.1), (2.1));
     ClusterMapper mapper = new ClusterMapper();
     List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text, Text> collector = new DummyOutputCollector<Text, Text>();
+    DummyOutputCollector<Text, Vector> collector = new DummyOutputCollector<Text, Vector>();
     for (Vector centroid : manhattanCentroids)
       canopies.add(new Canopy(centroid));
     mapper.config(canopies);
     List<Vector> points = getPoints(raw);
     // map the data
     for (Vector point : points)
-      mapper.map(new Text(), new Text(point.asFormatString()), collector, null);
-    Map<String, List<Text>> data = collector.getData();
+      mapper.map(new Text(), point, collector, null);
+    Map<String, List<Vector>> data = collector.getData();
     assertEquals("Number of map results", canopies.size(), data.size());
 
     // reduce the data
-    Reducer<Text, Text, Text, Text> reducer = new IdentityReducer<Text, Text>();
-    collector = new DummyOutputCollector<Text, Text>();
-    for (Map.Entry<String, List<Text>> stringListEntry : data.entrySet())
+    Reducer<Text, Vector, Text, Vector> reducer = new IdentityReducer<Text, Vector>();
+    collector = new DummyOutputCollector<Text, Vector>();
+    for (Map.Entry<String, List<Vector>> stringListEntry : data.entrySet())
       reducer.reduce(new Text(stringListEntry.getKey()), stringListEntry
-          .getValue().iterator(), collector, null);
+              .getValue().iterator(), collector, null);
 
     // check the output
     data = collector.getData();
-    for (Map.Entry<String, List<Text>> stringListEntry : data.entrySet()) {
+    for (Map.Entry<String, List<Vector>> stringListEntry : data.entrySet()) {
       String key = stringListEntry.getKey();
       Canopy canopy = findCanopy(key, canopies);
-      List<Text> pts = stringListEntry.getValue();
-      for (Writable ptDef : pts)
-        assertTrue("Point not in canopy", canopy.covers(AbstractVector
-            .decodeVector(ptDef.toString())));
+      List<Vector> pts = stringListEntry.getValue();
+      for (Vector ptDef : pts)
+        assertTrue("Point not in canopy", canopy.covers(ptDef));
     }
   }
 
   /**
    * Story: User can cluster a subset of the points using a ClusterReducer and a
    * EuclideanDistanceMeasure.
-   * 
+   *
    * @throws Exception
    */
   public void testClusterReducerEuclidean() throws Exception {
     Canopy.config(euclideanDistanceMeasure, (3.1), (2.1));
     ClusterMapper mapper = new ClusterMapper();
     List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text, Text> collector = new DummyOutputCollector<Text, Text>();
+    DummyOutputCollector<Text, Vector> collector = new DummyOutputCollector<Text, Vector>();
     for (Vector centroid : euclideanCentroids)
       canopies.add(new Canopy(centroid));
     mapper.config(canopies);
     List<Vector> points = getPoints(raw);
     // map the data
     for (Vector point : points)
-      mapper.map(new Text(), new Text(point.asFormatString()), collector, null);
-    Map<String, List<Text>> data = collector.getData();
+      mapper.map(new Text(), point, collector, null);
+    Map<String, List<Vector>> data = collector.getData();
 
     // reduce the data
-    Reducer<Text, Text, Text, Text> reducer = new IdentityReducer<Text, Text>();
-    collector = new DummyOutputCollector<Text, Text>();
-    for (Map.Entry<String, List<Text>> stringListEntry : data.entrySet())
+    Reducer<Text, Vector, Text, Vector> reducer = new IdentityReducer<Text, Vector>();
+    collector = new DummyOutputCollector<Text, Vector>();
+    for (Map.Entry<String, List<Vector>> stringListEntry : data.entrySet())
       reducer.reduce(new Text(stringListEntry.getKey()), stringListEntry
-          .getValue().iterator(), collector, null);
+              .getValue().iterator(), collector, null);
 
     // check the output
     data = collector.getData();
     assertEquals("Number of map results", canopies.size(), data.size());
-    for (Map.Entry<String, List<Text>> stringListEntry : data.entrySet()) {
+    for (Map.Entry<String, List<Vector>> stringListEntry : data.entrySet()) {
       String key = stringListEntry.getKey();
       Canopy canopy = findCanopy(key, canopies);
-      List<Text> pts = stringListEntry.getValue();
-      for (Writable ptDef : pts)
-        assertTrue("Point not in canopy", canopy.covers(AbstractVector
-            .decodeVector(ptDef.toString())));
+      List<Vector> pts = stringListEntry.getValue();
+      for (Vector ptDef : pts)
+        assertTrue("Point not in canopy", canopy.covers(ptDef));
     }
   }
 
   /**
    * Story: User can produce final point clustering using a Hadoop map/reduce
    * job and a ManhattanDistanceMeasure.
-   * 
+   *
    * @throws Exception
    */
   public void testClusteringManhattanMR() throws Exception {
@@ -667,18 +637,28 @@
     File testData = new File("testdata");
     if (!testData.exists())
       testData.mkdir();
-    writePointsToFile(points, "testdata/file1");
-    writePointsToFile(points, "testdata/file2");
+    Configuration conf = new Configuration();
+    ClusteringTestUtils.writePointsToFile(points, "testdata/file1", fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/file2", fs, conf);
     // now run the Job
     CanopyClusteringJob.runJob("testdata", "output",
-        ManhattanDistanceMeasure.class.getName(), 3.1, 2.1);
-    BufferedReader reader = new BufferedReader(new InputStreamReader(
+            ManhattanDistanceMeasure.class.getName(), 3.1, 2.1, SparseVector.class);
+    //TODO: change
+    /*BufferedReader reader = new BufferedReader(new InputStreamReader(
         new FileInputStream("output/clusters/part-00000"), Charset
-            .forName("UTF-8")));
+            .forName("UTF-8")));*/
+    Path path = new Path("output/clusters/part-00000");
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
     int count = 0;
-    while (reader.ready()) {
+    /*while (reader.ready()) {
       System.out.println(reader.readLine());
       count++;
+    }*/
+    Text txt = new Text();
+    SparseVector vector = new SparseVector();
+    while (reader.next(txt, vector)) {
+      count++;
+      System.out.println("Txt: " + txt + " Vec: " + vector.asFormatString());
     }
     // the point [3.0,3.0] is covered by both canopies
     assertEquals("number of points", 2 + 2 * points.size(), count);
@@ -688,7 +668,7 @@
   /**
    * Story: User can produce final point clustering using a Hadoop map/reduce
    * job and a EuclideanDistanceMeasure.
-   * 
+   *
    * @throws Exception
    */
   public void testClusteringEuclideanMR() throws Exception {
@@ -696,94 +676,41 @@
     File testData = new File("testdata");
     if (!testData.exists())
       testData.mkdir();
-    writePointsToFile(points, "testdata/file1");
-    writePointsToFile(points, "testdata/file2");
+    Configuration conf = new Configuration();
+    ClusteringTestUtils.writePointsToFile(points, "testdata/file1", fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/file2", fs, conf);
     // now run the Job
     CanopyClusteringJob.runJob("testdata", "output",
-        EuclideanDistanceMeasure.class.getName(), 3.1, 2.1);
-    BufferedReader reader = new BufferedReader(new InputStreamReader(
+            EuclideanDistanceMeasure.class.getName(), 3.1, 2.1, SparseVector.class);
+    /*BufferedReader reader = new BufferedReader(new InputStreamReader(
         new FileInputStream("output/clusters/part-00000"), Charset
-            .forName("UTF-8")));
+            .forName("UTF-8")));*/
+    Path path = new Path("output/clusters/part-00000");
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
     int count = 0;
-    while (reader.ready()) {
+    /*while (reader.ready()) {
       System.out.println(reader.readLine());
       count++;
-    }
-    // the point [3.0,3.0] is covered by both canopies
-    assertEquals("number of points", 2 + 2 * points.size(), count);
-    reader.close();
-  }
-
-  /**
-   * Story: User can produce final point clustering using a Hadoop map/reduce
-   * job and a ManhattanDistanceMeasure. Input points can have extra payload
-   * information following the point [...] and this information will be retained
-   * in the output.
-   * 
-   * @throws Exception
-   */
-  public void testClusteringManhattanMRWithPayload() throws Exception {
-    List<Vector> points = getPoints(raw);
-    File testData = new File("testdata");
-    if (!testData.exists())
-      testData.mkdir();
-    writePointsToFileWithPayload(points, "testdata/file1", "file1");
-    writePointsToFileWithPayload(points, "testdata/file2", "file2");
-    // now run the Job
-    CanopyClusteringJob.runJob("testdata", "output",
-        ManhattanDistanceMeasure.class.getName(), 3.1, 2.1);
-    BufferedReader reader = new BufferedReader(new InputStreamReader(
-        new FileInputStream("output/clusters/part-00000"), Charset
-            .forName("UTF-8")));
-    int count = 0;
-    while (reader.ready()) {
-      String line = reader.readLine();
-      assertTrue("No payload", line.indexOf("file") > 0);
-      System.out.println(line);
+    }*/
+    Text txt = new Text();
+    SparseVector can = new SparseVector();
+    while (reader.next(txt, can)) {
       count++;
     }
-    // the point [3.0,3.0] is covered by both canopies
-    assertEquals("number of points", 2 + 2 * points.size(), count);
-    reader.close();
-  }
-
-  /**
-   * Story: User can produce final point clustering using a Hadoop map/reduce
-   * job and a EuclideanDistanceMeasure. Input points can have extra payload
-   * information following the point [...] and this information will be retained
-   * in the output.
-   * 
-   * @throws Exception
-   */
-  public void testClusteringEuclideanMRWithPayload() throws Exception {
-    List<Vector> points = getPoints(raw);
-    File testData = new File("testdata");
-    if (!testData.exists())
-      testData.mkdir();
-    writePointsToFileWithPayload(points, "testdata/file1", "file1");
-    writePointsToFileWithPayload(points, "testdata/file2", "file2");
-    // now run the Job
-    CanopyClusteringJob.runJob("testdata", "output",
-        EuclideanDistanceMeasure.class.getName(), 3.1, 2.1);
-    BufferedReader reader = new BufferedReader(new InputStreamReader(
-        new FileInputStream("output/clusters/part-00000"), Charset
-            .forName("UTF-8")));
-    int count = 0;
-    while (reader.ready()) {
-      String line = reader.readLine();
-      assertTrue("No payload", line.indexOf("file") > 0);
-      System.out.println(line);
+    /*while (reader.ready()) {
+      System.out.println(reader.readLine());
       count++;
-    }
+    }*/
     // the point [3.0,3.0] is covered by both canopies
     assertEquals("number of points", 2 + 2 * points.size(), count);
     reader.close();
   }
 
+
   /**
    * Story: Clustering algorithm must support arbitrary user defined distance
    * measure
-   * 
+   *
    * @throws Exception
    */
   public void testUserDefinedDistanceMeasure() throws Exception {
@@ -791,31 +718,32 @@
     File testData = new File("testdata");
     if (!testData.exists())
       testData.mkdir();
-    writePointsToFile(points, "testdata/file1");
-    writePointsToFile(points, "testdata/file2");
+    Configuration conf = new Configuration();
+    ClusteringTestUtils.writePointsToFile(points, "testdata/file1", fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/file2", fs, conf);
     // now run the Canopy Driver. User defined measure happens to be a Manhattan
     // subclass so results are same.
     CanopyDriver.runJob("testdata", "output/canopies",
-        UserDefinedDistanceMeasure.class.getName(), 3.1, 2.1);
+            UserDefinedDistanceMeasure.class.getName(), 3.1, 2.1, SparseVector.class);
 
     // verify output from sequence file
     JobConf job = new JobConf(
-        org.apache.mahout.clustering.canopy.CanopyDriver.class);
+            CanopyDriver.class);
     Path path = new Path("output/canopies/part-00000");
     FileSystem fs = FileSystem.get(path.toUri(), job);
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
     Text key = new Text();
-    Text value = new Text();
+    Canopy value = new Canopy();
     assertTrue("more to come", reader.next(key, value));
     assertEquals("1st key", "C0", key.toString());
-    Canopy canopy = Canopy.decodeCanopy(value.toString());
-    assertEquals("1st x value", 1.5, canopy.getCenter().get(0));
-    assertEquals("1st y value", 1.5, canopy.getCenter().get(1));
+
+    assertEquals("1st x value", 1.5, value.getCenter().get(0));
+    assertEquals("1st y value", 1.5, value.getCenter().get(1));
     assertTrue("more to come", reader.next(key, value));
     assertEquals("2nd key", "C1", key.toString());
-    canopy = Canopy.decodeCanopy(value.toString());
-    assertEquals("1st x value", 4.333333333333334, canopy.getCenter().get(0));
-    assertEquals("1st y value", 4.333333333333334, canopy.getCenter().get(1));
+
+    assertEquals("1st x value", 4.333333333333334, value.getCenter().get(0));
+    assertEquals("1st y value", 4.333333333333334, value.getCenter().get(1));
     assertFalse("more to come", reader.next(key, value));
     reader.close();
   }
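
For readers skimming the hunks above: the through-line of this file's changes is that points and canopies now travel as Hadoop Writables instead of Text-encoded strings. A minimal sketch of the resulting test pattern, pieced together from calls visible in this diff (a sketch, not code from the commit; it assumes the same imports as the test class):

    // Write points as Writable Vectors, run the driver, then read the
    // canopies straight back out of the SequenceFile output, rather than
    // the old Canopy.decodeCanopy(value.toString()) round-trip.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    List<Vector> points = getPoints(raw);
    ClusteringTestUtils.writePointsToFile(points, "testdata/file1", fs, conf);
    CanopyDriver.runJob("testdata", "output/canopies",
        ManhattanDistanceMeasure.class.getName(), 3.1, 2.1, SparseVector.class);

    Path path = new Path("output/canopies/part-00000");
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    Text key = new Text();
    Canopy canopy = new Canopy();  // Canopy itself is now the Writable value type
    while (reader.next(key, canopy)) {
      System.out.println(key + ": " + canopy.getCenter().asFormatString());
    }
    reader.close();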

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java Tue Jun 23 18:23:18 2009
@@ -35,15 +35,21 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.clustering.ClusteringTestUtils;
 import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.matrix.SparseVector;
 import org.apache.mahout.utils.DistanceMeasure;
 import org.apache.mahout.utils.DummyOutputCollector;
 import org.apache.mahout.utils.EuclideanDistanceMeasure;
 
 public class TestFuzzyKmeansClustering extends TestCase {
 
+  FileSystem fs;
+
   private static void rmr(String path) throws Exception {
     File f = new File(path);
     if (f.exists()) {
@@ -62,6 +68,8 @@
     super.setUp();
     rmr("output");
     rmr("testdata");
+    Configuration conf = new Configuration();
+    fs = FileSystem.get(conf);
   }
 
   public static double round(double val, int places) {
@@ -145,8 +153,8 @@
         outputValue.append(clusterList.get(i).getClusterId()).append(':')
             .append(probWeight).append(' ');
       }
-
-      pointClusterInfo.put(point.asFormatString().trim(), outputValue
+      String name = point.getName();
+      pointClusterInfo.put(name != null && name.equals("") == false ? name : point.asFormatString().trim(), outputValue
           .toString().trim()
           + ']');
     }
@@ -202,9 +210,9 @@
     testData = new File("testdata/points");
     if (!testData.exists())
       testData.mkdir();
-
-    TestKmeansClustering.writePointsToFile(points, "testdata/points/file1");
-    TestKmeansClustering.writePointsToFile(points, "testdata/points/file2");
+    JobConf conf = new JobConf(FuzzyKMeansDriver.class);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/points/file1", fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/points/file2", fs, conf);
 
     for (int k = 0; k < points.size(); k++) {
       System.out.println("testKFuzzyKMeansMRJob k= " + k);
@@ -217,27 +225,29 @@
       }
 
       testData = new File("testdata/clusters");
-      if (!testData.exists())
+      if (!testData.exists()) {
         testData.mkdir();
+      }
 
-      BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+      /*BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
           new FileOutputStream("testdata/clusters/part-00000"), Charset
               .forName("UTF-8")));
-
+*/
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path("testdata/clusters/part-00000"),
+              Text.class, SoftCluster.class);
       for (int i = 0; i < k + 1; i++) {
         Vector vec = tweakValue(points.get(i));
 
         SoftCluster cluster = new SoftCluster(vec);
         // add the center so the centroid will be correct upon output
         cluster.addPoint(cluster.getCenter(), 1);
-        writer.write(cluster.getIdentifier() + '\t'
-            + SoftCluster.formatCluster(cluster) + '\n');
+        /*writer.write(cluster.getIdentifier() + '\t'
+            + SoftCluster.formatCluster(cluster) + '\n');*/
+        writer.append(new Text(cluster.getIdentifier()), cluster);
 
       }
-      writer.flush();
       writer.close();
 
-      JobConf conf = new JobConf(FuzzyKMeansDriver.class);
       Path outPath = new Path("output");
       fs = FileSystem.get(outPath.toUri(), conf);
       if (fs.exists(outPath)) {
@@ -247,35 +257,37 @@
       // now run the Job      
       FuzzyKMeansDriver.runJob("testdata/points", "testdata/clusters",
           "output", EuclideanDistanceMeasure.class.getName(), 0.001, 2, 1,
-          k + 1, 2);      
+          k + 1, 2, SparseVector.class);
 
       // now compare the expected clusters with actual
       File outDir = new File("output/points");
       assertTrue("output dir exists?", outDir.exists());
       outDir.list();
 //      assertEquals("output dir files?", 4, outFiles.length);
-      BufferedReader reader = new BufferedReader(new InputStreamReader(
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("output/points/part-00000"), conf);
+      /*BufferedReader reader = new BufferedReader(new InputStreamReader(
           new FileInputStream("output/points/part-00000"), Charset
-              .forName("UTF-8")));
-
-      while (reader.ready()) {
-        String line = reader.readLine();
+              .forName("UTF-8")));*/
+      Text key = new Text();
+      FuzzyKMeansOutput out = new FuzzyKMeansOutput();
+      while (reader.next(key, out)) {
+        /*String line = reader.readLine();
         String[] lineParts = line.split("\t");
         assertEquals("line parts", 2, lineParts.length);
         String clusterInfoStr = lineParts[1].replace("[", "").replace("]", "");
 
         String[] clusterInfoList = clusterInfoStr.split(" ");
         assertEquals("Number of clusters", k + 1, clusterInfoList.length);
+        */
         double prob = 0.0;
-        for (String clusterInfo : clusterInfoList) {
-          String[] clusterProb = clusterInfo.split(":");
-
-          double clusterProbVal = Double.parseDouble(clusterProb[1]);
-          prob += clusterProbVal;
+        double [] probabilities = out.getProbabilities();
+        for (int i = 0; i < probabilities.length; i++) {
+          //SoftCluster cluster = clusters[i];
+          prob += probabilities[i];
         }
         prob = round(prob, 1);
         assertEquals(
-            "Sum of cluster Membership problability should be equal to=", 1.0,
+            "Sum of cluster Membership probability should be equal to=", 1.0,
             prob);
       }
 
@@ -309,41 +321,35 @@
       FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
       mapper.config(clusterList);
 
-      DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
       for (Vector point : points) {
-        mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+        mapper.map(new Text(), point, mapCollector,
             null);
       }
 
       // now verify mapper output
       assertEquals("Mapper Keys", k + 1, mapCollector.getData().size());
 
-      Map<String, Double> pointTotalProbMap = new HashMap<String, Double>();
+      Map<Vector, Double> pointTotalProbMap = new HashMap<Vector, Double>();
 
       for (String key : mapCollector.getKeys()) {
         // SoftCluster cluster = SoftCluster.decodeCluster(key);
-        List<Text> values = mapCollector.getValue(key);
-
-        for (Text value : values) {
-          String pointInfo = value.toString();
-          double pointProb = Double.parseDouble(pointInfo.substring(0,
-              pointInfo.indexOf(FuzzyKMeansDriver.MAPPER_VALUE_SEPARATOR)));
+        List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
 
-          String encodedVector = pointInfo.substring(pointInfo
-              .indexOf(FuzzyKMeansDriver.MAPPER_VALUE_SEPARATOR) + 1);
+        for (FuzzyKMeansInfo value : values) {
 
-          Double val = pointTotalProbMap.get(encodedVector);
+          Double val = pointTotalProbMap.get(value.getVector());
           double probVal = 0.0;
           if (val != null) {
             probVal = val;
           }
 
-          pointTotalProbMap.put(encodedVector, probVal + pointProb);
+          pointTotalProbMap.put(value.getVector(), probVal + value.getProbability());
         }
       }
 
-      for (Map.Entry<String, Double> entry : pointTotalProbMap.entrySet()) {
-        String key = entry.getKey();
+      for (Map.Entry<Vector, Double> entry : pointTotalProbMap.entrySet()) {
+        Vector key = entry.getKey();
         double value = round(entry.getValue(), 1);
 
         assertEquals("total Prob for Point:" + key, 1.0, value);
@@ -375,19 +381,19 @@
       FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
       mapper.config(clusterList);
 
-      DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
       for (Vector point : points) {
-        mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+        mapper.map(new Text(), point, mapCollector,
             null);
       }
 
       // run combiner
-      DummyOutputCollector<Text, Text> combinerCollector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, FuzzyKMeansInfo> combinerCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
       FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
 
       for (String key : mapCollector.getKeys()) {
 
-        List<Text> values = mapCollector.getValue(key);
+        List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
         combiner.reduce(new Text(key), values.iterator(), combinerCollector,
             null);
       }
@@ -396,7 +402,7 @@
       assertEquals("Combiner Output", k + 1, combinerCollector.getData().size());
 
       for (String key : combinerCollector.getKeys()) {
-        List<Text> values = combinerCollector.getValue(key);
+        List<FuzzyKMeansInfo> values = combinerCollector.getValue(key);
         assertEquals("too many values", 1, values.size());
       }
     }
@@ -426,30 +432,30 @@
       FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
       mapper.config(clusterList);
 
-      DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
       for (Vector point : points) {
-        mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+        mapper.map(new Text(), point, mapCollector,
             null);
       }
 
       // run combiner
-      DummyOutputCollector<Text, Text> combinerCollector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, FuzzyKMeansInfo> combinerCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
       FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
 
       for (String key : mapCollector.getKeys()) {
 
-        List<Text> values = mapCollector.getValue(key);
+        List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
         combiner.reduce(new Text(key), values.iterator(), combinerCollector,
             null);
       }
 
       // run reducer
-      DummyOutputCollector<Text, Text> reducerCollector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, SoftCluster> reducerCollector = new DummyOutputCollector<Text, SoftCluster>();
       FuzzyKMeansReducer reducer = new FuzzyKMeansReducer();
       reducer.config(clusterList);
 
       for (String key : combinerCollector.getKeys()) {
-        List<Text> values = combinerCollector.getValue(key);
+        List<FuzzyKMeansInfo> values = combinerCollector.getValue(key);
         reducer
             .reduce(new Text(key), values.iterator(), reducerCollector, null);
       }
@@ -466,15 +472,13 @@
       iterateReference(points, reference, measure);
       for (SoftCluster key : reference) {
         String clusterId = key.getIdentifier();
-        List<Text> values = reducerCollector.getValue(clusterId);
-        SoftCluster cluster = SoftCluster.decodeCluster(values.get(0)
-            .toString());
+        List<SoftCluster> values = reducerCollector.getValue(clusterId);
+        SoftCluster cluster = values.get(0);
         System.out.println("ref= " + key.toString() + " cluster= "
             + cluster.toString());
-        assertEquals(k + " center[" + key + "][0]", key.getCenter().get(0),
-            cluster.getCenter().get(0));
-        assertEquals(k + " center[" + key + "][1]", key.getCenter().get(1),
-            cluster.getCenter().get(1));
+        cluster.recomputeCenter();
+        assertTrue("key center: " + key.getCenter().asFormatString() + " does not equal cluster: " +
+                cluster.getCenter().asFormatString(), key.getCenter().equals(cluster.getCenter()));
       }
     }
   }
@@ -503,30 +507,32 @@
       FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
       mapper.config(clusterList);
 
-      DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
       for (Vector point : points) {
-        mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+        mapper.map(new Text(), point, mapCollector,
             null);
       }
-
+      for (SoftCluster softCluster : clusterList) {
+        softCluster.recomputeCenter();
+      }
       // run combiner
-      DummyOutputCollector<Text, Text> combinerCollector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, FuzzyKMeansInfo> combinerCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
       FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
-
+      //combiner.configure();
       for (String key : mapCollector.getKeys()) {
 
-        List<Text> values = mapCollector.getValue(key);
+        List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
         combiner.reduce(new Text(key), values.iterator(), combinerCollector,
             null);
       }
 
       // run reducer
-      DummyOutputCollector<Text, Text> reducerCollector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, SoftCluster> reducerCollector = new DummyOutputCollector<Text, SoftCluster>();
       FuzzyKMeansReducer reducer = new FuzzyKMeansReducer();
       reducer.config(clusterList);
 
       for (String key : combinerCollector.getKeys()) {
-        List<Text> values = combinerCollector.getValue(key);
+        List<FuzzyKMeansInfo> values = combinerCollector.getValue(key);
         reducer
             .reduce(new Text(key), values.iterator(), reducerCollector, null);
       }
@@ -535,15 +541,18 @@
       List<SoftCluster> reducerCluster = new ArrayList<SoftCluster>();
 
       for (String key : reducerCollector.getKeys()) {
-        List<Text> values = reducerCollector.getValue(key);
-        reducerCluster.add(SoftCluster.decodeCluster(values.get(0).toString()));
+        List<SoftCluster> values = reducerCollector.getValue(key);
+        reducerCluster.add(values.get(0));
+      }
+      for (SoftCluster softCluster : reducerCluster) {
+        softCluster.recomputeCenter();
       }
 
-      DummyOutputCollector<Text, Text> clusterMapperCollector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, FuzzyKMeansOutput> clusterMapperCollector = new DummyOutputCollector<Text, FuzzyKMeansOutput>();
       FuzzyKMeansClusterMapper clusterMapper = new FuzzyKMeansClusterMapper();
       clusterMapper.config(reducerCluster);
       for (Vector point : points) {
-        clusterMapper.map(new Text(), new Text(point.asFormatString()),
+        clusterMapper.map(new Text(), point,
             clusterMapperCollector, null);
       }
 
@@ -561,7 +570,7 @@
 
       // Now compare the clustermapper results with reducer
       for (String key : clusterMapperCollector.getKeys()) {
-        List<Text> value = clusterMapperCollector.getValue(key);
+        List<FuzzyKMeansOutput> value = clusterMapperCollector.getValue(key);
 
         String refValue = pointClusterInfo.get(key);
         String clusterInfoStr = refValue.substring(1, refValue.length() - 1);
@@ -574,14 +583,21 @@
           refClusterInfoMap.put(clusterProb[0], clusterProbVal);
         }
 
-        String[] clusterInfoList = value.get(0).toString().replace("[", "")
-            .replace("]", "").split(" ");
-        assertEquals("Number of clusters", k + 1, clusterInfoList.length);
+        FuzzyKMeansOutput kMeansOutput = value.get(0);
+        SoftCluster[] softClusters = kMeansOutput.getClusters();
+        double [] probabilities = kMeansOutput.getProbabilities();
+        assertEquals("Number of clusters", k + 1, softClusters.length);
         for (String clusterInfo : refClusterInfoList) {
           String[] clusterProb = clusterInfo.split(":");
           double clusterProbVal = Double.parseDouble(clusterProb[1]);
-          assertEquals(k + " point:" + key + ": Cluster:" + clusterProb[0],
-              refClusterInfoMap.get(clusterProb[0]), clusterProbVal);
+          System.out.println(k + " point:" + key + ": Cluster: " + clusterProb[0] + " prob: " + clusterProbVal);
+          /*assertEquals(,
+                  refClusterInfoMap.get(clusterProb[0]), clusterProbVal);*/
+        }
+        for (int i = 0; i < softClusters.length; i++) {
+          SoftCluster softCluster = softClusters[i];
+          Double refProb = refClusterInfoMap.get(String.valueOf(softCluster.getClusterId()));
+          assertEquals(k + " point: " + key + ": expected probability: " + refProb + " was: " + probabilities[i], refProb, probabilities[i]);
         }
       }
     }
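
The fuzzy k-means hunks above follow the same migration: prior clusters are written as SoftCluster Writables, and the clustering output is read back as FuzzyKMeansOutput, whose per-point membership probabilities should sum to 1.0. A hedged sketch assembled from the calls in this diff (vec stands in for any input point Vector; the same imports and helpers as the test class are assumed):

    // Seed the job with a SoftCluster written as a Writable value.
    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
        new Path("testdata/clusters/part-00000"), Text.class, SoftCluster.class);
    SoftCluster cluster = new SoftCluster(vec);
    cluster.addPoint(cluster.getCenter(), 1);  // keeps the centroid correct on output
    writer.append(new Text(cluster.getIdentifier()), cluster);
    writer.close();

    // Read the typed output back and check that each point's cluster
    // memberships total 1.0, as the updated assertion does.
    SequenceFile.Reader reader = new SequenceFile.Reader(fs,
        new Path("output/points/part-00000"), conf);
    Text key = new Text();
    FuzzyKMeansOutput out = new FuzzyKMeansOutput();
    while (reader.next(key, out)) {
      double prob = 0.0;
      for (double p : out.getProbabilities()) {
        prob += p;
      }
      prob = round(prob, 1);
      assertEquals("Sum of cluster membership probability", 1.0, prob);
    }
    reader.close();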

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java Tue Jun 23 18:23:18 2009
@@ -23,10 +23,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.mahout.clustering.canopy.CanopyDriver;
-import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.clustering.ClusteringTestUtils;
 import org.apache.mahout.matrix.DenseVector;
 import org.apache.mahout.matrix.SparseVector;
 import org.apache.mahout.matrix.Vector;
@@ -36,11 +36,7 @@
 import org.apache.mahout.utils.ManhattanDistanceMeasure;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.IOException;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
 import java.io.InputStreamReader;
 import java.io.FileInputStream;
 import java.util.ArrayList;
@@ -59,6 +55,8 @@
       { 1, 1, 1, 1, 1, 2, 2 }, { 1, 1, 1, 1, 1, 1, 2, 1 },
       { 1, 1, 1, 1, 1, 1, 1, 1, 1 } };
 
+  FileSystem fs;
+
   private void rmr(String path) throws Exception {
     File f = new File(path);
     if (f.exists()) {
@@ -75,6 +73,8 @@
     super.setUp();
     rmr("output");
     rmr("testdata");
+    Configuration conf = new Configuration();
+    fs = FileSystem.get(conf);
   }
 
   /**
@@ -138,7 +138,7 @@
     List<Vector> points = new ArrayList<Vector>();
     for (int i = 0; i < raw.length; i++) {
       double[] fr = raw[i];
-      Vector vec = new SparseVector(fr.length);
+      Vector vec = new SparseVector(String.valueOf(i), fr.length);
       vec.assign(fr);
       points.add(vec);
     }
@@ -196,7 +196,7 @@
     List<Vector> points = getPoints(reference);
     for (int k = 0; k < points.size(); k++) {
       // pick k initial cluster centers at random
-      DummyOutputCollector<Text, Text> collector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, KMeansInfo> collector = new DummyOutputCollector<Text, KMeansInfo>();
       List<Cluster> clusters = new ArrayList<Cluster>();
 
       for (int i = 0; i < k + 1; i++) {
@@ -210,23 +210,20 @@
       mapper.config(clusters);
       // map the data
       for (Vector point : points) {
-        mapper.map(new Text(), new Text(point.asFormatString()), collector,
+        mapper.map(new Text(), point, collector,
             null);
       }
       assertEquals("Number of map results", k + 1, collector.getData().size());
       // now verify that all points are correctly allocated
       for (String key : collector.getKeys()) {
         Cluster cluster = clusterMap.get(key);
-        List<Text> values = collector.getValue(key);
-        for (Writable value : values) {
-          String[] pointInfo = value.toString().split("\t");
-
-          Vector point = AbstractVector.decodeVector(pointInfo[1]);
+        List<KMeansInfo> values = collector.getValue(key);
+        for (KMeansInfo value : values) {
           double distance = euclideanDistanceMeasure.distance(cluster
-              .getCenter(), point);
+              .getCenter(), value.getPointTotal());
           for (Cluster c : clusters)
             assertTrue("distance error", distance <= euclideanDistanceMeasure
-                .distance(point, c.getCenter()));
+                .distance(value.getPointTotal(), c.getCenter()));
         }
       }
     }
@@ -245,7 +242,7 @@
     List<Vector> points = getPoints(reference);
     for (int k = 0; k < points.size(); k++) {
       // pick k initial cluster centers at random
-      DummyOutputCollector<Text, Text> collector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, KMeansInfo> collector = new DummyOutputCollector<Text, KMeansInfo>();
       List<Cluster> clusters = new ArrayList<Cluster>();
       for (int i = 0; i < k + 1; i++) {
         Vector vec = points.get(i);
@@ -258,12 +255,12 @@
       mapper.config(clusters);
       // map the data
       for (Vector point : points) {
-        mapper.map(new Text(), new Text(point.asFormatString()), collector,
+        mapper.map(new Text(), point, collector,
             null);
       }
       // now combine the data
       KMeansCombiner combiner = new KMeansCombiner();
-      DummyOutputCollector<Text, Text> collector2 = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, KMeansInfo> collector2 = new DummyOutputCollector<Text, KMeansInfo>();
       for (String key : collector.getKeys())
         combiner.reduce(new Text(key), collector.getValue(key).iterator(),
             collector2, null);
@@ -273,13 +270,12 @@
       int count = 0;
       Vector total = new DenseVector(2);
       for (String key : collector2.getKeys()) {
-        List<Text> values = collector2.getValue(key);
+        List<KMeansInfo> values = collector2.getValue(key);
         assertEquals("too many values", 1, values.size());
-        String value = values.get(0).toString();
+        KMeansInfo info = values.get(0);
 
-        String[] pointInfo = value.split("\t");
-        count += Integer.parseInt(pointInfo[0]);
-        total = total.plus(AbstractVector.decodeVector(pointInfo[1]));
+        count += info.getPoints();
+        total = total.plus(info.getPointTotal());
       }
       assertEquals("total points", 9, count);
       assertEquals("point total[0]", 27, (int) total.get(0));
@@ -301,7 +298,7 @@
     for (int k = 0; k < points.size(); k++) {
       System.out.println("K = " + k);
       // pick k initial cluster centers at random
-      DummyOutputCollector<Text, Text> collector = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, KMeansInfo> collector = new DummyOutputCollector<Text, KMeansInfo>();
       List<Cluster> clusters = new ArrayList<Cluster>();
       for (int i = 0; i < k + 1; i++) {
         Vector vec = points.get(i);
@@ -313,12 +310,12 @@
       mapper.config(clusters);
       // map the data
       for (Vector point : points) {
-        mapper.map(new Text(), new Text(point.asFormatString()), collector,
+        mapper.map(new Text(), point, collector,
             null);
       }
       // now combine the data
       KMeansCombiner combiner = new KMeansCombiner();
-      DummyOutputCollector<Text, Text> collector2 = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, KMeansInfo> collector2 = new DummyOutputCollector<Text, KMeansInfo>();
       for (String key : collector.getKeys())
         combiner.reduce(new Text(key), collector.getValue(key).iterator(),
             collector2, null);
@@ -326,7 +323,7 @@
       // now reduce the data
       KMeansReducer reducer = new KMeansReducer();
       reducer.config(clusters);
-      DummyOutputCollector<Text, Text> collector3 = new DummyOutputCollector<Text, Text>();
+      DummyOutputCollector<Text, Cluster> collector3 = new DummyOutputCollector<Text, Cluster>();
       for (String key : collector2.getKeys())
         reducer.reduce(new Text(key), collector2.getValue(key).iterator(),
             collector3, null);
@@ -351,16 +348,15 @@
       for (int i = 0; i < reference.size(); i++) {
         Cluster ref = reference.get(i);
         String key = ref.getIdentifier();
-        List<Text> values = collector3.getValue(key);
-        String value = values.get(0).toString();
-        Cluster cluster = Cluster.decodeCluster(value);
+        List<Cluster> values = collector3.getValue(key);
+        Cluster cluster = values.get(0);
         converged = converged && cluster.isConverged();
-        System.out.println("ref= " + ref.toString() + " cluster= "
-            + cluster.toString());
-        assertEquals(k + " center[" + key + "][0]", ref.getCenter().get(0),
-            cluster.getCenter().get(0));
-        assertEquals(k + " center[" + key + "][1]", ref.getCenter().get(1),
-            cluster.getCenter().get(1));
+        // Since we aren't round-tripping through Writable, compare the
+        // reference center with the recomputed cluster centroid.
+        cluster.recomputeCenter();
+        assertTrue(i + " reference center: " + ref.getCenter().asFormatString()
+                + " and cluster center: " + cluster.getCenter().asFormatString()
+                + " are not equal", ref.getCenter().equals(cluster.getCenter()));
       }
       if (k == 8)
         assertTrue("not converged? " + k, converged);
@@ -383,16 +380,17 @@
     if (!testData.exists())
       testData.mkdir();
 
-    writePointsToFile(points, "testdata/points/file1");
-    writePointsToFile(points, "testdata/points/file2");
+    Configuration conf = new Configuration();
+    ClusteringTestUtils.writePointsToFile(points, "testdata/points/file1", fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/points/file2", fs, conf);
     for (int k = 1; k < points.size(); k++) {
       System.out.println("testKMeansMRJob k= " + k);
       // pick k initial cluster centers at random
       JobConf job = new JobConf(KMeansDriver.class);
       Path path = new Path("testdata/clusters/part-00000");
       FileSystem fs = FileSystem.get(path.toUri(), job);
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path,
-          Text.class, Text.class);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path,
+          Text.class, Cluster.class);
 
       for (int i = 0; i < k + 1; i++) {
         Vector vec = points.get(i);
@@ -400,28 +398,26 @@
         Cluster cluster = new Cluster(vec, i);
         // add the center so the centroid will be correct upon output
         cluster.addPoint(cluster.getCenter());
-        writer.append(new Text(cluster.getIdentifier()), new Text(Cluster
-            .formatCluster(cluster)));
+        writer.append(new Text(cluster.getIdentifier()), cluster);
       }
       writer.close();
       // now run the Job
       KMeansJob.runJob("testdata/points", "testdata/clusters", "output",
-          EuclideanDistanceMeasure.class.getName(), 0.001, 10, k + 1);
+          EuclideanDistanceMeasure.class.getName(), 0.001, 10, k + 1, SparseVector.class);
       // now compare the expected clusters with actual
       File outDir = new File("output/points");
       assertTrue("output dir exists?", outDir.exists());
       // assertEquals("output dir files?", 4, outFiles.length);
-      BufferedReader reader = new BufferedReader(new InputStreamReader(
-          new FileInputStream("output/points/part-00000"), Charset
-              .forName("UTF-8")));
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("output/points/part-00000"), conf);
       int[] expect = expectedNumPoints[k];
       DummyOutputCollector<Text, Text> collector = new DummyOutputCollector<Text, Text>();
-      while (reader.ready()) {
-        String line = reader.readLine();
-        String[] lineParts = line.split("\t");
-        assertEquals("line parts", 2, lineParts.length);
-        // String cl = line.substring(0, line.indexOf(':'));
-        collector.collect(new Text(lineParts[1]), new Text(lineParts[0]));
+      // The key is the name of the vector (or the vector itself);
+      // the value is the cluster id.
+      Text key = new Text();
+      Text value = new Text();
+      while (reader.next(key, value)) {
+        collector.collect(value, key);
+        key = new Text();
+        value = new Text();
       }
       reader.close();
       if (k == 2)
@@ -448,16 +451,17 @@
     testData = new File("testdata/points");
     if (!testData.exists())
       testData.mkdir();
-    writePointsToFile(points, "testdata/points/file1");
-    writePointsToFile(points, "testdata/points/file2");
+    Configuration conf = new Configuration();
+    ClusteringTestUtils.writePointsToFile(points, "testdata/points/file1", fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, "testdata/points/file2", fs, conf);
 
     // now run the Canopy job
     CanopyDriver.runJob("testdata/points", "testdata/canopies",
-        ManhattanDistanceMeasure.class.getName(), 3.1, 2.1);
+        ManhattanDistanceMeasure.class.getName(), 3.1, 2.1, SparseVector.class);
 
     // now run the KMeans job
     KMeansJob.runJob("testdata/points", "testdata/canopies", "output",
-        EuclideanDistanceMeasure.class.getName(), 0.001, 10, 1);
+        EuclideanDistanceMeasure.class.getName(), 0.001, 10, 1, SparseVector.class);
 
     // now compare the expected clusters with actual
     File outDir = new File("output/points");
@@ -480,24 +484,5 @@
     assertEquals("num points[V1]", 5, collector.getValue("V1").size());
   }
 
-  public static void writePointsToFileWithPayload(List<Vector> points,
-      String fileName, String payload) throws IOException {
-    BufferedWriter output = new BufferedWriter(new OutputStreamWriter(
-        new FileOutputStream(fileName), Charset.forName("UTF-8")));
-    for (Vector point : points) {
-      output.write(point.asFormatString());
-      output.write(payload);
-      output.write('\n');
-    }
-    output.flush();
-    output.close();
-  }
 
-  /**
-   * Split pattern for <code>decodePoint(String)</code>
-   */
-  public static void writePointsToFile(List<Vector> points, String fileName)
-      throws IOException {
-    writePointsToFileWithPayload(points, fileName, "");
-  }
 }

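Note that the test now reads its results back as a SequenceFile rather than a text file. A standalone reader sketch for the same layout, assuming (as the test above does) that keys are vector names and values are cluster ids; the class name and path are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class ClusteredPointsDump {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("output/points/part-00000");
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        Text key = new Text();   // vector name
        Text value = new Text(); // cluster id
        while (reader.next(key, value)) {
          System.out.println(value + "\t" + key);
        }
        reader.close();
      }
    }
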
Modified: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputDriver.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputDriver.java (original)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputDriver.java Tue Jun 23 18:23:18 2009
@@ -24,6 +24,7 @@
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.mahout.matrix.Vector;
 
 import java.io.IOException;
@@ -32,17 +33,21 @@
   private InputDriver() {
   }
 
-  public static void main(String[] args) throws IOException {
-    runJob(args[0], args[1]);
+  public static void main(String[] args) throws IOException, ClassNotFoundException {
+    String input = args[0];
+    String output = args[1];
+    String vectorClassName = args[2];
+    Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
+    runJob(input, output, vectorClass);
   }
 
-  public static void runJob(String input, String output) throws IOException {
+  public static void runJob(String input, String output, Class<? extends Vector> vectorClass) throws IOException {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(InputDriver.class);
 
     conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(Vector.class);
-
+    conf.setOutputValueClass(vectorClass);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
     FileInputFormat.setInputPaths(conf, new Path(input));
     FileOutputFormat.setOutputPath(conf, new Path(output));
 

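A caller of the new three-argument runJob resolves the vector class once and passes it through; asSubclass avoids the unchecked cast used in main() above. A minimal sketch (paths and the wrapper class name are illustrative):

    import org.apache.mahout.clustering.syntheticcontrol.canopy.InputDriver;
    import org.apache.mahout.matrix.Vector;

    public class InputDriverExample {
      public static void main(String[] args) throws Exception {
        // Resolve the vector implementation by name, then hand it to runJob.
        Class<? extends Vector> vectorClass =
            Class.forName("org.apache.mahout.matrix.SparseVector")
                .asSubclass(Vector.class);
        InputDriver.runJob("testdata", "output/data", vectorClass);
      }
    }
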
Modified: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputMapper.java (original)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputMapper.java Tue Jun 23 18:23:18 2009
@@ -23,19 +23,24 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.mahout.matrix.DenseVector;
 import org.apache.mahout.matrix.Vector;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 
 public class InputMapper extends MapReduceBase implements
-    Mapper<LongWritable, Text, Text, Text> {
+    Mapper<LongWritable, Text, Text, Vector> {
+  protected Class<? extends Vector> outputClass;
+  protected Constructor constructor;
 
   @Override
   public void map(LongWritable key, Text values,
-      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+      OutputCollector<Text, Vector> output, Reporter reporter) throws IOException {
     String[] numbers = values.toString().split(" ");
     // sometimes there are multiple separator spaces
     List<Double> doubles = new ArrayList<Double>();
@@ -43,11 +48,32 @@
       if (value.length() > 0)
         doubles.add(Double.valueOf(value));
     }
-    Vector result = new DenseVector(doubles.size());
-    int index = 0;
-    for (Double d : doubles)
-      result.set(index++, d);
-    output.collect(null, new Text(result.asFormatString()));
+    Vector result;
+    try {
+      result = (Vector) constructor.newInstance(doubles.size());
+      int index = 0;
+      for (Double d : doubles)
+        result.set(index++, d);
+      output.collect(new Text(String.valueOf(index)), result);
+
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
   }
 
+
+  @Override
+  public void configure(JobConf job) {
+    outputClass = (Class<? extends Vector>) job.getOutputValueClass();
+    try {
+      constructor = outputClass.getConstructor(int.class);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException(e);
+    }
+
+  }
 }

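The configure()/map() pair above caches the (int) constructor once and then instantiates the configured Vector type per record. The same reflection pattern in isolation, with java.util.ArrayList standing in for the vector class (purely illustrative):

    import java.lang.reflect.Constructor;

    public class ConstructorCacheExample {
      public static void main(String[] args) throws Exception {
        // Look up the one-int constructor once, as configure() does...
        Constructor<?> ctor = java.util.ArrayList.class.getConstructor(int.class);
        // ...then instantiate cheaply per record, as map() does.
        Object instance = ctor.newInstance(10);
        System.out.println(instance.getClass().getName());
      }
    }
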
Modified: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java (original)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java Tue Jun 23 18:23:18 2009
@@ -26,22 +26,26 @@
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.mahout.clustering.canopy.CanopyClusteringJob;
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.matrix.SparseVector;
 
 public class Job {
   private Job() {
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws IOException, ClassNotFoundException {
-    if (args.length == 5) {
+    if (args.length == 6) {
       String input = args[0];
       String output = args[1];
       String measureClassName = args[2];
       double t1 = Double.parseDouble(args[3]);
       double t2 = Double.parseDouble(args[4]);
-      runJob(input, output, measureClassName, t1, t2);
+      String vectorClassName = args[5];
+      Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
+      runJob(input, output, measureClassName, t1, t2, vectorClass);
     } else
       runJob("testdata", "output",
-          "org.apache.mahout.utils.EuclideanDistanceMeasure", 80, 55);
+          "org.apache.mahout.utils.EuclideanDistanceMeasure", 80, 55, SparseVector.class);
   }
 
   /**
@@ -62,7 +66,8 @@
    * @param t2 the canopy T2 threshold
+   * @param vectorClass the Vector implementation used for the converted input
    */
   private static void runJob(String input, String output,
-      String measureClassName, double t1, double t2) throws IOException {
+      String measureClassName, double t1, double t2, Class<? extends Vector> vectorClass) throws IOException {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(Job.class);
 
@@ -72,9 +76,9 @@
     if (dfs.exists(outPath))
       dfs.delete(outPath, true);
     final String directoryContainingConvertedInput = output + DIRECTORY_CONTAINING_CONVERTED_INPUT;
-    InputDriver.runJob(input, directoryContainingConvertedInput);
+    InputDriver.runJob(input, directoryContainingConvertedInput, vectorClass);
     CanopyClusteringJob.runJob(directoryContainingConvertedInput, output, measureClassName,
-        t1, t2);
+        t1, t2, vectorClass);
   }
 
 }

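With the extra argument, main() now expects six values: input, output, measure class, t1, t2, and the vector class name. A hedged sketch of driving the job from code (all paths and thresholds are illustrative):

    public class CanopyJobExample {
      public static void main(String[] args) throws Exception {
        // Six arguments, in the order read by main() above.
        org.apache.mahout.clustering.syntheticcontrol.canopy.Job.main(new String[] {
            "testdata", "output",
            "org.apache.mahout.utils.EuclideanDistanceMeasure",
            "80", "55",
            "org.apache.mahout.matrix.SparseVector"
        });
      }
    }
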
Modified: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java (original)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java Tue Jun 23 18:23:18 2009
@@ -32,6 +32,7 @@
 import org.apache.mahout.clustering.kmeans.KMeansDriver;
 import org.apache.mahout.clustering.syntheticcontrol.canopy.InputDriver;
 import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.matrix.SparseVector;
 
 import static org.apache.mahout.clustering.syntheticcontrol.Constants.DIRECTORY_CONTAINING_CONVERTED_INPUT;
 
@@ -51,14 +52,16 @@
       int maxIterations = Integer.parseInt(args[4]);
       double alpha_0 = Double.parseDouble(args[5]);
       int numReducers = Integer.parseInt(args[6]);
+      String vectorClassName = args[7];
+      Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
       runJob(input, output, modelFactory, numClusters, maxIterations, alpha_0,
-          numReducers);
+          numReducers, vectorClass);
     } else
       runJob(
           "testdata",
           "output",
           "org.apache.mahout.clustering.syntheticcontrol.dirichlet.NormalScModelDistribution",
-          10, 5, 1.0, 1);
+          10, 5, 1.0, 1, SparseVector.class);
   }
 
   /**
@@ -77,7 +80,8 @@
+   * @param vectorClass the Vector implementation used for the converted input
    * @throws ClassNotFoundException
    */
   public static void runJob(String input, String output, String modelFactory,
-      int numModels, int maxIterations, double alpha_0, int numReducers)
+      int numModels, int maxIterations, double alpha_0, int numReducers, Class<? extends Vector> vectorClass)
       throws IOException, ClassNotFoundException, InstantiationException,
       IllegalAccessException {
     // delete the output directory
@@ -89,7 +92,7 @@
     }
     fs.mkdirs(outPath);
     final String directoryContainingConvertedInput = output + DIRECTORY_CONTAINING_CONVERTED_INPUT;
-    InputDriver.runJob(input, directoryContainingConvertedInput);
+    InputDriver.runJob(input, directoryContainingConvertedInput, vectorClass);
     DirichletDriver.runJob(directoryContainingConvertedInput, output + "/state", modelFactory,
         numModels, maxIterations, alpha_0, numReducers);
     printResults(output + "/state", modelFactory, maxIterations, numModels,

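The Dirichlet runJob is public, so the converted-input vector class can also be supplied programmatically. A minimal sketch mirroring the default branch above (all values are illustrative):

    import org.apache.mahout.clustering.syntheticcontrol.dirichlet.Job;
    import org.apache.mahout.matrix.SparseVector;

    public class DirichletJobExample {
      public static void main(String[] args) throws Exception {
        // input, output, model factory, numModels, maxIterations, alpha_0,
        // numReducers, vector class -- matching the signature above.
        Job.runJob("testdata", "output",
            "org.apache.mahout.clustering.syntheticcontrol.dirichlet.NormalScModelDistribution",
            10, 5, 1.0, 1, SparseVector.class);
      }
    }
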
Modified: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java (original)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java Tue Jun 23 18:23:18 2009
@@ -26,8 +26,9 @@
 import org.apache.mahout.clustering.canopy.CanopyClusteringJob;
 import org.apache.mahout.clustering.kmeans.KMeansDriver;
 import org.apache.mahout.clustering.syntheticcontrol.canopy.InputDriver;
-import static org.apache.mahout.clustering.syntheticcontrol.Constants.CLUSTERED_POINTS_OUTPUT_DIRECTORY;
 import static org.apache.mahout.clustering.syntheticcontrol.Constants.DIRECTORY_CONTAINING_CONVERTED_INPUT;
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.matrix.SparseVector;
 
 public class Job {
   
@@ -35,7 +36,7 @@
 private Job() {
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws IOException, ClassNotFoundException {
-    if (args.length == 7) {
+    if (args.length == 8) {
       String input = args[0];
       String output = args[1];
@@ -44,11 +45,13 @@
       double t2 = Double.parseDouble(args[4]);
       double convergenceDelta = Double.parseDouble(args[5]);
       int maxIterations = Integer.parseInt(args[6]);
+      String vectorClassName = args[7];
+      Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
       runJob(input, output, measureClass, t1, t2, convergenceDelta,
-          maxIterations);
+          maxIterations, vectorClass);
     } else
       runJob("testdata", "output",
-          "org.apache.mahout.utils.EuclideanDistanceMeasure", 80, 55, 0.5, 10);
+          "org.apache.mahout.utils.EuclideanDistanceMeasure", 80, 55, 0.5, 10, SparseVector.class);
   }
 
   /**
@@ -71,7 +74,8 @@
    * @param maxIterations the int maximum number of iterations
+   * @param vectorClass the Vector implementation used for the converted input
    */
   private static void runJob(String input, String output, String measureClass,
-      double t1, double t2, double convergenceDelta, int maxIterations)
+      double t1, double t2, double convergenceDelta, int maxIterations, Class<? extends Vector> vectorClass)
       throws IOException {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(Job.class);
@@ -82,11 +85,14 @@
     if (dfs.exists(outPath))
       dfs.delete(outPath, true);
     final String directoryContainingConvertedInput = output + DIRECTORY_CONTAINING_CONVERTED_INPUT;
-    InputDriver.runJob(input, directoryContainingConvertedInput);
-    CanopyClusteringJob.runJob(directoryContainingConvertedInput, output, measureClass, t1, t2);
+    System.out.println("Preparing Input");
+    InputDriver.runJob(input, directoryContainingConvertedInput, vectorClass);
+    System.out.println("Running Canopy to get initial clusters");
+    CanopyClusteringJob.runJob(directoryContainingConvertedInput, output, measureClass, t1, t2, vectorClass);
+    System.out.println("Running KMeans");
     KMeansDriver.runJob(directoryContainingConvertedInput, 
             output + CanopyClusteringJob.DEFAULT_CANOPIES_OUTPUT_DIRECTORY, output,
-        measureClass, convergenceDelta, maxIterations, 1);
+        measureClass, convergenceDelta, maxIterations, 1, vectorClass);
     //    OutputDriver.runJob(output + KMeansDriver.DEFAULT_OUTPUT_DIRECTORY, output + CLUSTERED_POINTS_OUTPUT_DIRECTORY);
   }
 }
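
Since runJob here stays private, the eight-argument main() is the entry point: input, output, measure class, t1, t2, convergence delta, max iterations, and the vector class name. A sketch (all values are illustrative):

    public class SyntheticKMeansExample {
      public static void main(String[] args) throws Exception {
        // Eight arguments, in the order read by main() above.
        org.apache.mahout.clustering.syntheticcontrol.kmeans.Job.main(new String[] {
            "testdata", "output",
            "org.apache.mahout.utils.EuclideanDistanceMeasure",
            "80", "55", "0.5", "10",
            "org.apache.mahout.matrix.SparseVector"
        });
      }
    }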