Posted to commits@mahout.apache.org by ro...@apache.org on 2010/03/01 06:42:36 UTC

svn commit: r917396 [2/3] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/clustering/canopy/ core/src/main/java/org/apache/mahout/clustering/dirichlet/ core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/ core/src/main/java/org/...

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=917396&r1=917395&r2=917396&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Mon Mar  1 05:42:35 2010
@@ -17,6 +17,12 @@
 
 package org.apache.mahout.clustering.canopy;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -26,44 +32,37 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.common.DummyOutputCollector;
 import org.apache.mahout.common.DummyReporter;
 import org.apache.mahout.common.MahoutTestCase;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.Vector;
 import org.apache.mahout.common.distance.DistanceMeasure;
-import org.apache.mahout.common.DummyOutputCollector;
 import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
 import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
 import org.apache.mahout.common.distance.UserDefinedDistanceMeasure;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 public class TestCanopyCreation extends MahoutTestCase {
-
-  private static final double[][] raw = {{1, 1}, {2, 1}, {1, 2}, {2, 2},
-      {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
-
+  
+  private static final double[][] raw = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5},
+                                         {5, 5}};
+  
   private List<Canopy> referenceManhattan;
-
+  
   private final DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
-
+  
   private List<Vector> manhattanCentroids;
-
+  
   private List<Canopy> referenceEuclidean;
-
+  
   private final DistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();
-
+  
   private List<Vector> euclideanCentroids;
-
+  
   private FileSystem fs;
-
-  private static List<VectorWritable> getPoints(double[][] raw) {
+  
+  private static List<VectorWritable> getPointsWritable(double[][] raw) {
     List<VectorWritable> points = new ArrayList<VectorWritable>();
     int i = 0;
     for (double[] fr : raw) {
@@ -73,50 +72,59 @@
     }
     return points;
   }
-
+  
+  private static List<Vector> getPoints(double[][] raw) {
+    List<Vector> points = new ArrayList<Vector>();
+    int i = 0;
+    for (double[] fr : raw) {
+      Vector vec = new RandomAccessSparseVector(String.valueOf(i++), fr.length);
+      vec.assign(fr);
+      points.add(vec);
+    }
+    return points;
+  }
+  
   /** Verify that the given canopies are equivalent to the referenceManhattan */
   private void verifyManhattanCanopies(List<Canopy> canopies) {
     verifyCanopies(canopies, referenceManhattan);
   }
-
+  
   /** Verify that the given canopies are equivalent to the referenceEuclidean */
   private void verifyEuclideanCanopies(List<Canopy> canopies) {
     verifyCanopies(canopies, referenceEuclidean);
   }
-
+  
   /**
-   * Verify that the given canopies are equivalent to the reference. This means the number of canopies is the same, the
-   * number of points in each is the same and the centroids are the same.
+   * Verify that the given canopies are equivalent to the reference. This means the number of canopies is the
+   * same, the number of points in each is the same and the centroids are the same.
    */
-  private static void verifyCanopies(List<Canopy> canopies,
-                                     List<Canopy> reference) {
+  private static void verifyCanopies(List<Canopy> canopies, List<Canopy> reference) {
     assertEquals("number of canopies", reference.size(), canopies.size());
     for (int canopyIx = 0; canopyIx < canopies.size(); canopyIx++) {
       Canopy refCanopy = reference.get(canopyIx);
       Canopy testCanopy = canopies.get(canopyIx);
-      assertEquals("canopy points " + canopyIx, refCanopy.getNumPoints(),
-          testCanopy.getNumPoints());
+      assertEquals("canopy points " + canopyIx, refCanopy.getNumPoints(), testCanopy.getNumPoints());
       Vector refCentroid = refCanopy.computeCentroid();
       Vector testCentroid = testCanopy.computeCentroid();
       for (int pointIx = 0; pointIx < refCentroid.size(); pointIx++) {
-        assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
-            refCentroid.get(pointIx), testCentroid.get(pointIx));
+        assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']', refCentroid.get(pointIx),
+          testCentroid.get(pointIx));
       }
     }
   }
-
+  
   /**
    * Print the canopies to the transcript
-   *
-   * @param canopies a List<Canopy>
+   * 
+   * @param canopies
+   *          a List<Canopy>
    */
   private static void printCanopies(List<Canopy> canopies) {
     for (Canopy canopy : canopies) {
       System.out.println(canopy.toString());
     }
   }
-
-
+  
   public static void rmr(String path) {
     File f = new File(path);
     if (f.exists()) {
@@ -129,7 +137,7 @@
       f.delete();
     }
   }
-
+  
   @Override
   protected void setUp() throws Exception {
     super.setUp();
@@ -137,73 +145,12 @@
     fs = FileSystem.get(conf);
     rmr("output");
     rmr("testdata");
-    referenceManhattan = populateCanopies(manhattanDistanceMeasure,
-        getPoints(raw), 3.1, 2.1);
-    manhattanCentroids = populateCentroids(referenceManhattan);
-    referenceEuclidean = populateCanopies(euclideanDistanceMeasure,
-        getPoints(raw), 3.1, 2.1);
-    euclideanCentroids = populateCentroids(referenceEuclidean);
-  }
-
-  /**
-   * Iterate through the canopies, adding their centroids to a list
-   *
-   * @param canopies a List<Canopy>
-   * @return the List<Vector>
-   */
-  private static List<Vector> populateCentroids(List<Canopy> canopies) {
-    List<Vector> result = new ArrayList<Vector>();
-    for (Canopy canopy : canopies) {
-      result.add(canopy.computeCentroid());
-    }
-    return result;
+    referenceManhattan = CanopyClusterer.createCanopies(getPoints(raw), manhattanDistanceMeasure, 3.1, 2.1);
+    manhattanCentroids = CanopyClusterer.calculateCentroids(referenceManhattan);
+    referenceEuclidean = CanopyClusterer.createCanopies(getPoints(raw), euclideanDistanceMeasure, 3.1, 2.1);
+    euclideanCentroids = CanopyClusterer.calculateCentroids(referenceEuclidean);
   }
-
-  /**
-   * Iterate through the points, adding new canopies. Return the canopies.
-   *
-   * @param measure a DistanceMeasure to use
-   * @param points  a list<Vector> defining the points to be clustered
-   * @param t1      the T1 distance threshold
-   * @param t2      the T2 distance threshold
-   * @return the List<Canopy> created
-   */
-  private static List<Canopy> populateCanopies(DistanceMeasure measure,
-                                       List<VectorWritable> points, double t1, double t2) {
-    List<Canopy> canopies = new ArrayList<Canopy>();
-    /**
-     * Reference Implementation: Given a distance metric, one can create
-     * canopies as follows: Start with a list of the data points in any order,
-     * and with two distance thresholds, T1 and T2, where T1 > T2. (These
-     * thresholds can be set by the user, or selected by cross-validation.) Pick
-     * a point on the list and measure its distance to all other points. Put all
-     * points that are within distance threshold T1 into a canopy. Remove from
-     * the list all points that are within distance threshold T2. Repeat until
-     * the list is empty.
-     */
-    int nextCanopyId = 0;
-    while (!points.isEmpty()) {
-      Iterator<VectorWritable> ptIter = points.iterator();
-      Vector p1 = ptIter.next().get();
-      ptIter.remove();
-      Canopy canopy = new VisibleCanopy(p1, nextCanopyId++);
-      canopies.add(canopy);
-      while (ptIter.hasNext()) {
-        Vector p2 = ptIter.next().get();
-        double dist = measure.distance(p1, p2);
-        // Put all points that are within distance threshold T1 into the canopy
-        if (dist < t1) {
-          canopy.addPoint(p2);
-        }
-        // Remove from the list all points that are within distance threshold T2
-        if (dist < t2) {
-          ptIter.remove();
-        }
-      }
-    }
-    return canopies;
-  }
-
+  
   /** Story: User can cluster points using a ManhattanDistanceMeasure and a reference implementation */
   public void testReferenceManhattan() throws Exception {
     System.out.println("testReferenceManhattan");
@@ -213,19 +160,17 @@
     for (int canopyIx = 0; canopyIx < referenceManhattan.size(); canopyIx++) {
       Canopy testCanopy = referenceManhattan.get(canopyIx);
       int[] expectedNumPoints = {4, 4, 3};
-      double[][] expectedCentroids = {{1.5, 1.5}, {4.0, 4.0},
-          {4.666666666666667, 4.6666666666666667}};
-      assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx],
-          testCanopy.getNumPoints());
+      double[][] expectedCentroids = { {1.5, 1.5}, {4.0, 4.0}, {4.666666666666667, 4.6666666666666667}};
+      assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx], testCanopy.getNumPoints());
       double[] refCentroid = expectedCentroids[canopyIx];
       Vector testCentroid = testCanopy.computeCentroid();
       for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
-        assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
-            refCentroid[pointIx], testCentroid.get(pointIx));
+        assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']', refCentroid[pointIx], testCentroid
+            .get(pointIx));
       }
     }
   }
-
+  
   /** Story: User can cluster points using a EuclideanDistanceMeasure and a reference implementation */
   public void testReferenceEuclidean() throws Exception {
     System.out.println("testReferenceEuclidean()");
@@ -235,63 +180,51 @@
     for (int canopyIx = 0; canopyIx < referenceManhattan.size(); canopyIx++) {
       Canopy testCanopy = referenceEuclidean.get(canopyIx);
       int[] expectedNumPoints = {5, 5, 3};
-      double[][] expectedCentroids = {{1.8, 1.8}, {4.2, 4.2},
-          {4.666666666666667, 4.666666666666667}};
-      assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx],
-          testCanopy.getNumPoints());
+      double[][] expectedCentroids = { {1.8, 1.8}, {4.2, 4.2}, {4.666666666666667, 4.666666666666667}};
+      assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx], testCanopy.getNumPoints());
       double[] refCentroid = expectedCentroids[canopyIx];
       Vector testCentroid = testCanopy.computeCentroid();
       for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
-        assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
-            refCentroid[pointIx], testCentroid.get(pointIx));
+        assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']', refCentroid[pointIx], testCentroid
+            .get(pointIx));
       }
     }
   }
-
+  
   /** Story: User can cluster points without instantiating them all in memory at once */
   public void testIterativeManhattan() throws Exception {
-    List<VectorWritable> points = getPoints(raw);
-    CanopyClusterer clusterer = new CanopyClusterer(new ManhattanDistanceMeasure(), 3.1, 2.1);
-
-    List<Canopy> canopies = new ArrayList<Canopy>();
-    for (VectorWritable point : points) {
-      clusterer.addPointToCanopies(point.get(), canopies, new DummyReporter());
-    }
-
+    List<Vector> points = getPoints(raw);
+    List<Canopy> canopies = CanopyClusterer.createCanopies(points, new ManhattanDistanceMeasure(), 3.1, 2.1);
     System.out.println("testIterativeManhattan");
     printCanopies(canopies);
     verifyManhattanCanopies(canopies);
   }
-
+  
   /** Story: User can cluster points without instantiating them all in memory at once */
   public void testIterativeEuclidean() throws Exception {
-    List<VectorWritable> points = getPoints(raw);
-    CanopyClusterer clusterer = new CanopyClusterer(new EuclideanDistanceMeasure(), 3.1, 2.1);
-
-    List<Canopy> canopies = new ArrayList<Canopy>();
-    for (VectorWritable point : points) {
-      clusterer.addPointToCanopies(point.get(), canopies, new DummyReporter());
-    }
-
+    List<Vector> points = getPoints(raw);
+    List<Canopy> canopies = CanopyClusterer.createCanopies(points, new EuclideanDistanceMeasure(), 3.1, 2.1);
+    
     System.out.println("testIterativeEuclidean");
     printCanopies(canopies);
     verifyEuclideanCanopies(canopies);
   }
-
+  
   /**
-   * Story: User can produce initial canopy centers using a ManhattanDistanceMeasure and a CanopyMapper/Combiner which
-   * clusters input points to produce an output set of canopy centroid points.
+   * Story: User can produce initial canopy centers using a ManhattanDistanceMeasure and a
+   * CanopyMapper/Combiner which clusters input points to produce an output set of canopy centroid points.
    */
   public void testCanopyMapperManhattan() throws Exception {
     CanopyMapper mapper = new CanopyMapper();
     JobConf conf = new JobConf();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+      "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
     mapper.configure(conf);
     
-    DummyOutputCollector<Text, VectorWritable> collector = new DummyOutputCollector<Text, VectorWritable>();
-    List<VectorWritable> points = getPoints(raw);
+    DummyOutputCollector<Text,VectorWritable> collector = new DummyOutputCollector<Text,VectorWritable>();
+    List<VectorWritable> points = getPointsWritable(raw);
     // map the data
     for (VectorWritable point : points) {
       mapper.map(new Text(), point, collector, new DummyReporter());
@@ -302,26 +235,26 @@
     List<VectorWritable> data = collector.getValue("centroid");
     assertEquals("Number of centroids", 3, data.size());
     for (int i = 0; i < data.size(); i++) {
-      assertEquals("Centroid error",
-          manhattanCentroids.get(i).asFormatString(),
-          data.get(i).get().asFormatString());
+      assertEquals("Centroid error", manhattanCentroids.get(i).asFormatString(), data.get(i).get()
+          .asFormatString());
     }
   }
-
+  
   /**
-   * Story: User can produce initial canopy centers using a EuclideanDistanceMeasure and a CanopyMapper/Combiner which
-   * clusters input points to produce an output set of canopy centroid points.
+   * Story: User can produce initial canopy centers using a EuclideanDistanceMeasure and a
+   * CanopyMapper/Combiner which clusters input points to produce an output set of canopy centroid points.
    */
   public void testCanopyMapperEuclidean() throws Exception {
     CanopyMapper mapper = new CanopyMapper();
     JobConf conf = new JobConf();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+      "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
     mapper.configure(conf);
     
-    DummyOutputCollector<Text, VectorWritable> collector = new DummyOutputCollector<Text, VectorWritable>();
-    List<VectorWritable> points = getPoints(raw);
+    DummyOutputCollector<Text,VectorWritable> collector = new DummyOutputCollector<Text,VectorWritable>();
+    List<VectorWritable> points = getPointsWritable(raw);
     // map the data
     for (VectorWritable point : points) {
       mapper.map(new Text(), point, collector, new DummyReporter());
@@ -332,26 +265,26 @@
     List<VectorWritable> data = collector.getValue("centroid");
     assertEquals("Number of centroids", 3, data.size());
     for (int i = 0; i < data.size(); i++) {
-      assertEquals("Centroid error",
-          euclideanCentroids.get(i).asFormatString(),
-          data.get(i).get().asFormatString());
+      assertEquals("Centroid error", euclideanCentroids.get(i).asFormatString(), data.get(i).get()
+          .asFormatString());
     }
   }
-
+  
   /**
-   * Story: User can produce final canopy centers using a ManhattanDistanceMeasure and a CanopyReducer which clusters
-   * input centroid points to produce an output set of final canopy centroid points.
+   * Story: User can produce final canopy centers using a ManhattanDistanceMeasure and a CanopyReducer which
+   * clusters input centroid points to produce an output set of final canopy centroid points.
    */
   public void testCanopyReducerManhattan() throws Exception {
     CanopyReducer reducer = new CanopyReducer();
     JobConf conf = new JobConf();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+      "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
     reducer.configure(conf);
     
-    DummyOutputCollector<Text, Canopy> collector = new DummyOutputCollector<Text, Canopy>();
-    List<VectorWritable> points = getPoints(raw);
+    DummyOutputCollector<Text,Canopy> collector = new DummyOutputCollector<Text,Canopy>();
+    List<VectorWritable> points = getPointsWritable(raw);
     reducer.reduce(new Text("centroid"), points.iterator(), collector, new DummyReporter());
     reducer.close();
     Set<String> keys = collector.getKeys();
@@ -359,25 +292,28 @@
     int i = 0;
     for (String key : keys) {
       List<Canopy> data = collector.getValue(key);
-      assertEquals(manhattanCentroids.get(i).asFormatString() + " is not equal to " + data.get(0).computeCentroid().asFormatString(), manhattanCentroids.get(i), data.get(0).computeCentroid());
+      assertEquals(manhattanCentroids.get(i).asFormatString() + " is not equal to "
+                   + data.get(0).computeCentroid().asFormatString(), manhattanCentroids.get(i), data.get(0)
+          .computeCentroid());
       i++;
     }
   }
-
+  
   /**
-   * Story: User can produce final canopy centers using a EuclideanDistanceMeasure and a CanopyReducer which clusters
-   * input centroid points to produce an output set of final canopy centroid points.
+   * Story: User can produce final canopy centers using a EuclideanDistanceMeasure and a CanopyReducer which
+   * clusters input centroid points to produce an output set of final canopy centroid points.
    */
   public void testCanopyReducerEuclidean() throws Exception {
     CanopyReducer reducer = new CanopyReducer();
     JobConf conf = new JobConf();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+      "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
     reducer.configure(conf);
     
-    DummyOutputCollector<Text, Canopy> collector = new DummyOutputCollector<Text, Canopy>();
-    List<VectorWritable> points = getPoints(raw);
+    DummyOutputCollector<Text,Canopy> collector = new DummyOutputCollector<Text,Canopy>();
+    List<VectorWritable> points = getPointsWritable(raw);
     reducer.reduce(new Text("centroid"), points.iterator(), collector, new DummyReporter());
     reducer.close();
     Set<String> keys = collector.getKeys();
@@ -385,14 +321,19 @@
     int i = 0;
     for (String key : keys) {
       List<Canopy> data = collector.getValue(key);
-      assertEquals(euclideanCentroids.get(i).asFormatString() + " is not equal to " + data.get(0).computeCentroid().asFormatString(), euclideanCentroids.get(i), data.get(0).computeCentroid());
+      assertEquals(euclideanCentroids.get(i).asFormatString() + " is not equal to "
+                   + data.get(0).computeCentroid().asFormatString(), euclideanCentroids.get(i), data.get(0)
+          .computeCentroid());
       i++;
     }
   }
-
-  /** Story: User can produce final canopy centers using a Hadoop map/reduce job and a ManhattanDistanceMeasure. */
+  
+  /**
+   * Story: User can produce final canopy centers using a Hadoop map/reduce job and a
+   * ManhattanDistanceMeasure.
+   */
   public void testCanopyGenManhattanMR() throws Exception {
-    List<VectorWritable> points = getPoints(raw);
+    List<VectorWritable> points = getPointsWritable(raw);
     File testData = new File("testdata");
     if (!testData.exists()) {
       testData.mkdir();
@@ -403,7 +344,7 @@
     ClusteringTestUtils.writePointsToFile(points, "testdata/file2", fs, job);
     // now run the Canopy Driver
     CanopyDriver.runJob("testdata", "output/canopies", ManhattanDistanceMeasure.class.getName(), 3.1, 2.1);
-
+    
     // verify output from sequence file
     Path path = new Path("output/canopies/part-00000");
     FileSystem fs = FileSystem.get(path.toUri(), job);
@@ -412,21 +353,24 @@
     Canopy canopy = new Canopy();
     assertTrue("more to come", reader.next(key, canopy));
     assertEquals("1st key", "C0", key.toString());
-    //Canopy canopy = new Canopy(value); //Canopy.decodeCanopy(value.toString());
+    // Canopy canopy = new Canopy(value); //Canopy.decodeCanopy(value.toString());
     assertEquals("1st x value", 1.5, canopy.getCenter().get(0));
     assertEquals("1st y value", 1.5, canopy.getCenter().get(1));
     assertTrue("more to come", reader.next(key, canopy));
     assertEquals("2nd key", "C1", key.toString());
-    //canopy = Canopy.decodeCanopy(canopy.toString());
+    // canopy = Canopy.decodeCanopy(canopy.toString());
     assertEquals("1st x value", 4.333333333333334, canopy.getCenter().get(0));
     assertEquals("1st y value", 4.333333333333334, canopy.getCenter().get(1));
     assertFalse("more to come", reader.next(key, canopy));
     reader.close();
   }
-
-  /** Story: User can produce final canopy centers using a Hadoop map/reduce job and a EuclideanDistanceMeasure. */
+  
+  /**
+   * Story: User can produce final canopy centers using a Hadoop map/reduce job and a
+   * EuclideanDistanceMeasure.
+   */
   public void testCanopyGenEuclideanMR() throws Exception {
-    List<VectorWritable> points = getPoints(raw);
+    List<VectorWritable> points = getPointsWritable(raw);
     File testData = new File("testdata");
     if (!testData.exists()) {
       testData.mkdir();
@@ -436,7 +380,7 @@
     ClusteringTestUtils.writePointsToFile(points, "testdata/file2", fs, job);
     // now run the Canopy Driver
     CanopyDriver.runJob("testdata", "output/canopies", EuclideanDistanceMeasure.class.getName(), 3.1, 2.1);
-
+    
     // verify output from sequence file
     Path path = new Path("output/canopies/part-00000");
     FileSystem fs = FileSystem.get(path.toUri(), job);
@@ -454,31 +398,32 @@
     assertFalse("more to come", reader.next(key, value));
     reader.close();
   }
-
+  
   /** Story: User can cluster a subset of the points using a ClusterMapper and a ManhattanDistanceMeasure. */
   public void testClusterMapperManhattan() throws Exception {
     ClusterMapper mapper = new ClusterMapper();
     JobConf conf = new JobConf();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+      "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
     mapper.configure(conf);
     
     List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text, VectorWritable> collector = new DummyOutputCollector<Text, VectorWritable>();
+    DummyOutputCollector<Text,VectorWritable> collector = new DummyOutputCollector<Text,VectorWritable>();
     int nextCanopyId = 0;
     for (Vector centroid : manhattanCentroids) {
       canopies.add(new Canopy(centroid, nextCanopyId++));
     }
     mapper.config(canopies);
-    List<VectorWritable> points = getPoints(raw);
+    List<VectorWritable> points = getPointsWritable(raw);
     // map the data
     for (VectorWritable point : points) {
       mapper.map(new Text(), point, collector, new DummyReporter());
     }
-    Map<String, List<VectorWritable>> data = collector.getData();
+    Map<String,List<VectorWritable>> data = collector.getData();
     assertEquals("Number of map results", canopies.size(), data.size());
-    for (Map.Entry<String, List<VectorWritable>> stringListEntry : data.entrySet()) {
+    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
       String key = stringListEntry.getKey();
       Canopy canopy = findCanopy(key, canopies);
       List<VectorWritable> pts = stringListEntry.getValue();
@@ -487,7 +432,7 @@
       }
     }
   }
-
+  
   private static Canopy findCanopy(String key, List<Canopy> canopies) {
     for (Canopy c : canopies) {
       if (c.getIdentifier().equals(key)) {
@@ -496,31 +441,32 @@
     }
     return null;
   }
-
+  
   /** Story: User can cluster a subset of the points using a ClusterMapper and a EuclideanDistanceMeasure. */
   public void testClusterMapperEuclidean() throws Exception {
     ClusterMapper mapper = new ClusterMapper();
     JobConf conf = new JobConf();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+      "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
     mapper.configure(conf);
     
     List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text, VectorWritable> collector = new DummyOutputCollector<Text, VectorWritable>();
+    DummyOutputCollector<Text,VectorWritable> collector = new DummyOutputCollector<Text,VectorWritable>();
     int nextCanopyId = 0;
     for (Vector centroid : euclideanCentroids) {
       canopies.add(new Canopy(centroid, nextCanopyId++));
     }
     mapper.config(canopies);
-    List<VectorWritable> points = getPoints(raw);
+    List<VectorWritable> points = getPointsWritable(raw);
     // map the data
     for (VectorWritable point : points) {
       mapper.map(new Text(), point, collector, new DummyReporter());
     }
-    Map<String, List<VectorWritable>> data = collector.getData();
+    Map<String,List<VectorWritable>> data = collector.getData();
     assertEquals("Number of map results", canopies.size(), data.size());
-    for (Map.Entry<String, List<VectorWritable>> stringListEntry : data.entrySet()) {
+    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
       String key = stringListEntry.getKey();
       Canopy canopy = findCanopy(key, canopies);
       List<VectorWritable> pts = stringListEntry.getValue();
@@ -529,42 +475,43 @@
       }
     }
   }
-
+  
   /** Story: User can cluster a subset of the points using a ClusterReducer and a ManhattanDistanceMeasure. */
   public void testClusterReducerManhattan() throws Exception {
     ClusterMapper mapper = new ClusterMapper();
     JobConf conf = new JobConf();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+      "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
     mapper.configure(conf);
     
     List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text, VectorWritable> collector = new DummyOutputCollector<Text, VectorWritable>();
+    DummyOutputCollector<Text,VectorWritable> collector = new DummyOutputCollector<Text,VectorWritable>();
     int nextCanopyId = 0;
     for (Vector centroid : manhattanCentroids) {
       canopies.add(new Canopy(centroid, nextCanopyId++));
     }
     mapper.config(canopies);
-    List<VectorWritable> points = getPoints(raw);
+    List<VectorWritable> points = getPointsWritable(raw);
     // map the data
     for (VectorWritable point : points) {
       mapper.map(new Text(), point, collector, new DummyReporter());
     }
-    Map<String, List<VectorWritable>> data = collector.getData();
+    Map<String,List<VectorWritable>> data = collector.getData();
     assertEquals("Number of map results", canopies.size(), data.size());
-
+    
     // reduce the data
-    Reducer<Text, VectorWritable, Text, VectorWritable> reducer = new IdentityReducer<Text, VectorWritable>();
-    collector = new DummyOutputCollector<Text, VectorWritable>();
-    for (Map.Entry<String, List<VectorWritable>> stringListEntry : data.entrySet()) {
-      reducer.reduce(new Text(stringListEntry.getKey()), stringListEntry
-          .getValue().iterator(), collector, null);
+    Reducer<Text,VectorWritable,Text,VectorWritable> reducer = new IdentityReducer<Text,VectorWritable>();
+    collector = new DummyOutputCollector<Text,VectorWritable>();
+    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
+      reducer.reduce(new Text(stringListEntry.getKey()), stringListEntry.getValue().iterator(), collector,
+        null);
     }
-
+    
     // check the output
     data = collector.getData();
-    for (Map.Entry<String, List<VectorWritable>> stringListEntry : data.entrySet()) {
+    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
       String key = stringListEntry.getKey();
       Canopy canopy = findCanopy(key, canopies);
       List<VectorWritable> pts = stringListEntry.getValue();
@@ -573,42 +520,43 @@
       }
     }
   }
-
+  
   /** Story: User can cluster a subset of the points using a ClusterReducer and a EuclideanDistanceMeasure. */
   public void testClusterReducerEuclidean() throws Exception {
     ClusterMapper mapper = new ClusterMapper();
     JobConf conf = new JobConf();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+      "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
     mapper.configure(conf);
     
     List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text, VectorWritable> collector = new DummyOutputCollector<Text, VectorWritable>();
+    DummyOutputCollector<Text,VectorWritable> collector = new DummyOutputCollector<Text,VectorWritable>();
     int nextCanopyId = 0;
     for (Vector centroid : euclideanCentroids) {
       canopies.add(new Canopy(centroid, nextCanopyId++));
     }
     mapper.config(canopies);
-    List<VectorWritable> points = getPoints(raw);
+    List<VectorWritable> points = getPointsWritable(raw);
     // map the data
     for (VectorWritable point : points) {
       mapper.map(new Text(), point, collector, new DummyReporter());
     }
-    Map<String, List<VectorWritable>> data = collector.getData();
-
+    Map<String,List<VectorWritable>> data = collector.getData();
+    
     // reduce the data
-    Reducer<Text, VectorWritable, Text, VectorWritable> reducer = new IdentityReducer<Text, VectorWritable>();
-    collector = new DummyOutputCollector<Text, VectorWritable>();
-    for (Map.Entry<String, List<VectorWritable>> stringListEntry : data.entrySet()) {
-      reducer.reduce(new Text(stringListEntry.getKey()),
-          stringListEntry.getValue().iterator(), collector, null);
+    Reducer<Text,VectorWritable,Text,VectorWritable> reducer = new IdentityReducer<Text,VectorWritable>();
+    collector = new DummyOutputCollector<Text,VectorWritable>();
+    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
+      reducer.reduce(new Text(stringListEntry.getKey()), stringListEntry.getValue().iterator(), collector,
+        null);
     }
-
+    
     // check the output
     data = collector.getData();
     assertEquals("Number of map results", canopies.size(), data.size());
-    for (Map.Entry<String, List<VectorWritable>> stringListEntry : data.entrySet()) {
+    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
       String key = stringListEntry.getKey();
       Canopy canopy = findCanopy(key, canopies);
       List<VectorWritable> pts = stringListEntry.getValue();
@@ -617,10 +565,13 @@
       }
     }
   }
-
-  /** Story: User can produce final point clustering using a Hadoop map/reduce job and a ManhattanDistanceMeasure. */
+  
+  /**
+   * Story: User can produce final point clustering using a Hadoop map/reduce job and a
+   * ManhattanDistanceMeasure.
+   */
   public void testClusteringManhattanMR() throws Exception {
-    List<VectorWritable> points = getPoints(raw);
+    List<VectorWritable> points = getPointsWritable(raw);
     File testData = new File("testdata");
     if (!testData.exists()) {
       testData.mkdir();
@@ -630,16 +581,14 @@
     ClusteringTestUtils.writePointsToFile(points, "testdata/file1", fs, conf);
     ClusteringTestUtils.writePointsToFile(points, "testdata/file2", fs, conf);
     // now run the Job
-    CanopyClusteringJob.runJob("testdata", "output",
-        ManhattanDistanceMeasure.class.getName(), 3.1, 2.1);
-    //TODO: change
+    CanopyClusteringJob.runJob("testdata", "output", ManhattanDistanceMeasure.class.getName(), 3.1, 2.1);
+    // TODO: change
     Path path = new Path("output/clusters/part-00000");
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
     int count = 0;
-    /*while (reader.ready()) {
-      System.out.println(reader.readLine());
-      count++;
-    }*/
+    /*
+     * while (reader.ready()) { System.out.println(reader.readLine()); count++; }
+     */
     Text txt = new Text();
     VectorWritable vector = new VectorWritable();
     while (reader.next(txt, vector)) {
@@ -650,10 +599,13 @@
     assertEquals("number of points", 2 + 2 * points.size(), count);
     reader.close();
   }
-
-  /** Story: User can produce final point clustering using a Hadoop map/reduce job and a EuclideanDistanceMeasure. */
+  
+  /**
+   * Story: User can produce final point clustering using a Hadoop map/reduce job and a
+   * EuclideanDistanceMeasure.
+   */
   public void testClusteringEuclideanMR() throws Exception {
-    List<VectorWritable> points = getPoints(raw);
+    List<VectorWritable> points = getPointsWritable(raw);
     File testData = new File("testdata");
     if (!testData.exists()) {
       testData.mkdir();
@@ -662,33 +614,29 @@
     ClusteringTestUtils.writePointsToFile(points, "testdata/file1", fs, conf);
     ClusteringTestUtils.writePointsToFile(points, "testdata/file2", fs, conf);
     // now run the Job
-    CanopyClusteringJob.runJob("testdata", "output",
-        EuclideanDistanceMeasure.class.getName(), 3.1, 2.1);
+    CanopyClusteringJob.runJob("testdata", "output", EuclideanDistanceMeasure.class.getName(), 3.1, 2.1);
     Path path = new Path("output/clusters/part-00000");
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
     int count = 0;
-    /*while (reader.ready()) {
-      System.out.println(reader.readLine());
-      count++;
-    }*/
+    /*
+     * while (reader.ready()) { System.out.println(reader.readLine()); count++; }
+     */
     Text txt = new Text();
     VectorWritable can = new VectorWritable();
     while (reader.next(txt, can)) {
       count++;
     }
-    /*while (reader.ready()) {
-      System.out.println(reader.readLine());
-      count++;
-    }*/
+    /*
+     * while (reader.ready()) { System.out.println(reader.readLine()); count++; }
+     */
     // the point [3.0,3.0] is covered by both canopies
     assertEquals("number of points", 2 + 2 * points.size(), count);
     reader.close();
   }
-
-
+  
   /** Story: Clustering algorithm must support arbitrary user defined distance measure */
   public void testUserDefinedDistanceMeasure() throws Exception {
-    List<VectorWritable> points = getPoints(raw);
+    List<VectorWritable> points = getPointsWritable(raw);
     File testData = new File("testdata");
     if (!testData.exists()) {
       testData.mkdir();
@@ -699,7 +647,7 @@
     // now run the Canopy Driver. User defined measure happens to be a Manhattan
     // subclass so results are same.
     CanopyDriver.runJob("testdata", "output/canopies", UserDefinedDistanceMeasure.class.getName(), 3.1, 2.1);
-
+    
     // verify output from sequence file
     JobConf job = new JobConf(CanopyDriver.class);
     Path path = new Path("output/canopies/part-00000");
@@ -709,12 +657,12 @@
     Canopy value = new Canopy();
     assertTrue("more to come", reader.next(key, value));
     assertEquals("1st key", "C0", key.toString());
-
+    
     assertEquals("1st x value", 1.5, value.getCenter().get(0));
     assertEquals("1st y value", 1.5, value.getCenter().get(1));
     assertTrue("more to come", reader.next(key, value));
     assertEquals("2nd key", "C1", key.toString());
-
+    
     assertEquals("1st x value", 4.333333333333334, value.getCenter().get(0));
     assertEquals("1st y value", 4.333333333333334, value.getCenter().get(1));
     assertFalse("more to come", reader.next(key, value));

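The reference-implementation comment removed in the TestCanopyCreation hunks above describes the canopy procedure: pick any point from the list, put every point within distance T1 of it into a new canopy, remove every point within T2 (including the chosen point itself), and repeat until the list is empty, with T1 > T2. A minimal standalone sketch of that procedure follows; it uses plain double[] points and a hard-coded Manhattan distance rather than the Mahout Vector and DistanceMeasure types, and the class and method names are illustrative only.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Illustrative sketch of the T1/T2 canopy procedure; not the Mahout API.
    final class CanopySketch {

      // Manhattan distance between two 2-d points.
      private static double distance(double[] a, double[] b) {
        return Math.abs(a[0] - b[0]) + Math.abs(a[1] - b[1]);
      }

      // Consumes the point list and returns the member points of each canopy.
      static List<List<double[]>> createCanopies(List<double[]> points, double t1, double t2) {
        List<List<double[]>> canopies = new ArrayList<List<double[]>>();
        while (!points.isEmpty()) {
          Iterator<double[]> it = points.iterator();
          double[] center = it.next();     // pick a point on the list
          it.remove();                     // it is within T2 of itself
          List<double[]> canopy = new ArrayList<double[]>();
          canopy.add(center);
          canopies.add(canopy);
          while (it.hasNext()) {
            double[] p = it.next();
            double d = distance(center, p);
            if (d < t1) {                  // within T1: member of this canopy
              canopy.add(p);
            }
            if (d < t2) {                  // within T2: no longer a candidate center
              it.remove();
            }
          }
        }
        return canopies;
      }

      public static void main(String[] args) {
        double[][] raw = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
        List<double[]> points = new ArrayList<double[]>();
        for (double[] p : raw) {
          points.add(p);
        }
        for (List<double[]> canopy : createCanopies(points, 3.1, 2.1)) {
          System.out.println("canopy of " + canopy.size() + " points");
        }
      }
    }

With the raw test data and the 3.1/2.1 thresholds, this yields three canopies of 4, 4, and 3 points, matching the expectedNumPoints checked in testReferenceManhattan above.
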
Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java?rev=917396&r1=917395&r2=917396&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestDirichletClustering.java Mon Mar  1 05:42:35 2010
@@ -17,6 +17,9 @@
 
 package org.apache.mahout.clustering.dirichlet;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.mahout.clustering.dirichlet.models.AsymmetricSampledNormalDistribution;
 import org.apache.mahout.clustering.dirichlet.models.Model;
 import org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution;
@@ -25,9 +28,6 @@
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.VectorWritable;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class TestDirichletClustering extends MahoutTestCase {
 
   private List<VectorWritable> sampleData;

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java?rev=917396&r1=917395&r2=917396&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java Mon Mar  1 05:42:35 2010
@@ -66,7 +66,8 @@
    *          double y-standard deviation of the samples
    */
   private void generateSamples(int num, double mx, double my, double sdx, double sdy) {
-    System.out.println("Generating " + num + " samples m=[" + mx + ", " + my + "] sd=[" + sdx + ", " + sdy + ']');
+    System.out.println("Generating " + num + " samples m=[" + mx + ", " + my + "] sd=[" + sdx + ", " + sdy
+                       + ']');
     for (int i = 0; i < num; i++) {
       addSample(new double[] {UncommonDistributions.rNorm(mx, sdx), UncommonDistributions.rNorm(my, sdy)});
     }
@@ -115,7 +116,7 @@
   public void testMapper() throws Exception {
     generateSamples(10, 0, 0, 1);
     DirichletState<VectorWritable> state = new DirichletState<VectorWritable>(new NormalModelDistribution(
-        new VectorWritable(new DenseVector(2))), 5, 1, 0, 0);
+        new VectorWritable(new DenseVector(2))), 5, 1);
     DirichletMapper mapper = new DirichletMapper();
     mapper.configure(state);
     
@@ -135,7 +136,7 @@
     generateSamples(100, 0, 2, 1);
     generateSamples(100, 2, 2, 1);
     DirichletState<VectorWritable> state = new DirichletState<VectorWritable>(new SampledNormalDistribution(
-        new VectorWritable(new DenseVector(2))), 20, 1, 1, 0);
+        new VectorWritable(new DenseVector(2))), 20, 1);
     DirichletMapper mapper = new DirichletMapper();
     mapper.configure(state);
     
@@ -180,7 +181,7 @@
     generateSamples(100, 0, 2, 1);
     generateSamples(100, 2, 2, 1);
     DirichletState<VectorWritable> state = new DirichletState<VectorWritable>(new SampledNormalDistribution(
-        new VectorWritable(new DenseVector(2))), 20, 1.0, 1, 0);
+        new VectorWritable(new DenseVector(2))), 20, 1.0);
     
     List<Model<VectorWritable>[]> models = new ArrayList<Model<VectorWritable>[]>();
     

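In the TestMapReduce hunks above, generateSamples draws each 2-d sample coordinate-wise from independent normals via UncommonDistributions.rNorm(mean, sd). For readers who want to reproduce such sample data outside Mahout, here is a standalone equivalent using java.util.Random; the class and method names are illustrative only.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    // Coordinate-wise 2-d Gaussian sampling, mirroring generateSamples above.
    final class SampleSketch {
      static List<double[]> generateSamples(int num, double mx, double my,
                                            double sdx, double sdy, long seed) {
        Random rnd = new Random(seed);
        List<double[]> samples = new ArrayList<double[]>(num);
        for (int i = 0; i < num; i++) {
          // nextGaussian() is a draw from N(0, 1); scale and shift it the same
          // way rNorm(m, sd) produces a draw from N(m, sd)
          samples.add(new double[] {mx + sdx * rnd.nextGaussian(),
                                    my + sdy * rnd.nextGaussian()});
        }
        return samples;
      }
    }
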
Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java?rev=917396&r1=917395&r2=917396&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java Mon Mar  1 05:42:35 2010
@@ -17,6 +17,12 @@
 
 package org.apache.mahout.clustering.fuzzykmeans;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -25,24 +31,18 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.mahout.clustering.ClusteringTestUtils;
 import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.DummyOutputCollector;
 import org.apache.mahout.common.DummyReporter;
 import org.apache.mahout.common.MahoutTestCase;
-import org.apache.mahout.math.Vector;
 import org.apache.mahout.common.distance.DistanceMeasure;
-import org.apache.mahout.common.DummyOutputCollector;
 import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class TestFuzzyKmeansClustering extends MahoutTestCase {
-
+  
   private FileSystem fs;
-
+  
   private static void rmr(String path) {
     File f = new File(path);
     if (f.exists()) {
@@ -55,7 +55,7 @@
       f.delete();
     }
   }
-
+  
   @Override
   protected void setUp() throws Exception {
     super.setUp();
@@ -64,120 +64,70 @@
     Configuration conf = new Configuration();
     fs = FileSystem.get(conf);
   }
-
+  
   private static double round(double val, int places) {
     long factor = (long) Math.pow(10, places);
-
+    
     // Shift the decimal the correct number of places
     // to the right.
     val *= factor;
-
+    
     // Round to the nearest integer.
     long tmp = Math.round(val);
-
+    
     // Shift the decimal the correct number of places
     // back to the left.
     return (double) tmp / factor;
   }
-
+  
   private static Vector tweakValue(Vector point) {
     return point.plus(0.1);
-
+    
   }
-
-  private static void referenceFuzzyKMeans(List<VectorWritable> points,
-                                          List<SoftCluster> clusterList, Map<String, String> pointClusterInfo,
-                                          String distanceMeasureClass, double threshold, double m, int numIter)
-      throws Exception {
-    ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-    Class<?> cl = ccl.loadClass(distanceMeasureClass);
-
-    DistanceMeasure measure = (DistanceMeasure) cl.newInstance();
-    FuzzyKMeansClusterer clusterer = new FuzzyKMeansClusterer(measure, threshold, m);
-    boolean converged = false;
-    for (int iter = 0; !converged && iter < numIter; iter++) {
-      converged = iterateReference(points, clusterList, clusterer);
-    }
-    computeCluster(points, clusterList, clusterer, pointClusterInfo);
-  }
-
-  private static boolean iterateReference(List<VectorWritable> points,
-                                         List<SoftCluster> clusterList, FuzzyKMeansClusterer clusterer) {
-    // for each
-    for (VectorWritable pointWritable : points) {
-      Vector point = pointWritable.get();
-      List<Double> clusterDistanceList = new ArrayList<Double>();
-      for (SoftCluster cluster : clusterList) {
-        clusterDistanceList.add(clusterer.getMeasure().distance(point, cluster.getCenter()));
-      }
-
-      for (int i = 0; i < clusterList.size(); i++) {
-        double probWeight = clusterer.computeProbWeight(clusterDistanceList
-            .get(i), clusterDistanceList);
-        clusterList.get(i).addPoint(point,
-            Math.pow(probWeight, clusterer.getM()));
-      }
-    }
-    boolean converged = true;
-    for (SoftCluster cluster : clusterList) {
-      if (!clusterer.computeConvergence(cluster)) {
-        converged = false;
-      }
-    }
-    // update the cluster centers
-    if (!converged) {
-      for (SoftCluster cluster : clusterList) {
-        cluster.recomputeCenter();
-      }
-    }
-    return converged;
-
-  }
-
-  private static void computeCluster(List<VectorWritable> points,
-                                    List<SoftCluster> clusterList, FuzzyKMeansClusterer clusterer,
-                                    Map<String, String> pointClusterInfo) {
-
-    for (VectorWritable pointWritable : points) {
-      Vector point = pointWritable.get();
+  
+  private static void computeCluster(List<Vector> points,
+                                     List<SoftCluster> clusterList,
+                                     FuzzyKMeansClusterer clusterer,
+                                     Map<String,String> pointClusterInfo) {
+    
+    for (Vector point : points) {
       StringBuilder outputValue = new StringBuilder("[");
       List<Double> clusterDistanceList = new ArrayList<Double>();
       for (SoftCluster cluster : clusterList) {
         clusterDistanceList.add(clusterer.getMeasure().distance(point, cluster.getCenter()));
       }
       for (int i = 0; i < clusterList.size(); i++) {
-        double probWeight = clusterer.computeProbWeight(clusterDistanceList
-            .get(i), clusterDistanceList);
-        outputValue.append(clusterList.get(i).getClusterId()).append(':')
-            .append(probWeight).append(' ');
+        double probWeight = clusterer.computeProbWeight(clusterDistanceList.get(i), clusterDistanceList);
+        outputValue.append(clusterList.get(i).getId()).append(':').append(probWeight).append(' ');
       }
       String name = point.getName();
-      pointClusterInfo.put(name != null && name.length() != 0 ? name : point.asFormatString().trim(), outputValue
-          .toString().trim()
-          + ']');
+      pointClusterInfo.put(name != null && name.length() != 0 ? name : point.asFormatString().trim(),
+        outputValue.toString().trim() + ']');
     }
   }
-
+  
   public void testReferenceImplementation() throws Exception {
-    List<VectorWritable> points = TestKmeansClustering.getPoints(TestKmeansClustering.reference);
+    List<Vector> points = TestKmeansClustering.getPoints(TestKmeansClustering.reference);
     for (int k = 0; k < points.size(); k++) {
       System.out.println("test k= " + k);
-
+      
       List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
       // pick k initial cluster centers at random
       for (int i = 0; i < k + 1; i++) {
-        Vector vec = tweakValue(points.get(i).get());
+        Vector vec = tweakValue(points.get(i));
         SoftCluster cluster = new SoftCluster(vec);
         // add the center so the centroid will be correct upon output
         cluster.addPoint(cluster.getCenter(), 1);
-
+        
         clusterList.add(cluster);
       }
-      Map<String, String> pointClusterInfo = new HashMap<String, String>();
+      Map<String,String> pointClusterInfo = new HashMap<String,String>();
       // run reference FuzzyKmeans algorithm
-      referenceFuzzyKMeans(points, clusterList, pointClusterInfo,
-          EuclideanDistanceMeasure.class.getName(), 0.001, 2, 2);
-
+      List<List<SoftCluster>> clusters = FuzzyKMeansClusterer.clusterPoints(points, clusterList,
+        new EuclideanDistanceMeasure(), 0.001, 2, 2);
+      computeCluster(points, clusters.get(clusters.size() - 1), new FuzzyKMeansClusterer(
+          new EuclideanDistanceMeasure(), 0.001, 2), pointClusterInfo);
+      
       // iterate for each point
       for (String value : pointClusterInfo.values()) {
         String clusterInfoStr = value.substring(1, value.length() - 1);
@@ -186,20 +136,18 @@
         double prob = 0.0;
         for (String clusterInfo : clusterInfoList) {
           String[] clusterProb = clusterInfo.split(":");
-
+          
           double clusterProbVal = Double.parseDouble(clusterProb[1]);
           prob += clusterProbVal;
         }
         prob = round(prob, 1);
-        assertEquals(
-            "Sum of cluster Membership probability should be equal to=", 1.0,
-            prob);
+        assertEquals("Sum of cluster Membership probability should be equal to=", 1.0, prob);
       }
     }
   }
-
+  
   public void testFuzzyKMeansMRJob() throws Exception {
-    List<VectorWritable> points = TestKmeansClustering.getPoints(TestKmeansClustering.reference);
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.reference);
     File testData = new File("testdata");
     if (!testData.exists()) {
       testData.mkdir();
@@ -210,8 +158,7 @@
     }
     Configuration conf = new Configuration();
     ClusteringTestUtils.writePointsToFile(points, "testdata/points/file1", fs, conf);
-    ClusteringTestUtils.writePointsToFile(points, "testdata/points/file2", fs, conf);
-
+    
     for (int k = 0; k < points.size(); k++) {
       System.out.println("testKFuzzyKMeansMRJob k= " + k);
       // pick k initial cluster centers at random
@@ -221,300 +168,299 @@
       if (fs.exists(path)) {
         fs.delete(path, true);
       }
-
+      
       testData = new File("testdata/clusters");
       if (!testData.exists()) {
         testData.mkdir();
       }
-
-      /*BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
-          new FileOutputStream("testdata/clusters/part-00000"), Charset
-              .forName("UTF-8")));
-*/
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path("testdata/clusters/part-00000"),
-          Text.class, SoftCluster.class);
+      
+      /*
+       * BufferedWriter writer = new BufferedWriter(new OutputStreamWriter( new
+       * FileOutputStream("testdata/clusters/part-00000"), Charset .forName("UTF-8")));
+       */
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+          new Path("testdata/clusters/part-00000"), Text.class, SoftCluster.class);
       for (int i = 0; i < k + 1; i++) {
         Vector vec = tweakValue(points.get(i).get());
-
+        
         SoftCluster cluster = new SoftCluster(vec);
         // add the center so the centroid will be correct upon output
         cluster.addPoint(cluster.getCenter(), 1);
-        /*writer.write(cluster.getIdentifier() + '\t'
-            + SoftCluster.formatCluster(cluster) + '\n');*/
+        /*
+         * writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n');
+         */
         writer.append(new Text(cluster.getIdentifier()), cluster);
-
+        
       }
       writer.close();
-
+      
       Path outPath = new Path("output");
       fs = FileSystem.get(outPath.toUri(), conf);
       if (fs.exists(outPath)) {
         fs.delete(outPath, true);
       }
       fs.mkdirs(outPath);
-      // now run the Job      
-      FuzzyKMeansDriver.runJob("testdata/points", "testdata/clusters",
-          "output", EuclideanDistanceMeasure.class.getName(), 0.001, 2, 1,
-          k + 1, 2);
-
+      // now run the Job
+      FuzzyKMeansDriver.runJob("testdata/points", "testdata/clusters", "output",
+        EuclideanDistanceMeasure.class.getName(), 0.001, 2, 1, k + 1, 2);
+      
       // now compare the expected clusters with actual
       File outDir = new File("output/points");
       assertTrue("output dir exists?", outDir.exists());
       outDir.list();
-//      assertEquals("output dir files?", 4, outFiles.length);
+      // assertEquals("output dir files?", 4, outFiles.length);
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("output/points/part-00000"), conf);
       Text key = new Text();
       FuzzyKMeansOutput out = new FuzzyKMeansOutput();
       while (reader.next(key, out)) {
-        /*String line = reader.readLine();
-        String[] lineParts = line.split("\t");
-        assertEquals("line parts", 2, lineParts.length);
-        String clusterInfoStr = lineParts[1].replace("[", "").replace("]", "");
-
-        String[] clusterInfoList = clusterInfoStr.split(" ");
-        assertEquals("Number of clusters", k + 1, clusterInfoList.length);
-        */
+        /*
+         * String line = reader.readLine(); String[] lineParts = line.split("\t"); assertEquals("line parts",
+         * 2, lineParts.length); String clusterInfoStr = lineParts[1].replace("[", "").replace("]", "");
+         * 
+         * String[] clusterInfoList = clusterInfoStr.split(" "); assertEquals("Number of clusters", k + 1,
+         * clusterInfoList.length);
+         */
         double prob = 0.0;
         double[] probabilities = out.getProbabilities();
         for (double probability : probabilities) {
-          //SoftCluster cluster = clusters[i];
+          // SoftCluster cluster = clusters[i];
           prob += probability;
         }
         prob = round(prob, 1);
-        assertEquals(
-            "Sum of cluster Membership probability should be equal to=", 1.0,
-            prob);
+        assertEquals("Sum of cluster Membership probability should be equal to=", 1.0, prob);
       }
-
+      
       reader.close();
-
+      
     }
-
+    
   }
-
+  
   public void testFuzzyKMeansMapper() throws Exception {
-    List<VectorWritable> points = TestKmeansClustering.getPoints(TestKmeansClustering.reference);
-
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.reference);
+    
     for (int k = 0; k < points.size(); k++) {
       System.out.println("testKFuzzyKMeansMRJob k= " + k);
       // pick k initial cluster centers at random
       List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
-
+      
       for (int i = 0; i < k + 1; i++) {
         Vector vec = tweakValue(points.get(i).get());
-
+        
         SoftCluster cluster = new SoftCluster(vec, i);
         cluster.addPoint(cluster.getCenter(), 1);
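+        // seed the cluster with its own center so its centroid is well defined before any input
+        // points arrive (cf. "add the center so the centroid will be correct upon output" above)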
         clusterList.add(cluster);
       }
-
+      
       // run mapper
       FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
       mapper.config(clusterList);
       
       JobConf conf = new JobConf();
-      conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+      conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY,
+        "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
       conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.001");
       conf.set(FuzzyKMeansConfigKeys.M_KEY, "2");
       mapper.configure(conf);
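+      // with fuzziness m, standard fuzzy k-means gives point x a membership in cluster i of
+      // u_i(x) = 1 / sum_j (d(x, c_i) / d(x, c_j))^(2 / (m - 1)); here m = 2, so the exponent is 2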
       
-      DummyOutputCollector<Text, FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
+      DummyOutputCollector<Text,FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text,FuzzyKMeansInfo>();
       for (VectorWritable point : points) {
         mapper.map(new Text(), point, mapCollector, null);
       }
-
+      
       // now verify mapper output
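+      // the mapper keys its output by cluster, so k + 1 distinct keys are expected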
       assertEquals("Mapper Keys", k + 1, mapCollector.getData().size());
-
-      Map<Vector, Double> pointTotalProbMap = new HashMap<Vector, Double>();
-
+      
+      Map<Vector,Double> pointTotalProbMap = new HashMap<Vector,Double>();
+      
       for (String key : mapCollector.getKeys()) {
         // SoftCluster cluster = SoftCluster.decodeCluster(key);
         List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
-
+        
         for (FuzzyKMeansInfo value : values) {
-
+          
           Double val = pointTotalProbMap.get(value.getVector());
           double probVal = 0.0;
           if (val != null) {
             probVal = val;
           }
-
+          
           pointTotalProbMap.put(value.getVector(), probVal + value.getProbability());
         }
       }
-
-      for (Map.Entry<Vector, Double> entry : pointTotalProbMap.entrySet()) {
+      
+      for (Map.Entry<Vector,Double> entry : pointTotalProbMap.entrySet()) {
         Vector key = entry.getKey();
         double value = round(entry.getValue(), 1);
-
+        
         assertEquals("total Prob for Point:" + key, 1.0, value);
       }
     }
   }
-
+  
   public void testFuzzyKMeansCombiner() throws Exception {
-    List<VectorWritable> points = TestKmeansClustering.getPoints(TestKmeansClustering.reference);
-
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.reference);
+    
     for (int k = 0; k < points.size(); k++) {
       System.out.println("testKFuzzyKMeansMRJob k= " + k);
       // pick k initial cluster centers at random
       List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
-
+      
       for (int i = 0; i < k + 1; i++) {
         Vector vec = tweakValue(points.get(i).get());
-
+        
         SoftCluster cluster = new SoftCluster(vec, i);
         cluster.addPoint(cluster.getCenter(), 1);
         clusterList.add(cluster);
       }
-
+      
       // run mapper
       FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
       mapper.config(clusterList);
-
+      
       JobConf conf = new JobConf();
-      conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+      conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY,
+        "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
       conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.001");
       conf.set(FuzzyKMeansConfigKeys.M_KEY, "2");
       mapper.configure(conf);
       
-      DummyOutputCollector<Text, FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
+      DummyOutputCollector<Text,FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text,FuzzyKMeansInfo>();
       for (VectorWritable point : points) {
         mapper.map(new Text(), point, mapCollector, null);
       }
-
+      
       // run combiner
-      DummyOutputCollector<Text, FuzzyKMeansInfo> combinerCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
+      DummyOutputCollector<Text,FuzzyKMeansInfo> combinerCollector = new DummyOutputCollector<Text,FuzzyKMeansInfo>();
       FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
       combiner.configure(conf);
       
       for (String key : mapCollector.getKeys()) {
-
+        
         List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
-        combiner.reduce(new Text(key), values.iterator(), combinerCollector,
-            null);
+        combiner.reduce(new Text(key), values.iterator(), combinerCollector, null);
       }
-
+      
       // now verify the combiner output
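+      // the combiner pre-aggregates the mapper's per-cluster contributions, leaving exactly one
+      // combined FuzzyKMeansInfo value per cluster key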
       assertEquals("Combiner Output", k + 1, combinerCollector.getData().size());
-
+      
       for (String key : combinerCollector.getKeys()) {
         List<FuzzyKMeansInfo> values = combinerCollector.getValue(key);
         assertEquals("too many values", 1, values.size());
       }
     }
   }
-
+  
   public void testFuzzyKMeansReducer() throws Exception {
-    List<VectorWritable> points = TestKmeansClustering.getPoints(TestKmeansClustering.reference);
-
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.reference);
+    
     for (int k = 0; k < points.size(); k++) {
       System.out.println("testKFuzzyKMeansMRJob k= " + k);
       // pick k initial cluster centers at random
       List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
-
+      
       for (int i = 0; i < k + 1; i++) {
         Vector vec = tweakValue(points.get(i).get());
-
+        
         SoftCluster cluster = new SoftCluster(vec, i);
         // cluster.addPoint(cluster.getCenter(), 1);
         clusterList.add(cluster);
       }
-
+      
       // run mapper
       FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
       mapper.config(clusterList);
-
+      
       JobConf conf = new JobConf();
-      conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+      conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY,
+        "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
       conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.001");
       conf.set(FuzzyKMeansConfigKeys.M_KEY, "2");
       mapper.configure(conf);
       
-      DummyOutputCollector<Text, FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
+      DummyOutputCollector<Text,FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text,FuzzyKMeansInfo>();
       for (VectorWritable point : points) {
-        mapper.map(new Text(), point, mapCollector,
-            null);
+        mapper.map(new Text(), point, mapCollector, null);
       }
-
+      
       // run combiner
-      DummyOutputCollector<Text, FuzzyKMeansInfo> combinerCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
+      DummyOutputCollector<Text,FuzzyKMeansInfo> combinerCollector = new DummyOutputCollector<Text,FuzzyKMeansInfo>();
       FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
       combiner.configure(conf);
       
       for (String key : mapCollector.getKeys()) {
         List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
-        combiner.reduce(new Text(key), values.iterator(), combinerCollector,
-            null);
+        combiner.reduce(new Text(key), values.iterator(), combinerCollector, null);
       }
-
+      
       // run reducer
-      DummyOutputCollector<Text, SoftCluster> reducerCollector = new DummyOutputCollector<Text, SoftCluster>();
+      DummyOutputCollector<Text,SoftCluster> reducerCollector = new DummyOutputCollector<Text,SoftCluster>();
       FuzzyKMeansReducer reducer = new FuzzyKMeansReducer();
       reducer.config(clusterList);
       reducer.configure(conf);
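+      // the reducer merges the combined per-cluster sums and emits one updated SoftCluster per key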
       
       for (String key : combinerCollector.getKeys()) {
         List<FuzzyKMeansInfo> values = combinerCollector.getValue(key);
-        reducer
-            .reduce(new Text(key), values.iterator(), reducerCollector, new DummyReporter());
+        reducer.reduce(new Text(key), values.iterator(), reducerCollector, new DummyReporter());
       }
-
+      
       // now verify the reducer output
       assertEquals("Reducer Output", k + 1, combinerCollector.getData().size());
-
+      
       // compute the reference result after one iteration and compare
       List<SoftCluster> reference = new ArrayList<SoftCluster>();
       for (int i = 0; i < k + 1; i++) {
         Vector vec = tweakValue(points.get(i).get());
         reference.add(new SoftCluster(vec, i));
       }
+      List<Vector> pointsVectors = new ArrayList<Vector>();
+      for (VectorWritable point : points) {
+        pointsVectors.add(point.get());
+      }
       
       DistanceMeasure measure = new EuclideanDistanceMeasure();
       FuzzyKMeansClusterer clusterer = new FuzzyKMeansClusterer(measure, 0.001, 2);
-      iterateReference(points, reference, clusterer);
+      FuzzyKMeansClusterer.runFuzzyKMeansIteration(pointsVectors, reference, clusterer);
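+      // runFuzzyKMeansIteration is the sequential reference implementation that replaces the
+      // test-local iterateReference() helper removed by this change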
       
       for (SoftCluster key : reference) {
         String clusterId = key.getIdentifier();
         List<SoftCluster> values = reducerCollector.getValue(clusterId);
         SoftCluster cluster = values.get(0);
-        System.out.println("ref= " + key.toString() + " cluster= "
-            + cluster.toString());
+        System.out.println("ref= " + key.toString() + " cluster= " + cluster.toString());
         cluster.recomputeCenter();
-        assertEquals("key center: " + key.getCenter().asFormatString() + " does not equal cluster: " +
-            cluster.getCenter().asFormatString(), key.getCenter(), cluster.getCenter());
+        assertEquals("key center: " + key.getCenter().asFormatString() + " does not equal cluster: "
+                     + cluster.getCenter().asFormatString(), key.getCenter(), cluster.getCenter());
       }
     }
   }
-
+  
   public void testFuzzyKMeansClusterMapper() throws Exception {
-    List<VectorWritable> points = TestKmeansClustering.getPoints(TestKmeansClustering.reference);
-
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.reference);
+    
     for (int k = 0; k < points.size(); k++) {
       System.out.println("testKFuzzyKMeansMRJob k= " + k);
       // pick k initial cluster centers at random
       List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
-
+      
       for (int i = 0; i < k + 1; i++) {
         Vector vec = tweakValue(points.get(i).get());
-
+        
         SoftCluster cluster = new SoftCluster(vec, i);
         cluster.addPoint(cluster.getCenter(), 1);
         clusterList.add(cluster);
       }
-
+      
       // run mapper
       FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
       mapper.config(clusterList);
-
+      
       JobConf conf = new JobConf();
-      conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+      conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY,
+        "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
       conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.001");
       conf.set(FuzzyKMeansConfigKeys.M_KEY, "2");
       mapper.configure(conf);
       
-      DummyOutputCollector<Text, FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
+      DummyOutputCollector<Text,FuzzyKMeansInfo> mapCollector = new DummyOutputCollector<Text,FuzzyKMeansInfo>();
       for (VectorWritable point : points) {
         mapper.map(new Text(), point, mapCollector, null);
       }
@@ -522,32 +468,30 @@
         softCluster.recomputeCenter();
       }
       // run combiner
-      DummyOutputCollector<Text, FuzzyKMeansInfo> combinerCollector = new DummyOutputCollector<Text, FuzzyKMeansInfo>();
+      DummyOutputCollector<Text,FuzzyKMeansInfo> combinerCollector = new DummyOutputCollector<Text,FuzzyKMeansInfo>();
       FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
       combiner.configure(conf);
       
       for (String key : mapCollector.getKeys()) {
-
+        
         List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
-        combiner.reduce(new Text(key), values.iterator(), combinerCollector,
-            null);
+        combiner.reduce(new Text(key), values.iterator(), combinerCollector, null);
       }
-
+      
       // run reducer
-      DummyOutputCollector<Text, SoftCluster> reducerCollector = new DummyOutputCollector<Text, SoftCluster>();
+      DummyOutputCollector<Text,SoftCluster> reducerCollector = new DummyOutputCollector<Text,SoftCluster>();
       FuzzyKMeansReducer reducer = new FuzzyKMeansReducer();
       reducer.config(clusterList);
       reducer.configure(conf);
       
       for (String key : combinerCollector.getKeys()) {
         List<FuzzyKMeansInfo> values = combinerCollector.getValue(key);
-        reducer
-            .reduce(new Text(key), values.iterator(), reducerCollector, null);
+        reducer.reduce(new Text(key), values.iterator(), reducerCollector, null);
       }
-
+      
       // run clusterMapper
       List<SoftCluster> reducerCluster = new ArrayList<SoftCluster>();
-
+      
       for (String key : reducerCollector.getKeys()) {
         List<SoftCluster> values = reducerCollector.getValue(key);
         reducerCluster.add(values.get(0));
@@ -555,9 +499,8 @@
       for (SoftCluster softCluster : reducerCluster) {
         softCluster.recomputeCenter();
       }
-
-      DummyOutputCollector<Text, FuzzyKMeansOutput> clusterMapperCollector 
-        = new DummyOutputCollector<Text, FuzzyKMeansOutput>();
+      
+      DummyOutputCollector<Text,FuzzyKMeansOutput> clusterMapperCollector = new DummyOutputCollector<Text,FuzzyKMeansOutput>();
       
       FuzzyKMeansClusterMapper clusterMapper = new FuzzyKMeansClusterMapper();
       clusterMapper.config(reducerCluster);
@@ -566,7 +509,7 @@
       for (VectorWritable point : points) {
         clusterMapper.map(new Text(), point, clusterMapperCollector, null);
       }
-
+      
       // now run one iteration of the reference fuzzy k-means implementation
       // and compare its result with the cluster mapper's output
@@ -575,25 +518,31 @@
         Vector vec = tweakValue(points.get(i).get());
         reference.add(new SoftCluster(vec, i));
       }
-      Map<String, String> pointClusterInfo = new HashMap<String, String>();
-      referenceFuzzyKMeans(points, reference, pointClusterInfo,
-          EuclideanDistanceMeasure.class.getName(), 0.001, 2, 1);
-
+      Map<String,String> pointClusterInfo = new HashMap<String,String>();
+      List<Vector> pointsVectors = new ArrayList<Vector>();
+      for (VectorWritable point : points) {
+        pointsVectors.add(point.get());
+      }
+      
+      List<List<SoftCluster>> clusters = FuzzyKMeansClusterer.clusterPoints(pointsVectors, reference,
+        new EuclideanDistanceMeasure(), 0.001, 2, 1);
+      computeCluster(pointsVectors, clusters.get(clusters.size() - 1), new FuzzyKMeansClusterer(
+          new EuclideanDistanceMeasure(), 0.001, 2), pointClusterInfo);
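+      // clusterPoints returns the cluster state after each iteration; the last element is the state
+      // after the final iteration, which computeCluster then uses to fill pointClusterInfo per point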
+      
       // Now compare the clustermapper results with reducer
       for (String key : clusterMapperCollector.getKeys()) {
         List<FuzzyKMeansOutput> value = clusterMapperCollector.getValue(key);
-
+        
         String refValue = pointClusterInfo.get(key);
         String clusterInfoStr = refValue.substring(1, refValue.length() - 1);
         String[] refClusterInfoList = clusterInfoStr.split(" ");
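+        // the reference output encodes memberships as space-separated "clusterId:probability" pairs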
         assertEquals("Number of clusters", k + 1, refClusterInfoList.length);
-        Map<String, Double> refClusterInfoMap = new HashMap<String, Double>();
+        Map<String,Double> refClusterInfoMap = new HashMap<String,Double>();
         for (String clusterInfo : refClusterInfoList) {
           String[] clusterProb = clusterInfo.split(":");
           double clusterProbVal = Double.parseDouble(clusterProb[1]);
           refClusterInfoMap.put(clusterProb[0], clusterProbVal);
         }
-
+        
         FuzzyKMeansOutput kMeansOutput = value.get(0);
         SoftCluster[] softClusters = kMeansOutput.getClusters();
         double[] probabilities = kMeansOutput.getProbabilities();
@@ -601,17 +550,20 @@
         for (String clusterInfo : refClusterInfoList) {
           String[] clusterProb = clusterInfo.split(":");
           double clusterProbVal = Double.parseDouble(clusterProb[1]);
-          System.out.println(k + " point:" + key + ": Cluster: " + clusterProb[0] + " prob: " + clusterProbVal);
-          /*assertEquals(,
-                  refClusterInfoMap.get(clusterProb[0]), clusterProbVal);*/
+          System.out.println(k + " point:" + key + ": Cluster: " + clusterProb[0] + " prob: "
+                             + clusterProbVal);
+          /*
+           * assertEquals(, refClusterInfoMap.get(clusterProb[0]), clusterProbVal);
+           */
         }
         for (int i = 0; i < softClusters.length; i++) {
           SoftCluster softCluster = softClusters[i];
-          Double refProb = refClusterInfoMap.get(String.valueOf(softCluster.getClusterId()));
-          assertEquals(k + " point: " + key + ": expected probability: " + refProb + " was: " + probabilities[i], refProb, probabilities[i]);
+          Double refProb = refClusterInfoMap.get(String.valueOf(softCluster.getId()));
+          assertEquals(k + " point: " + key + ": expected probability: " + refProb + " was: "
+                       + probabilities[i], refProb, probabilities[i]);
         }
       }
     }
   }
-
+  
 }