You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@mahout.apache.org by je...@apache.org on 2009/06/23 21:37:04 UTC

svn commit: r787801 - in /lucene/mahout/trunk/core/src: main/java/org/apache/mahout/clustering/canopy/ main/java/org/apache/mahout/clustering/fuzzykmeans/ main/java/org/apache/mahout/clustering/kmeans/ test/java/org/apache/mahout/clustering/fuzzykmeans/ test/java/org/apache/mahout/clustering/meanshift/

Author: jeastman
Date: Tue Jun 23 19:37:04 2009
New Revision: 787801

URL: http://svn.apache.org/viewvc?rev=787801&view=rev
Log:
MAHOUT-137 - converted Canopy, KMeans and FuzzyKMeans to use the AbstractVector serialization utilities. All tests pass.

Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java?rev=787801&r1=787800&r2=787801&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java Tue Jun 23 19:37:04 2009
@@ -225,14 +225,13 @@
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(canopyId);
-    computeCentroid().write(out);
+    AbstractVector.writeVector(out, computeCentroid());
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     canopyId = in.readInt();
-    this.center = new SparseVector();
-    center.readFields(in);
+    this.center = AbstractVector.readVector(in);
     this.pointTotal = center.clone();
     this.numPoints = 1;
   }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java?rev=787801&r1=787800&r2=787801&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java Tue Jun 23 19:37:04 2009
@@ -1,12 +1,12 @@
 package org.apache.mahout.clustering.fuzzykmeans;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.mahout.matrix.Vector;
-import org.apache.mahout.clustering.kmeans.Cluster;
-
+import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.DataInput;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.Vector;
 
 
 /**
@@ -45,15 +45,12 @@
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeDouble(probability);
-    out.writeUTF(pointTotal.getClass().getSimpleName().toString());
-    pointTotal.write(out);
+    AbstractVector.writeVector(out, pointTotal);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     this.probability = in.readDouble();
-    String className = in.readUTF();
-    pointTotal = Cluster.vectorNameToVector(className);
-    pointTotal.readFields(in);
+    this.pointTotal = AbstractVector.readVector(in);
   }
 }
\ No newline at end of file

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java?rev=787801&r1=787800&r2=787801&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java Tue Jun 23 19:37:04 2009
@@ -17,9 +17,9 @@
 
 package org.apache.mahout.clustering.fuzzykmeans;
 
-import java.io.IOException;
-import java.io.DataOutput;
 import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -32,7 +32,6 @@
 import org.apache.mahout.matrix.SquareRootFunction;
 import org.apache.mahout.matrix.Vector;
 import org.apache.mahout.utils.DistanceMeasure;
-import org.apache.mahout.clustering.kmeans.Cluster;
 
 public class SoftCluster implements Writable {
 
@@ -124,17 +123,14 @@
     out.writeInt(clusterId);
     out.writeBoolean(converged);
     Vector vector = computeCentroid();
-    out.writeUTF(vector.getClass().getSimpleName().toString());
-    vector.write(out);
+    AbstractVector.writeVector(out, vector);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     clusterId = in.readInt();
     converged = in.readBoolean();
-    String className = in.readUTF();
-    this.center = Cluster.vectorNameToVector(className);
-    center.readFields(in);
+    center = AbstractVector.readVector(in);
     this.pointProbSum = 0;
     this.weightedPointTotal = center.like();
   }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java?rev=787801&r1=787800&r2=787801&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java Tue Jun 23 19:37:04 2009
@@ -16,9 +16,9 @@
  */
 package org.apache.mahout.clustering.kmeans;
 
-import java.io.IOException;
-import java.io.DataOutput;
 import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.io.Text;
@@ -29,7 +29,6 @@
 import org.apache.mahout.matrix.SparseVector;
 import org.apache.mahout.matrix.SquareRootFunction;
 import org.apache.mahout.matrix.Vector;
-import org.apache.mahout.matrix.DenseVector;
 import org.apache.mahout.utils.DistanceMeasure;
 
 public class Cluster implements Writable {
@@ -120,39 +119,20 @@
   public void write(DataOutput out) throws IOException {
     out.writeInt(clusterId);
     out.writeBoolean(converged);
-    Vector vector = computeCentroid();
-    out.writeUTF(vector.getClass().getSimpleName().toString());
-    vector.write(out);
+    AbstractVector.writeVector(out, computeCentroid());
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    clusterId = in.readInt();
-    converged = in.readBoolean();
-    String className = in.readUTF();
-    this.center = vectorNameToVector(className);
-    center.readFields(in);
+    this.clusterId = in.readInt();
+    this.converged = in.readBoolean();
+    this.center = AbstractVector.readVector(in);
     this.numPoints = 0;
     this.pointTotal = center.like();
     this.pointSquaredTotal = center.like();
   }
 
   /**
-   * Simplified version that only handles SparseVector and DenseVector
-   * @param className
-   * @return
-   */
-  public static Vector vectorNameToVector(String className) {
-    Vector vector;
-    if (className.endsWith("SparseVector")){
-      vector = new SparseVector();
-    } else {
-      vector = new DenseVector();
-    }
-    return vector;
-  }
-
-  /**
    * Configure the distance measure from the job
    * 
    * @param job the JobConf for the job

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java?rev=787801&r1=787800&r2=787801&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java Tue Jun 23 19:37:04 2009
@@ -1,6 +1,7 @@
 package org.apache.mahout.clustering.kmeans;
 
 import org.apache.hadoop.io.Writable;
+import org.apache.mahout.matrix.AbstractVector;
 import org.apache.mahout.matrix.Vector;
 
 import java.io.DataOutput;
@@ -36,15 +37,12 @@
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(points);
-    out.writeUTF(pointTotal.getClass().getSimpleName().toString());
-    pointTotal.write(out);
+    AbstractVector.writeVector(out, pointTotal);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     this.points = in.readInt();
-    String className = in.readUTF();
-    pointTotal = Cluster.vectorNameToVector(className);
-    pointTotal.readFields(in);
+    this.pointTotal = AbstractVector.readVector(in);
   }
 }

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java?rev=787801&r1=787800&r2=787801&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java Tue Jun 23 19:37:04 2009
@@ -17,14 +17,7 @@
 
 package org.apache.mahout.clustering.fuzzykmeans;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -32,16 +25,16 @@
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
 import org.apache.mahout.clustering.ClusteringTestUtils;
-import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
 import org.apache.mahout.matrix.SparseVector;
+import org.apache.mahout.matrix.Vector;
 import org.apache.mahout.utils.DistanceMeasure;
 import org.apache.mahout.utils.DummyOutputCollector;
 import org.apache.mahout.utils.EuclideanDistanceMeasure;

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java?rev=787801&r1=787800&r2=787801&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java Tue Jun 23 19:37:04 2009
@@ -17,12 +17,21 @@
 
 package org.apache.mahout.clustering.meanshift;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.mahout.matrix.CardinalityException;
 import org.apache.mahout.matrix.DenseVector;
@@ -31,22 +40,15 @@
 import org.apache.mahout.utils.DummyOutputCollector;
 import org.apache.mahout.utils.EuclideanDistanceMeasure;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.FileOutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.nio.charset.Charset;
-
 public class TestMeanShift extends TestCase {
 
   Vector[] raw = null;
 
-  //DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
+  static FileSystem fs;
+
+  static Configuration conf;
+
+  // DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
 
   final DistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();
 
@@ -65,45 +67,6 @@
     }
   }
 
-  /** 
-   * Write the given points to the file within an enclosing MeanShiftCanopy
-   * @param points a Vector[] of points
-   * @param fileName the String file name
-   * @param payload a String payload that goes with each point.
-   * TODO: handle payloads associated with points. Currently they are ignored
-   * @throws IOException
-   */
-  private static void writePointsToFileWithPayload(Vector[] points, String fileName,
-      String payload) throws IOException {
-    BufferedWriter output = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileName), Charset.forName("UTF-8")));
-    for (Vector point : points) {
-      output.write(new MeanShiftCanopy(point).toString());
-      output.write(payload);
-      output.write('\n');
-    }
-    output.flush();
-    output.close();
-  }
-
-  /**
-   * Recursively remove the contents of a directory
-   * 
-   * @param path
-   * @throws Exception
-   */
-  private static void rmr(String path) throws Exception {
-    File f = new File(path);
-    if (f.exists()) {
-      if (f.isDirectory()) {
-        String[] contents = f.list();
-        for (String content : contents) {
-          rmr(f.toString() + File.separator + content);
-        }
-      }
-      f.delete();
-    }
-  }
-
   /**
    * Print a graphical representation of the clustered image points as a 10x10
    * character mask
@@ -125,10 +88,36 @@
     }
   }
 
+  private static void rmr(String path) throws Exception {
+    File f = new File(path);
+    if (f.exists()) {
+      if (f.isDirectory()) {
+        String[] contents = f.list();
+        for (String content : contents) {
+          rmr(f.toString() + File.separator + content);
+        }
+      }
+      f.delete();
+    }
+  }
+
+  private static void writePointsToFile(Vector[] points, String fileName)
+      throws IOException {
+    Path path = new Path(fileName);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+        LongWritable.class, MeanShiftCanopy.class);
+    long recNum = 0;
+    for (Vector point : points) {
+      MeanShiftCanopy canopy = new MeanShiftCanopy(point);
+      writer.append(new LongWritable(recNum++), canopy);
+    }
+    writer.close();
+  }
+
   private List<MeanShiftCanopy> getInitialCanopies() {
     List<MeanShiftCanopy> canopies = new ArrayList<MeanShiftCanopy>();
-    for (Vector aRaw : raw) {
-      canopies.add(new MeanShiftCanopy(aRaw));
+    for (Vector point : raw) {
+      canopies.add(new MeanShiftCanopy(point));
     }
     return canopies;
   }
@@ -136,6 +125,8 @@
   @Override
   protected void setUp() throws Exception {
     super.setUp();
+    conf = new Configuration();
+    fs = FileSystem.get(conf);
     rmr("output");
     rmr("testdata");
     raw = new Vector[100];
@@ -191,8 +182,7 @@
    */
   public void testCanopyMapperEuclidean() throws Exception {
     MeanShiftCanopyMapper mapper = new MeanShiftCanopyMapper();
-    MeanShiftCanopyCombiner combiner = new MeanShiftCanopyCombiner();
-    DummyOutputCollector<Text,WritableComparable<?>> collector = new DummyOutputCollector<Text,WritableComparable<?>>();
+    DummyOutputCollector<Text, MeanShiftCanopy> collector = new DummyOutputCollector<Text, MeanShiftCanopy>();
     MeanShiftCanopy.config(euclideanDistanceMeasure, 4, 1, 0.5);
     // get the initial canopies
     List<MeanShiftCanopy> canopies = getInitialCanopies();
@@ -203,21 +193,15 @@
     }
 
     // map the data
-    for (MeanShiftCanopy canopy : canopies) {
-      mapper.map(new Text(), new Text(canopy.toString()), collector, null);
-    }
-    assertEquals("Number of map results", 100, collector.getData().size());
-    // now combine the mapper output
-    MeanShiftCanopy.config(euclideanDistanceMeasure, 4, 1, 0.5);
-    Map<String, List<WritableComparable<?>>> mapData = collector.getData();
-    collector = new DummyOutputCollector<Text,WritableComparable<?>>();
-    for (Map.Entry<String, List<WritableComparable<?>>> stringListEntry : mapData.entrySet())
-      combiner.reduce(new Text(stringListEntry.getKey()), stringListEntry.getValue().iterator(), collector,
-          null);
+    for (MeanShiftCanopy canopy : canopies)
+      mapper.map(new Text(), canopy, collector, null);
+    mapper.close();
 
     // now verify the output
-    List<WritableComparable<?>> data = collector.getValue("canopy");
+    assertEquals("Number of map results", 1, collector.getData().size());
+    List<MeanShiftCanopy> data = collector.getValue("canopy");
     assertEquals("Number of canopies", refCanopies.size(), data.size());
+
     // add all points to the reference canopies
     Map<String, MeanShiftCanopy> refCanopyMap = new HashMap<String, MeanShiftCanopy>();
     for (MeanShiftCanopy canopy : refCanopies) {
@@ -226,20 +210,19 @@
     }
     // build a map of the combiner output
     Map<String, MeanShiftCanopy> canopyMap = new HashMap<String, MeanShiftCanopy>();
-    for (WritableComparable<?> d : data) {
-      MeanShiftCanopy dc = MeanShiftCanopy.decodeCanopy(d.toString());
-      canopyMap.put(dc.getIdentifier(), dc);
+    for (MeanShiftCanopy d : data) {
+      canopyMap.put(d.getIdentifier(), d);
     }
     // compare the maps
-    for (Map.Entry<String, MeanShiftCanopy> stringMeanShiftCanopyEntry : refCanopyMap.entrySet()) {
+    for (Map.Entry<String, MeanShiftCanopy> stringMeanShiftCanopyEntry : refCanopyMap
+        .entrySet()) {
       MeanShiftCanopy ref = stringMeanShiftCanopyEntry.getValue();
 
       MeanShiftCanopy canopy = canopyMap.get((ref.isConverged() ? "V" : "C")
           + (ref.getCanopyId() - raw.length));
       assertEquals("ids", ref.getCanopyId(), canopy.getCanopyId() + 100);
       assertEquals("centers(" + ref.getIdentifier() + ')', ref.getCenter()
-          .asFormatString(), canopy.getCenter()
-          .asFormatString());
+          .asFormatString(), canopy.getCenter().asFormatString());
       assertEquals("bound points", ref.getBoundPoints().size(), canopy
           .getBoundPoints().size());
     }
@@ -254,68 +237,63 @@
    */
   public void testCanopyReducerEuclidean() throws Exception {
     MeanShiftCanopyMapper mapper = new MeanShiftCanopyMapper();
-    MeanShiftCanopyCombiner combiner = new MeanShiftCanopyCombiner();
     MeanShiftCanopyReducer reducer = new MeanShiftCanopyReducer();
-    DummyOutputCollector<Text,WritableComparable<?>> collector = new DummyOutputCollector<Text,WritableComparable<?>>();
+    DummyOutputCollector<Text, MeanShiftCanopy> mapCollector = new DummyOutputCollector<Text, MeanShiftCanopy>();
     MeanShiftCanopy.config(euclideanDistanceMeasure, 4, 1, 0.5);
     // get the initial canopies
     List<MeanShiftCanopy> canopies = getInitialCanopies();
-    // build the reference set
-    List<MeanShiftCanopy> refCanopies = new ArrayList<MeanShiftCanopy>();
+    // build the mapper output reference set
+    List<MeanShiftCanopy> mapperReference = new ArrayList<MeanShiftCanopy>();
     for (Vector aRaw : raw) {
-      MeanShiftCanopy.mergeCanopy(new MeanShiftCanopy(aRaw), refCanopies);
+      MeanShiftCanopy.mergeCanopy(new MeanShiftCanopy(aRaw), mapperReference);
     }
-    List<MeanShiftCanopy> refCanopies2 = new ArrayList<MeanShiftCanopy>();
-    for (MeanShiftCanopy canopy : refCanopies) {
+    for (MeanShiftCanopy canopy : mapperReference) {
       canopy.shiftToMean();
     }
-    for (MeanShiftCanopy canopy : refCanopies) {
-      MeanShiftCanopy.mergeCanopy(canopy, refCanopies2);
+    // build the reducer reference output set
+    List<MeanShiftCanopy> reducerReference = new ArrayList<MeanShiftCanopy>();
+    for (MeanShiftCanopy canopy : mapperReference) {
+      MeanShiftCanopy.mergeCanopy(canopy, reducerReference);
     }
-    for (MeanShiftCanopy canopy : refCanopies) {
+    for (MeanShiftCanopy canopy : reducerReference) {
       canopy.shiftToMean();
     }
 
     // map the data
-    for (MeanShiftCanopy canopy : canopies) {
-      mapper.map(new Text(), new Text(canopy.toString()), collector, null);
-    }
-    assertEquals("Number of map results", 100, collector.getData().size());
-    // now combine the mapper output
-    MeanShiftCanopy.config(euclideanDistanceMeasure, 4, 1, 0.5);
-    Map<String, List<WritableComparable<?>>> mapData = collector.getData();
-    collector = new DummyOutputCollector<Text,WritableComparable<?>>();
-    for (Map.Entry<String, List<WritableComparable<?>>> stringListEntry : mapData.entrySet())
-      combiner.reduce(new Text(stringListEntry.getKey()), stringListEntry.getValue().iterator(), collector,
-          null);
-    // now reduce the combiner output
-    DummyOutputCollector<Text,WritableComparable<?>> collector2 = new DummyOutputCollector<Text,WritableComparable<?>>();
-    reducer.reduce(new Text("canopy"), collector.getValue("canopy").iterator(),
-        collector2, null);
+    for (MeanShiftCanopy canopy : canopies)
+      mapper.map(new Text(), canopy, mapCollector, null);
+    mapper.close();
+
+    assertEquals("Number of map results", 1, mapCollector.getData().size());
+    // now reduce the mapper output
+    DummyOutputCollector<Text, MeanShiftCanopy> reduceCollector = new DummyOutputCollector<Text, MeanShiftCanopy>();
+    reducer.reduce(new Text("canopy"), mapCollector.getValue("canopy").iterator(),
+        reduceCollector, null);
+    reducer.close();
 
     // now verify the output
-    assertEquals("Number of canopies", refCanopies2.size(), collector2
+    assertEquals("Number of canopies", reducerReference.size(), reduceCollector
         .getKeys().size());
-    // add all points to the reference canopies
-    Map<String, MeanShiftCanopy> refCanopyMap = new HashMap<String, MeanShiftCanopy>();
-    for (MeanShiftCanopy canopy : refCanopies2) {
-      refCanopyMap.put(canopy.getIdentifier(), canopy);
+    
+    // add all points to the reference canopy maps
+    Map<String, MeanShiftCanopy> reducerReferenceMap = new HashMap<String, MeanShiftCanopy>();
+    for (MeanShiftCanopy canopy : reducerReference) {
+      reducerReferenceMap.put(canopy.getIdentifier(), canopy);
     }
     // compare the maps
-    for (Map.Entry<String, MeanShiftCanopy> stringMeanShiftCanopyEntry : refCanopyMap.entrySet()) {
-      MeanShiftCanopy ref = stringMeanShiftCanopyEntry.getValue();
-
-      List<WritableComparable<?>> values = collector2
-          .getValue((ref.isConverged() ? "V" : "C")
-              + (ref.getCanopyId() - raw.length));
+    for (Map.Entry<String, MeanShiftCanopy> mapEntry : reducerReferenceMap
+        .entrySet()) {
+      MeanShiftCanopy refCanopy = mapEntry.getValue();
+
+      List<MeanShiftCanopy> values = reduceCollector
+          .getValue((refCanopy.isConverged() ? "V" : "C")
+              + (refCanopy.getCanopyId() - raw.length));
       assertEquals("values", 1, values.size());
-      MeanShiftCanopy canopy = MeanShiftCanopy.decodeCanopy(values.get(0)
-          .toString());
-      assertEquals("ids", ref.getCanopyId(), canopy.getCanopyId() + 100);
-      assertEquals("centers(" + stringMeanShiftCanopyEntry.getKey() + ')', ref.getCenter()
-          .asFormatString(), canopy.getCenter()
-          .asFormatString().toString());
-      assertEquals("bound points", ref.getBoundPoints().size(), canopy
+      MeanShiftCanopy reducerCanopy = values.get(0);
+      assertEquals("ids", refCanopy.getCanopyId(), reducerCanopy.getCanopyId() + 100);
+      assertEquals("centers(" + mapEntry.getKey() + ')', refCanopy
+          .getCenter().asFormatString(), reducerCanopy.getCenter().asFormatString());
+      assertEquals("bound points", refCanopy.getBoundPoints().size(), reducerCanopy
           .getBoundPoints().size());
     }
   }
@@ -330,8 +308,8 @@
     File testData = new File("testdata");
     if (!testData.exists())
       testData.mkdir();
-    writePointsToFileWithPayload(raw, "testdata/file1", "");
-    writePointsToFileWithPayload(raw, "testdata/file2", "");
+    writePointsToFile(raw, "testdata/file1");
+    writePointsToFile(raw, "testdata/file2");
     // now run the Job
     MeanShiftCanopyJob.runJob("testdata", "output",
         EuclideanDistanceMeasure.class.getName(), 4, 1, 0.5, 10);
@@ -340,10 +318,9 @@
     FileSystem fs = FileSystem.get(outPart.toUri(), conf);
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, outPart, conf);
     Text key = new Text();
-    Text value = new Text();
+    MeanShiftCanopy value = new MeanShiftCanopy();
     int count = 0;
     while (reader.next(key, value)) {
-      MeanShiftCanopy.decodeCanopy(value.toString());
       count++;
     }
     reader.close();