You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by je...@apache.org on 2010/04/21 22:35:23 UTC

svn commit: r936489 [2/2] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/clustering/ core/src/main/java/org/apache/mahout/clustering/canopy/ core/src/main/java/org/apache/mahout/clustering/dirichlet/ core/src/main/java/org/apache/mahou...

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Wed Apr 21 20:35:22 2010
@@ -22,10 +22,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -230,7 +232,7 @@ public class TestCanopyCreation extends 
     mapper.close();
     assertEquals("Number of map results", 1, collector.getData().size());
     // now verify the output
-    List<VectorWritable> data = collector.getValue("centroid");
+    List<VectorWritable> data = collector.getValue(new Text("centroid"));
     assertEquals("Number of centroids", 3, data.size());
     for (int i = 0; i < data.size(); i++) {
       assertEquals("Centroid error", manhattanCentroids.get(i).asFormatString(), data.get(i).get()
@@ -260,7 +262,7 @@ public class TestCanopyCreation extends 
     mapper.close();
     assertEquals("Number of map results", 1, collector.getData().size());
     // now verify the output
-    List<VectorWritable> data = collector.getValue("centroid");
+    List<VectorWritable> data = collector.getValue(new Text("centroid"));
     assertEquals("Number of centroids", 3, data.size());
     for (int i = 0; i < data.size(); i++) {
       assertEquals("Centroid error", euclideanCentroids.get(i).asFormatString(), data.get(i).get()
@@ -285,10 +287,10 @@ public class TestCanopyCreation extends 
     List<VectorWritable> points = getPointsWritable();
     reducer.reduce(new Text("centroid"), points.iterator(), collector, new DummyReporter());
     reducer.close();
-    Set<String> keys = collector.getKeys();
+    Set<Text> keys = collector.getKeys();
     assertEquals("Number of centroids", 3, keys.size());
     int i = 0;
-    for (String key : keys) {
+    for (Text key : keys) {
       List<Canopy> data = collector.getValue(key);
       assertEquals(manhattanCentroids.get(i).asFormatString() + " is not equal to "
                    + data.get(0).computeCentroid().asFormatString(), manhattanCentroids.get(i), data.get(0)
@@ -314,10 +316,10 @@ public class TestCanopyCreation extends 
     List<VectorWritable> points = getPointsWritable();
     reducer.reduce(new Text("centroid"), points.iterator(), collector, new DummyReporter());
     reducer.close();
-    Set<String> keys = collector.getKeys();
+    Set<Text> keys = collector.getKeys();
     assertEquals("Number of centroids", 3, keys.size());
     int i = 0;
-    for (String key : keys) {
+    for (Text key : keys) {
       List<Canopy> data = collector.getValue(key);
       assertEquals(euclideanCentroids.get(i).asFormatString() + " is not equal to "
                    + data.get(0).computeCentroid().asFormatString(), euclideanCentroids.get(i), data.get(0)
@@ -408,7 +410,7 @@ public class TestCanopyCreation extends 
     mapper.configure(conf);
     
     List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text,VectorWritable> collector = new DummyOutputCollector<Text,VectorWritable>();
+    DummyOutputCollector<IntWritable,VectorWritable> collector = new DummyOutputCollector<IntWritable,VectorWritable>();
     int nextCanopyId = 0;
     for (Vector centroid : manhattanCentroids) {
       canopies.add(new Canopy(centroid, nextCanopyId++));
@@ -419,11 +421,11 @@ public class TestCanopyCreation extends 
     for (VectorWritable point : points) {
       mapper.map(new Text(), point, collector, new DummyReporter());
     }
-    Map<String,List<VectorWritable>> data = collector.getData();
+    Map<IntWritable, List<VectorWritable>> data = collector.getData();
     assertEquals("Number of map results", canopies.size(), data.size());
-    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
-      String key = stringListEntry.getKey();
-      Canopy canopy = findCanopy(key, canopies);
+    for (Entry<IntWritable, List<VectorWritable>> stringListEntry : data.entrySet()) {
+      IntWritable key = stringListEntry.getKey();
+      Canopy canopy = findCanopy(key.get(), canopies);
       List<VectorWritable> pts = stringListEntry.getValue();
       for (VectorWritable ptDef : pts) {
         assertTrue("Point not in canopy", mapper.canopyCovers(canopy, ptDef.get()));
@@ -431,9 +433,9 @@ public class TestCanopyCreation extends 
     }
   }
   
-  private static Canopy findCanopy(String key, List<Canopy> canopies) {
+  private static Canopy findCanopy(Integer key, List<Canopy> canopies) {
     for (Canopy c : canopies) {
-      if (c.getIdentifier().equals(key)) {
+      if (c.getId() == key) {
         return c;
       }
     }
@@ -451,86 +453,7 @@ public class TestCanopyCreation extends 
     mapper.configure(conf);
     
     List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text,VectorWritable> collector = new DummyOutputCollector<Text,VectorWritable>();
-    int nextCanopyId = 0;
-    for (Vector centroid : euclideanCentroids) {
-      canopies.add(new Canopy(centroid, nextCanopyId++));
-    }
-    mapper.config(canopies);
-    List<VectorWritable> points = getPointsWritable();
-    // map the data
-    for (VectorWritable point : points) {
-      mapper.map(new Text(), point, collector, new DummyReporter());
-    }
-    Map<String,List<VectorWritable>> data = collector.getData();
-    assertEquals("Number of map results", canopies.size(), data.size());
-    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
-      String key = stringListEntry.getKey();
-      Canopy canopy = findCanopy(key, canopies);
-      List<VectorWritable> pts = stringListEntry.getValue();
-      for (VectorWritable ptDef : pts) {
-        assertTrue("Point not in canopy", mapper.canopyCovers(canopy, ptDef.get()));
-      }
-    }
-  }
-  
-  /** Story: User can cluster a subset of the points using a ClusterReducer and a ManhattanDistanceMeasure. */
-  public void testClusterReducerManhattan() throws Exception {
-    ClusterMapper mapper = new ClusterMapper();
-    JobConf conf = new JobConf();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
-      "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
-    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
-    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
-    mapper.configure(conf);
-    
-    List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text,VectorWritable> collector = new DummyOutputCollector<Text,VectorWritable>();
-    int nextCanopyId = 0;
-    for (Vector centroid : manhattanCentroids) {
-      canopies.add(new Canopy(centroid, nextCanopyId++));
-    }
-    mapper.config(canopies);
-    List<VectorWritable> points = getPointsWritable();
-    // map the data
-    for (VectorWritable point : points) {
-      mapper.map(new Text(), point, collector, new DummyReporter());
-    }
-    Map<String,List<VectorWritable>> data = collector.getData();
-    assertEquals("Number of map results", canopies.size(), data.size());
-    
-    // reduce the data
-    Reducer<Text,VectorWritable,Text,VectorWritable> reducer = new IdentityReducer<Text,VectorWritable>();
-    collector = new DummyOutputCollector<Text,VectorWritable>();
-    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
-      reducer.reduce(new Text(stringListEntry.getKey()), stringListEntry.getValue().iterator(), collector,
-        null);
-    }
-    
-    // check the output
-    data = collector.getData();
-    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
-      String key = stringListEntry.getKey();
-      Canopy canopy = findCanopy(key, canopies);
-      List<VectorWritable> pts = stringListEntry.getValue();
-      for (VectorWritable ptDef : pts) {
-        assertTrue("Point not in canopy", mapper.canopyCovers(canopy, ptDef.get()));
-      }
-    }
-  }
-  
-  /** Story: User can cluster a subset of the points using a ClusterReducer and a EuclideanDistanceMeasure. */
-  public void testClusterReducerEuclidean() throws Exception {
-    ClusterMapper mapper = new ClusterMapper();
-    JobConf conf = new JobConf();
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
-      "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
-    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
-    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
-    mapper.configure(conf);
-    
-    List<Canopy> canopies = new ArrayList<Canopy>();
-    DummyOutputCollector<Text,VectorWritable> collector = new DummyOutputCollector<Text,VectorWritable>();
+    DummyOutputCollector<IntWritable,VectorWritable> collector = new DummyOutputCollector<IntWritable,VectorWritable>();
     int nextCanopyId = 0;
     for (Vector centroid : euclideanCentroids) {
       canopies.add(new Canopy(centroid, nextCanopyId++));
@@ -541,22 +464,11 @@ public class TestCanopyCreation extends 
     for (VectorWritable point : points) {
       mapper.map(new Text(), point, collector, new DummyReporter());
     }
-    Map<String,List<VectorWritable>> data = collector.getData();
-    
-    // reduce the data
-    Reducer<Text,VectorWritable,Text,VectorWritable> reducer = new IdentityReducer<Text,VectorWritable>();
-    collector = new DummyOutputCollector<Text,VectorWritable>();
-    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
-      reducer.reduce(new Text(stringListEntry.getKey()), stringListEntry.getValue().iterator(), collector,
-        null);
-    }
-    
-    // check the output
-    data = collector.getData();
+    Map<IntWritable,List<VectorWritable>> data = collector.getData();
     assertEquals("Number of map results", canopies.size(), data.size());
-    for (Map.Entry<String,List<VectorWritable>> stringListEntry : data.entrySet()) {
-      String key = stringListEntry.getKey();
-      Canopy canopy = findCanopy(key, canopies);
+    for (Entry<IntWritable, List<VectorWritable>> stringListEntry : data.entrySet()) {
+      IntWritable key = stringListEntry.getKey();
+      Canopy canopy = findCanopy(key.get(), canopies);
       List<VectorWritable> pts = stringListEntry.getValue();
       for (VectorWritable ptDef : pts) {
         assertTrue("Point not in canopy", mapper.canopyCovers(canopy, ptDef.get()));
@@ -587,14 +499,14 @@ public class TestCanopyCreation extends 
     /*
      * while (reader.ready()) { System.out.println(reader.readLine()); count++; }
      */
-    Text txt = new Text();
+    IntWritable clusterId = new IntWritable(0);
     VectorWritable vector = new VectorWritable();
-    while (reader.next(txt, vector)) {
+    while (reader.next(clusterId, vector)) {
       count++;
-      System.out.println("Txt: " + txt + " Vec: " + vector.get().asFormatString());
+      System.out.println("Txt: " + clusterId + " Vec: " + vector.get().asFormatString());
     }
     // the point [3.0,3.0] is covered by both canopies
-    assertEquals("number of points", 2 + 2 * points.size(), count);
+    assertEquals("number of points", 1 + points.size(), count);
     reader.close();
   }
   
@@ -619,16 +531,16 @@ public class TestCanopyCreation extends 
     /*
      * while (reader.ready()) { System.out.println(reader.readLine()); count++; }
      */
-    Text txt = new Text();
+    IntWritable canopyId = new IntWritable(0);
     VectorWritable can = new VectorWritable();
-    while (reader.next(txt, can)) {
+    while (reader.next(canopyId, can)) {
       count++;
     }
     /*
      * while (reader.ready()) { System.out.println(reader.readLine()); count++; }
      */
     // the point [3.0,3.0] is covered by both canopies
-    assertEquals("number of points", 2 + 2 * points.size(), count);
+    assertEquals("number of points", 1 + points.size(), count);
     reader.close();
   }
   

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/dirichlet/TestMapReduce.java Wed Apr 21 20:35:22 2010
@@ -151,7 +151,7 @@ public class TestMapReduce extends Mahou
     DirichletReducer reducer = new DirichletReducer();
     reducer.configure(state);
     OutputCollector<Text,DirichletCluster<VectorWritable>> reduceCollector = new DummyOutputCollector<Text,DirichletCluster<VectorWritable>>();
-    for (String key : mapCollector.getKeys()) {
+    for (Text key : mapCollector.getKeys()) {
       reducer.reduce(new Text(key), mapCollector.getValue(key).iterator(), reduceCollector, null);
     }
     
@@ -196,7 +196,7 @@ public class TestMapReduce extends Mahou
       DirichletReducer reducer = new DirichletReducer();
       reducer.configure(state);
       OutputCollector<Text,DirichletCluster<VectorWritable>> reduceCollector = new DummyOutputCollector<Text,DirichletCluster<VectorWritable>>();
-      for (String key : mapCollector.getKeys()) {
+      for (Text key : mapCollector.getKeys()) {
         reducer.reduce(new Text(key), mapCollector.getValue(key).iterator(), reduceCollector, null);
       }
       
@@ -359,7 +359,7 @@ public class TestMapReduce extends Mahou
   
   public void testNormalModelWritableSerialization() throws Exception {
     double[] m = {1.1, 2.2, 3.3};
-    Model<?> model = new NormalModel(new DenseVector(m), 3.3);
+    Model<?> model = new NormalModel(5, new DenseVector(m), 3.3);
     DataOutputBuffer out = new DataOutputBuffer();
     model.write(out);
     Model<?> model2 = new NormalModel();
@@ -367,11 +367,12 @@ public class TestMapReduce extends Mahou
     in.reset(out.getData(), out.getLength());
     model2.readFields(in);
     assertEquals("models", model.toString(), model2.toString());
+    assertEquals("ids", 5, model.getId());
   }
   
   public void testSampledNormalModelWritableSerialization() throws Exception {
     double[] m = {1.1, 2.2, 3.3};
-    Model<?> model = new SampledNormalModel(new DenseVector(m), 3.3);
+    Model<?> model = new SampledNormalModel(5, new DenseVector(m), 3.3);
     DataOutputBuffer out = new DataOutputBuffer();
     model.write(out);
     Model<?> model2 = new SampledNormalModel();
@@ -379,12 +380,13 @@ public class TestMapReduce extends Mahou
     in.reset(out.getData(), out.getLength());
     model2.readFields(in);
     assertEquals("models", model.toString(), model2.toString());
+    assertEquals("ids", 5, model.getId());
   }
   
   public void testAsymmetricSampledNormalModelWritableSerialization() throws Exception {
     double[] m = {1.1, 2.2, 3.3};
     double[] s = {3.3, 4.4, 5.5};
-    Model<?> model = new AsymmetricSampledNormalModel(new DenseVector(m), new DenseVector(s));
+    Model<?> model = new AsymmetricSampledNormalModel(5, new DenseVector(m), new DenseVector(s));
     DataOutputBuffer out = new DataOutputBuffer();
     model.write(out);
     Model<?> model2 = new AsymmetricSampledNormalModel();
@@ -392,11 +394,12 @@ public class TestMapReduce extends Mahou
     in.reset(out.getData(), out.getLength());
     model2.readFields(in);
     assertEquals("models", model.toString(), model2.toString());
+    assertEquals("ids", 5, model.getId());
   }
   
   public void testClusterWritableSerialization() throws Exception {
     double[] m = {1.1, 2.2, 3.3};
-    DirichletCluster<?> cluster = new DirichletCluster(new NormalModel(new DenseVector(m), 4), 10);
+    DirichletCluster<?> cluster = new DirichletCluster(new NormalModel(5, new DenseVector(m), 4), 10);
     DataOutputBuffer out = new DataOutputBuffer();
     cluster.write(out);
     DirichletCluster<?> cluster2 = new DirichletCluster();
@@ -406,6 +409,7 @@ public class TestMapReduce extends Mahou
     assertEquals("count", cluster.getTotalCount(), cluster2.getTotalCount());
     assertNotNull("model null", cluster2.getModel());
     assertEquals("model", cluster.getModel().toString(), cluster2.getModel().toString());
+    assertEquals("ids", 5, cluster.getId());
   }
   
 }

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java Wed Apr 21 20:35:22 2010
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -209,26 +210,11 @@ public class TestFuzzyKmeansClustering e
       outDir.list();
       // assertEquals("output dir files?", 4, outFiles.length);
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("output/points/part-00000"), conf);
-      Text key = new Text();
-      FuzzyKMeansOutput out = new FuzzyKMeansOutput();
+      IntWritable key = new IntWritable();
+      VectorWritable out = new VectorWritable();
       while (reader.next(key, out)) {
-        /*
-         * String line = reader.readLine(); String[] lineParts = line.split("\t"); assertEquals("line parts",
-         * 2, lineParts.length); String clusterInfoStr = lineParts[1].replace("[", "").replace("]", "");
-         * 
-         * String[] clusterInfoList = clusterInfoStr.split(" "); assertEquals("Number of clusters", k + 1,
-         * clusterInfoList.length);
-         */
-        double prob = 0.0;
-        double[] probabilities = out.getProbabilities();
-        for (double probability : probabilities) {
-          // SoftCluster cluster = clusters[i];
-          prob += probability;
-        }
-        prob = round(prob, 1);
-        assertEquals("Sum of cluster Membership probability should be equal to=", 1.0, prob);
-      }
-      
+        // make sure we can read all the clusters
+      }     
       reader.close();
       
     }
@@ -272,7 +258,7 @@ public class TestFuzzyKmeansClustering e
       
       Map<Vector,Double> pointTotalProbMap = new HashMap<Vector,Double>();
       
-      for (String key : mapCollector.getKeys()) {
+      for (Text key : mapCollector.getKeys()) {
         // SoftCluster cluster = SoftCluster.decodeCluster(key);
         List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
         
@@ -334,7 +320,7 @@ public class TestFuzzyKmeansClustering e
       FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
       combiner.configure(conf);
       
-      for (String key : mapCollector.getKeys()) {
+      for (Text key : mapCollector.getKeys()) {
         
         List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
         combiner.reduce(new Text(key), values.iterator(), combinerCollector, null);
@@ -343,7 +329,7 @@ public class TestFuzzyKmeansClustering e
       // now verify the combiner output
       assertEquals("Combiner Output", k + 1, combinerCollector.getData().size());
       
-      for (String key : combinerCollector.getKeys()) {
+      for (Text key : combinerCollector.getKeys()) {
         List<FuzzyKMeansInfo> values = combinerCollector.getValue(key);
         assertEquals("too many values", 1, values.size());
       }
@@ -387,7 +373,7 @@ public class TestFuzzyKmeansClustering e
       FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
       combiner.configure(conf);
       
-      for (String key : mapCollector.getKeys()) {
+      for (Text key : mapCollector.getKeys()) {
         List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
         combiner.reduce(new Text(key), values.iterator(), combinerCollector, null);
       }
@@ -398,7 +384,7 @@ public class TestFuzzyKmeansClustering e
       reducer.config(clusterList);
       reducer.configure(conf);
       
-      for (String key : combinerCollector.getKeys()) {
+      for (Text key : combinerCollector.getKeys()) {
         List<FuzzyKMeansInfo> values = combinerCollector.getValue(key);
         reducer.reduce(new Text(key), values.iterator(), reducerCollector, new DummyReporter());
       }
@@ -423,7 +409,7 @@ public class TestFuzzyKmeansClustering e
       
       for (SoftCluster key : reference) {
         String clusterId = key.getIdentifier();
-        List<SoftCluster> values = reducerCollector.getValue(clusterId);
+        List<SoftCluster> values = reducerCollector.getValue(new Text(clusterId));
         SoftCluster cluster = values.get(0);
         System.out.println("ref= " + key.toString() + " cluster= " + cluster.toString());
         cluster.recomputeCenter();
@@ -472,7 +458,7 @@ public class TestFuzzyKmeansClustering e
       FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
       combiner.configure(conf);
       
-      for (String key : mapCollector.getKeys()) {
+      for (Text key : mapCollector.getKeys()) {
         
         List<FuzzyKMeansInfo> values = mapCollector.getValue(key);
         combiner.reduce(new Text(key), values.iterator(), combinerCollector, null);
@@ -484,7 +470,7 @@ public class TestFuzzyKmeansClustering e
       reducer.config(clusterList);
       reducer.configure(conf);
       
-      for (String key : combinerCollector.getKeys()) {
+      for (Text key : combinerCollector.getKeys()) {
         List<FuzzyKMeansInfo> values = combinerCollector.getValue(key);
         reducer.reduce(new Text(key), values.iterator(), reducerCollector, null);
       }
@@ -492,7 +478,7 @@ public class TestFuzzyKmeansClustering e
       // run clusterMapper
       List<SoftCluster> reducerCluster = new ArrayList<SoftCluster>();
       
-      for (String key : reducerCollector.getKeys()) {
+      for (Text key : reducerCollector.getKeys()) {
         List<SoftCluster> values = reducerCollector.getValue(key);
         reducerCluster.add(values.get(0));
       }
@@ -500,7 +486,7 @@ public class TestFuzzyKmeansClustering e
         softCluster.recomputeCenter();
       }
       
-      DummyOutputCollector<Text,FuzzyKMeansOutput> clusterMapperCollector = new DummyOutputCollector<Text,FuzzyKMeansOutput>();
+      DummyOutputCollector<IntWritable,VectorWritable> clusterMapperCollector = new DummyOutputCollector<IntWritable,VectorWritable>();
       
       FuzzyKMeansClusterMapper clusterMapper = new FuzzyKMeansClusterMapper();
       clusterMapper.config(reducerCluster);
@@ -530,10 +516,10 @@ public class TestFuzzyKmeansClustering e
           new EuclideanDistanceMeasure(), 0.001, 2), pointClusterInfo);
       
       // Now compare the clustermapper results with reducer
-      for (String key : clusterMapperCollector.getKeys()) {
-        List<FuzzyKMeansOutput> value = clusterMapperCollector.getValue(key);
+      for (IntWritable key : clusterMapperCollector.getKeys()) {
+        List<VectorWritable> value = clusterMapperCollector.getValue(key);
         
-        String refValue = pointClusterInfo.get(key);
+        String refValue = pointClusterInfo.get(key.toString());
         String clusterInfoStr = refValue.substring(1, refValue.length() - 1);
         String[] refClusterInfoList = clusterInfoStr.split(" ");
         assertEquals("Number of clusters", k + 1, refClusterInfoList.length);
@@ -544,25 +530,8 @@ public class TestFuzzyKmeansClustering e
           refClusterInfoMap.put(clusterProb[0], clusterProbVal);
         }
         
-        FuzzyKMeansOutput kMeansOutput = value.get(0);
-        SoftCluster[] softClusters = kMeansOutput.getClusters();
-        double[] probabilities = kMeansOutput.getProbabilities();
-        assertEquals("Number of clusters", k + 1, softClusters.length);
-        for (String clusterInfo : refClusterInfoList) {
-          String[] clusterProb = clusterInfo.split(":");
-          double clusterProbVal = Double.parseDouble(clusterProb[1]);
-          System.out.println(k + " point:" + key + ": Cluster: " + clusterProb[0] + " prob: "
-                             + clusterProbVal);
-          /*
-           * assertEquals(, refClusterInfoMap.get(clusterProb[0]), clusterProbVal);
-           */
-        }
-        for (int i = 0; i < softClusters.length; i++) {
-          SoftCluster softCluster = softClusters[i];
-          Double refProb = refClusterInfoMap.get(String.valueOf(softCluster.getId()));
-          assertEquals(k + " point: " + key + ": expected probability: " + refProb + " was: "
-                       + probabilities[i], refProb, probabilities[i]);
-        }
+        VectorWritable kMeansOutput = value.get(0);
+        // TODO: fail("test this output");
       }
     }
   }

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java Wed Apr 21 20:35:22 2010
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -177,8 +178,8 @@ public class TestKmeansClustering extend
       // now verify that all points are correctly allocated
       EuclideanDistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();
       Map<String,Cluster> clusterMap = loadClusterMap(clusters);
-      for (String key : collector.getKeys()) {
-        Cluster cluster = clusterMap.get(key);
+      for (Text key : collector.getKeys()) {
+        Cluster cluster = clusterMap.get(key.toString());
         List<KMeansInfo> values = collector.getValue(key);
         for (KMeansInfo value : values) {
           double distance = euclideanDistanceMeasure.distance(cluster.getCenter(), value.getPointTotal());
@@ -224,7 +225,7 @@ public class TestKmeansClustering extend
       // now combine the data
       KMeansCombiner combiner = new KMeansCombiner();
       DummyOutputCollector<Text,KMeansInfo> collector2 = new DummyOutputCollector<Text,KMeansInfo>();
-      for (String key : collector.getKeys()) {
+      for (Text key : collector.getKeys()) {
         combiner.reduce(new Text(key), collector.getValue(key).iterator(), collector2, null);
       }
       
@@ -232,7 +233,7 @@ public class TestKmeansClustering extend
       // now verify that all points are accounted for
       int count = 0;
       Vector total = new DenseVector(2);
-      for (String key : collector2.getKeys()) {
+      for (Text key : collector2.getKeys()) {
         List<KMeansInfo> values = collector2.getValue(key);
         assertEquals("too many values", 1, values.size());
         // String value = values.get(0).toString();
@@ -281,7 +282,7 @@ public class TestKmeansClustering extend
       // now combine the data
       KMeansCombiner combiner = new KMeansCombiner();
       DummyOutputCollector<Text,KMeansInfo> collector2 = new DummyOutputCollector<Text,KMeansInfo>();
-      for (String key : collector.getKeys()) {
+      for (Text key : collector.getKeys()) {
         combiner.reduce(new Text(key), collector.getValue(key).iterator(), collector2, null);
       }
       
@@ -290,7 +291,7 @@ public class TestKmeansClustering extend
       reducer.configure(conf);
       reducer.config(clusters);
       DummyOutputCollector<Text,Cluster> collector3 = new DummyOutputCollector<Text,Cluster>();
-      for (String key : collector2.getKeys()) {
+      for (Text key : collector2.getKeys()) {
         reducer.reduce(new Text(key), collector2.getValue(key).iterator(), collector3, new DummyReporter());
       }
       
@@ -319,7 +320,7 @@ public class TestKmeansClustering extend
       for (int i = 0; i < reference.size(); i++) {
         Cluster ref = reference.get(i);
         String key = ref.getIdentifier();
-        List<Cluster> values = collector3.getValue(key);
+        List<Cluster> values = collector3.getValue(new Text(key));
         Cluster cluster = values.get(0);
         converged = converged && cluster.isConverged();
         // Since we aren't roundtripping through Writable, we need to compare the reference center with the
@@ -377,21 +378,15 @@ public class TestKmeansClustering extend
       // assertEquals("output dir files?", 4, outFiles.length);
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("output/points/part-00000"), conf);
       int[] expect = expectedNumPoints[k];
-      DummyOutputCollector<Text,Text> collector = new DummyOutputCollector<Text,Text>();
-      // The key is the name of the vector, or the vector itself
-      Text key = new Text();
-      // The value is the cluster id
-      Text value = new Text();
-      while (reader.next(key, value)) {
-        /*
-         * String line = reader.readLine(); String[] lineParts = line.split("\t"); assertEquals("line parts",
-         * 2, lineParts.length);
-         */
-        // String cl = line.substring(0, line.indexOf(':'));
-        // collector.collect(new Text(lineParts[1]), new Text(lineParts[0]));
-        collector.collect(value, key);
-        key = new Text();
-        value = new Text();
+      DummyOutputCollector<IntWritable,VectorWritable> collector = new DummyOutputCollector<IntWritable,VectorWritable>();
+      // The key is the clusterId
+      IntWritable clusterId = new IntWritable(0);
+      // The value is the vector
+      VectorWritable value = new VectorWritable();
+      while (reader.next(clusterId, value)) {
+       collector.collect(clusterId, value);
+        clusterId = new IntWritable(0);
+        value = new VectorWritable();
         
       }
       reader.close();
@@ -433,22 +428,22 @@ public class TestKmeansClustering extend
     assertTrue("output dir exists?", outDir.exists());
     String[] outFiles = outDir.list();
     assertEquals("output dir files?", 4, outFiles.length);
-    DummyOutputCollector<Text,Text> collector = new DummyOutputCollector<Text,Text>();
+    DummyOutputCollector<IntWritable,VectorWritable> collector = new DummyOutputCollector<IntWritable,VectorWritable>();
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("output/points/part-00000"), conf);
     
-    // The key is the name of the vector, or the vector itself
-    Text key = new Text();
-    // The value is the cluster id
-    Text value = new Text();
-    while (reader.next(key, value)) {
-      collector.collect(value, key);
-      key = new Text();
-      value = new Text();
+    // The key is the clusterId
+    IntWritable clusterId = new IntWritable(0);
+    // The value is the vector
+    VectorWritable value = new VectorWritable();
+    while (reader.next(clusterId, value)) {
+      collector.collect(clusterId, value);
+      clusterId = new IntWritable(0);
+      value = new VectorWritable();
       
     }
     reader.close();
     
-    assertEquals("num points[0]", 4, collector.getValue("0").size());
-    assertEquals("num points[1]", 5, collector.getValue("1").size());
+    assertEquals("num points[0]", 4, collector.getValue(new IntWritable(0)).size());
+    assertEquals("num points[1]", 5, collector.getValue(new IntWritable(1)).size());
   }
 }

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java Wed Apr 21 20:35:22 2010
@@ -40,15 +40,15 @@ import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
 public class TestMeanShift extends MahoutTestCase {
-  
+
   private Vector[] raw = null;
 
   private Configuration conf;
-  
+
   // DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
-  
+
   private final DistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();
-  
+
   /**
    * Print the canopies to the transcript
    * 
@@ -60,7 +60,7 @@ public class TestMeanShift extends Mahou
       System.out.println(canopy.toString());
     }
   }
-  
+
   /** Print a graphical representation of the clustered image points as a 10x10 character mask */
   private void printImage(List<MeanShiftCanopy> canopies) {
     char[][] out = new char[10][10];
@@ -80,7 +80,7 @@ public class TestMeanShift extends Mahou
       System.out.println(anOut);
     }
   }
-  
+
   private static void rmr(String path) throws Exception {
     File f = new File(path);
     if (f.exists()) {
@@ -93,7 +93,7 @@ public class TestMeanShift extends Mahou
       f.delete();
     }
   }
-  
+
   private List<MeanShiftCanopy> getInitialCanopies() {
     int nextCanopyId = 0;
     List<MeanShiftCanopy> canopies = new ArrayList<MeanShiftCanopy>();
@@ -102,7 +102,7 @@ public class TestMeanShift extends Mahou
     }
     return canopies;
   }
-  
+
   @Override
   protected void setUp() throws Exception {
     super.setUp();
@@ -125,14 +125,13 @@ public class TestMeanShift extends Mahou
       }
     }
   }
-  
+
   /**
    * Story: User can exercise the reference implementation to verify that the test datapoints are clustered in
    * a reasonable manner.
    */
   public void testReferenceImplementation() {
-    MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(new EuclideanDistanceMeasure(), 4.0,
-        1.0, 0.5);
+    MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(new EuclideanDistanceMeasure(), 4.0, 1.0, 0.5);
     List<MeanShiftCanopy> canopies = new ArrayList<MeanShiftCanopy>();
     // add all points to the canopies
     int nextCanopyId = 0;
@@ -154,14 +153,14 @@ public class TestMeanShift extends Mahou
       System.out.println(iter++);
     }
   }
-  
+
   /**
    * Story: User can produce initial canopy centers using a EuclideanDistanceMeasure and a
    * CanopyMapper/Combiner which clusters input points to produce an output set of canopies.
    */
   public void testCanopyMapperEuclidean() throws Exception {
     MeanShiftCanopyMapper mapper = new MeanShiftCanopyMapper();
-    DummyOutputCollector<Text,MeanShiftCanopy> collector = new DummyOutputCollector<Text,MeanShiftCanopy>();
+    DummyOutputCollector<Text, MeanShiftCanopy> collector = new DummyOutputCollector<Text, MeanShiftCanopy>();
     MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(euclideanDistanceMeasure, 4, 1, 0.5);
     // get the initial canopies
     List<MeanShiftCanopy> canopies = getInitialCanopies();
@@ -171,49 +170,47 @@ public class TestMeanShift extends Mahou
     for (Vector aRaw : raw) {
       clusterer.mergeCanopy(new MeanShiftCanopy(aRaw, nextCanopyId++), refCanopies);
     }
-    
+
     JobConf conf = new JobConf();
-    conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY,
-      "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+    conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
     conf.set(MeanShiftCanopyConfigKeys.T1_KEY, "4");
     conf.set(MeanShiftCanopyConfigKeys.T2_KEY, "1");
     conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.5");
     mapper.configure(conf);
-    
+
     // map the data
     for (MeanShiftCanopy canopy : canopies) {
       mapper.map(new Text(), canopy, collector, null);
     }
     mapper.close();
-    
+
     // now verify the output
     assertEquals("Number of map results", 1, collector.getData().size());
-    List<MeanShiftCanopy> data = collector.getValue("canopy");
+    List<MeanShiftCanopy> data = collector.getValue(new Text("canopy"));
     assertEquals("Number of canopies", refCanopies.size(), data.size());
-    
+
     // add all points to the reference canopies
-    Map<String,MeanShiftCanopy> refCanopyMap = new HashMap<String,MeanShiftCanopy>();
+    Map<String, MeanShiftCanopy> refCanopyMap = new HashMap<String, MeanShiftCanopy>();
     for (MeanShiftCanopy canopy : refCanopies) {
       clusterer.shiftToMean(canopy);
       refCanopyMap.put(canopy.getIdentifier(), canopy);
     }
     // build a map of the combiner output
-    Map<String,MeanShiftCanopy> canopyMap = new HashMap<String,MeanShiftCanopy>();
+    Map<String, MeanShiftCanopy> canopyMap = new HashMap<String, MeanShiftCanopy>();
     for (MeanShiftCanopy d : data) {
       canopyMap.put(d.getIdentifier(), d);
     }
     // compare the maps
-    for (Map.Entry<String,MeanShiftCanopy> stringMeanShiftCanopyEntry : refCanopyMap.entrySet()) {
+    for (Map.Entry<String, MeanShiftCanopy> stringMeanShiftCanopyEntry : refCanopyMap.entrySet()) {
       MeanShiftCanopy ref = stringMeanShiftCanopyEntry.getValue();
-      
+
       MeanShiftCanopy canopy = canopyMap.get((ref.isConverged() ? "V" : "C") + ref.getCanopyId());
       assertEquals("ids", ref.getCanopyId(), canopy.getCanopyId());
-      assertEquals("centers(" + ref.getIdentifier() + ')', ref.getCenter().asFormatString(), canopy
-          .getCenter().asFormatString());
+      assertEquals("centers(" + ref.getIdentifier() + ')', ref.getCenter().asFormatString(), canopy.getCenter().asFormatString());
       assertEquals("bound points", ref.getBoundPoints().size(), canopy.getBoundPoints().size());
     }
   }
-  
+
   /**
    * Story: User can produce final canopy centers using a EuclideanDistanceMeasure and a CanopyReducer which
    * clusters input centroid points to produce an output set of final canopy centroid points.
@@ -221,7 +218,7 @@ public class TestMeanShift extends Mahou
   public void testCanopyReducerEuclidean() throws Exception {
     MeanShiftCanopyMapper mapper = new MeanShiftCanopyMapper();
     MeanShiftCanopyReducer reducer = new MeanShiftCanopyReducer();
-    DummyOutputCollector<Text,MeanShiftCanopy> mapCollector = new DummyOutputCollector<Text,MeanShiftCanopy>();
+    DummyOutputCollector<Text, MeanShiftCanopy> mapCollector = new DummyOutputCollector<Text, MeanShiftCanopy>();
     MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(euclideanDistanceMeasure, 4, 1, 0.5);
     // get the initial canopies
     List<MeanShiftCanopy> canopies = getInitialCanopies();
@@ -242,43 +239,41 @@ public class TestMeanShift extends Mahou
     for (MeanShiftCanopy canopy : reducerReference) {
       clusterer.shiftToMean(canopy);
     }
-    
+
     JobConf conf = new JobConf();
-    conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY,
-      "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+    conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
     conf.set(MeanShiftCanopyConfigKeys.T1_KEY, "4");
     conf.set(MeanShiftCanopyConfigKeys.T2_KEY, "1");
     conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.5");
     mapper.configure(conf);
-    
+
     // map the data
     for (MeanShiftCanopy canopy : canopies) {
       mapper.map(new Text(), canopy, mapCollector, null);
     }
     mapper.close();
-    
+
     assertEquals("Number of map results", 1, mapCollector.getData().size());
     // now reduce the mapper output
-    DummyOutputCollector<Text,MeanShiftCanopy> reduceCollector = new DummyOutputCollector<Text,MeanShiftCanopy>();
+    DummyOutputCollector<Text, MeanShiftCanopy> reduceCollector = new DummyOutputCollector<Text, MeanShiftCanopy>();
     reducer.configure(conf);
-    reducer.reduce(new Text("canopy"), mapCollector.getValue("canopy").iterator(), reduceCollector,
-      new DummyReporter());
+    reducer.reduce(new Text("canopy"), mapCollector.getValue(new Text("canopy")).iterator(), reduceCollector, new DummyReporter());
     reducer.close();
-    
+
     // now verify the output
     assertEquals("Number of canopies", reducerReference.size(), reduceCollector.getKeys().size());
-    
+
     // add all points to the reference canopy maps
-    Map<String,MeanShiftCanopy> reducerReferenceMap = new HashMap<String,MeanShiftCanopy>();
+    Map<String, MeanShiftCanopy> reducerReferenceMap = new HashMap<String, MeanShiftCanopy>();
     for (MeanShiftCanopy canopy : reducerReference) {
       reducerReferenceMap.put(canopy.getIdentifier(), canopy);
     }
     // compare the maps
-    for (Map.Entry<String,MeanShiftCanopy> mapEntry : reducerReferenceMap.entrySet()) {
+    for (Map.Entry<String, MeanShiftCanopy> mapEntry : reducerReferenceMap.entrySet()) {
       MeanShiftCanopy refCanopy = mapEntry.getValue();
-      
-      List<MeanShiftCanopy> values = reduceCollector.getValue((refCanopy.isConverged() ? "V" : "C")
-                                                              + refCanopy.getCanopyId());
+
+      List<MeanShiftCanopy> values = reduceCollector.getValue(new Text((refCanopy.isConverged() ? "V" : "C")
+          + refCanopy.getCanopyId()));
       assertEquals("values", 1, values.size());
       MeanShiftCanopy reducerCanopy = values.get(0);
       assertEquals("ids", refCanopy.getCanopyId(), reducerCanopy.getCanopyId());
@@ -291,7 +286,7 @@ public class TestMeanShift extends Mahou
       assertEquals("bound points", refCanopy.getBoundPoints().size(), reducerCanopy.getBoundPoints().size());
     }
   }
-  
+
   /**
    * Story: User can produce final point clustering using a Hadoop map/reduce job and a
    * EuclideanDistanceMeasure.

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyOutputCollector.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyOutputCollector.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyOutputCollector.java Wed Apr 21 20:35:22 2010
@@ -29,30 +29,30 @@ import java.util.Set;
 import java.util.TreeMap;
 
 public class DummyOutputCollector<K extends WritableComparable, V extends Writable>
-    implements OutputCollector<K, V> {
+    implements OutputCollector<K,V> {
 
-  private final Map<String, List<V>> data = new TreeMap<String, List<V>>();
+  private final Map<K, List<V>> data = new TreeMap<K,List<V>>();
 
   @Override
-  public void collect(K key, V values)
+  public void collect(K key,V values)
       throws IOException {
-    List<V> points = data.get(key.toString());
+    List<V> points = data.get(key);
     if (points == null) {
       points = new ArrayList<V>();
-      data.put(key.toString(), points);
+      data.put(key, points);
     }
     points.add(values);
   }
 
-  public Map<String, List<V>> getData() {
+  public Map<K,List<V>> getData() {
     return data;
   }
 
-  public List<V> getValue(String key) {
+  public List<V> getValue(K key) {
     return data.get(key);
   }
 
-  public Set<String> getKeys() {
+  public Set<K> getKeys() {
     return data.keySet();
   }
 

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/ga/watchmaker/EvalMapperTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/ga/watchmaker/EvalMapperTest.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/ga/watchmaker/EvalMapperTest.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/ga/watchmaker/EvalMapperTest.java Wed Apr 21 20:35:22 2010
@@ -60,10 +60,10 @@ public class EvalMapperTest extends Maho
     }
 
     // check that the evaluations are correct
-    Set<String> keys = collector.getKeys();
+    Set<LongWritable> keys = collector.getKeys();
     assertEquals("Number of evaluations", populationSize, keys.size());
-    for (String key : keys) {
-      DummyCandidate candidate = population.get(Integer.parseInt(key));
+    for (LongWritable key : keys) {
+      DummyCandidate candidate = population.get((int) key.get());
       assertEquals("Values for key " + key, 1, collector.getValue(key).size());
       double fitness = collector.getValue(key).get(0).get();
       assertEquals("Evaluation of the candidate " + key, DummyEvaluator

Modified: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/NormalScModelDistribution.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/NormalScModelDistribution.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/NormalScModelDistribution.java (original)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/NormalScModelDistribution.java Wed Apr 21 20:35:22 2010
@@ -38,7 +38,7 @@ public class NormalScModelDistribution e
       for (int j = 0; j < 60; j++) {
         mean.set(j, UncommonDistributions.rNorm(30, 0.5));
       }
-      result[i] = new NormalModel(mean, 1);
+      result[i] = new NormalModel(i, mean, 1);
     }
     return result;
   }

Modified: lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/hadoop/CDMapperTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/hadoop/CDMapperTest.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/hadoop/CDMapperTest.java (original)
+++ lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/hadoop/CDMapperTest.java Wed Apr 21 20:35:22 2010
@@ -84,12 +84,12 @@ public class CDMapperTest extends Mahout
     mapper.map(new LongWritable(0), dl, collector);
 
     // check the evaluations
-    Set<String> keys = collector.getKeys();
+    Set<LongWritable> keys = collector.getKeys();
     assertEquals("Number of evaluations", rules.size(), keys.size());
 
     CDFitness[] expected = { TP, FP, TN, FN };
-    for (String key : keys) {
-      int index = Integer.parseInt(key);
+    for (LongWritable key : keys) {
+      int index = (int) key.get();
       assertEquals("Values for key " + key, 1, collector.getValue(key).size());
       CDFitness eval = collector.getValue(key).get(0);
 

Modified: lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/hadoop/CDReducerTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/hadoop/CDReducerTest.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/hadoop/CDReducerTest.java (original)
+++ lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/hadoop/CDReducerTest.java Wed Apr 21 20:35:22 2010
@@ -67,12 +67,12 @@ public class CDReducerTest extends Mahou
     reducer.reduce(zero, evaluations.iterator(), collector, null);
 
     // check if the expectations are met
-    Set<String> keys = collector.getKeys();
+    Set<LongWritable> keys = collector.getKeys();
     assertEquals("nb keys", 1, keys.size());
-    assertTrue("bad key", keys.contains(zero.toString()));
+    assertTrue("bad key", keys.contains(zero));
 
-    assertEquals("nb values", 1, collector.getValue(zero.toString()).size());
-    CDFitness fitness = collector.getValue(zero.toString()).get(0);
+    assertEquals("nb values", 1, collector.getValue(zero).size());
+    CDFitness fitness = collector.getValue(zero).get(0);
     assertEquals(expected, fitness);
 
   }

Modified: lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/tool/ToolMapperTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/tool/ToolMapperTest.java?rev=936489&r1=936488&r2=936489&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/tool/ToolMapperTest.java (original)
+++ lucene/mahout/trunk/examples/src/test/java/org/apache/mahout/ga/watchmaker/cd/tool/ToolMapperTest.java Wed Apr 21 20:35:22 2010
@@ -42,7 +42,7 @@ public class ToolMapperTest extends Maho
     mapper.map(key, value, output, null);
 
     for (int index = 0; index < 6; index++) {
-      List<Text> values = output.getValue(String.valueOf(index));
+      List<Text> values = output.getValue(new LongWritable(index));
       assertEquals("should extract one value per attribute", 1, values.size());
       assertEquals("Bad extracted value", "A" + (index + 1), values.get(0)
           .toString());
@@ -65,7 +65,7 @@ public class ToolMapperTest extends Maho
     mapper.map(key, value, output, null);
 
     for (int index = 0; index < 6; index++) {
-      List<Text> values = output.getValue(String.valueOf(index));
+      List<Text> values = output.getValue(new LongWritable(index));
       if (index == 1 || index == 3 || index == 4) {
         // this attribute should be ignored
         assertNull("Attribute (" + index + ") should be ignored", values);

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDistantPointWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDistantPointWritable.java?rev=936489&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDistantPointWritable.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDistantPointWritable.java Wed Apr 21 20:35:22 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.cdbw;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.ClusterBase;
+import org.apache.mahout.math.VectorWritable;
+
+public class CDbwDistantPointWritable implements Writable {
+
+  /**
+   * @return the distance
+   */
+  public double getDistance() {
+    return distance;
+  }
+
+  /**
+   * @return the point
+   */
+  public VectorWritable getPoint() {
+    return point;
+  }
+
+  public CDbwDistantPointWritable(double distance, VectorWritable point) {
+    super();
+    this.distance = distance;
+    this.point = point;
+  }
+
+  public CDbwDistantPointWritable() {
+    super();
+  }
+
+  private double distance;
+
+  private VectorWritable point;
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    distance = in.readDouble();
+    point = new VectorWritable();
+    point.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeDouble(distance);
+    point.write(out);
+  }
+
+  public String toString() {
+    return String.valueOf(distance) + ": " + (point == null ? "null" : ClusterBase.formatVector(point.get(), null));
+  }
+
+}

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java?rev=936489&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwDriver.java Wed Apr 21 20:35:22 2010
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.cdbw;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusterBase;
+import org.apache.mahout.clustering.dirichlet.DirichletCluster;
+import org.apache.mahout.clustering.dirichlet.DirichletMapper;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CDbwDriver {
+
+  public static final String STATE_IN_KEY = "org.apache.mahout.clustering.dirichlet.stateIn";
+
+  public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.dirichlet.modelFactory";
+
+  public static final String NUM_CLUSTERS_KEY = "org.apache.mahout.clustering.dirichlet.numClusters";
+
+  private static final Logger log = LoggerFactory.getLogger(CDbwDriver.class);
+
+  private CDbwDriver() {
+  }
+
+  public static void main(String[] args) throws Exception {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+
+    Option inputOpt = DefaultOptionCreator.inputOption().create();
+    Option outputOpt = DefaultOptionCreator.outputOption().create();
+    Option maxIterOpt = DefaultOptionCreator.maxIterOption().create();
+    Option helpOpt = DefaultOptionCreator.helpOption();
+
+    Option modelOpt = obuilder.withLongName("modelClass").withRequired(true).withShortName("d").withArgument(
+        abuilder.withName("modelClass").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The ModelDistribution class name. " + "Defaults to org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution")
+        .create();
+
+    Option numRedOpt = obuilder.withLongName("maxRed").withRequired(true).withShortName("r").withArgument(
+        abuilder.withName("maxRed").withMinimum(1).withMaximum(1).create()).withDescription("The number of reduce tasks.").create();
+
+    Group group = gbuilder.withName("Options").withOption(inputOpt).withOption(outputOpt).withOption(modelOpt).withOption(
+        maxIterOpt).withOption(helpOpt).withOption(numRedOpt).create();
+
+    try {
+      Parser parser = new Parser();
+      parser.setGroup(group);
+      CommandLine cmdLine = parser.parse(args);
+      if (cmdLine.hasOption(helpOpt)) {
+        CommandLineUtil.printHelp(group);
+        return;
+      }
+
+      String input = cmdLine.getValue(inputOpt).toString();
+      String output = cmdLine.getValue(outputOpt).toString();
+      String modelFactory = "org.apache.mahout.clustering.dirichlet.models.NormalModelDistribution";
+      if (cmdLine.hasOption(modelOpt)) {
+        modelFactory = cmdLine.getValue(modelOpt).toString();
+      }
+      int numReducers = Integer.parseInt(cmdLine.getValue(numRedOpt).toString());
+      int maxIterations = Integer.parseInt(cmdLine.getValue(maxIterOpt).toString());
+      runJob(input, null, output, modelFactory, maxIterations, numReducers);
+    } catch (OptionException e) {
+      log.error("Exception parsing command line: ", e);
+      CommandLineUtil.printHelp(group);
+    }
+  }
+
+  /**
+   * Run the job using supplied arguments
+   * 
+   * @param clustersIn
+   *          the directory pathname for input [n/a :: Cluster]
+   * @param clusteredPointsIn 
+              the directory pathname for input clustered points [clusterId :: VectorWritable]
+   * @param output
+   *          the directory pathname for output reference points [clusterId :: VectorWritable]
+   * @param distanceMeasureClass
+   *          the String ModelDistribution class name to use
+   * @param numIterations
+   *          the number of iterations
+   * @param numReducers
+   *          the number of Reducers desired
+   */
+  public static void runJob(String clustersIn, String clusteredPointsIn, String output, String distanceMeasureClass,
+      int numIterations, int numReducers) throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+      IOException, SecurityException, NoSuchMethodException, InvocationTargetException {
+
+    String stateIn = output + "/representativePoints-0";
+    writeInitialState(stateIn, clustersIn);
+
+    for (int iteration = 0; iteration < numIterations; iteration++) {
+      log.info("Iteration {}", iteration);
+      // point the output to a new directory per iteration
+      String stateOut = output + "/representativePoints-" + (iteration + 1);
+      runIteration(clusteredPointsIn, stateIn, stateOut, distanceMeasureClass, numReducers);
+      // now point the input to the old output directory
+      stateIn = stateOut;
+    }
+  }
+
+  private static void writeInitialState(String output, String clustersIn) throws ClassNotFoundException, InstantiationException,
+      IllegalAccessException, IOException, SecurityException, NoSuchMethodException, InvocationTargetException {
+
+    JobConf job = new JobConf(KMeansDriver.class);
+    Path outPath = new Path(output);
+    FileSystem fs = FileSystem.get(outPath.toUri(), job);
+    File f = new File(clustersIn);
+    for (File part : f.listFiles()) {
+      if (!part.getName().startsWith(".")) {
+        Path inPart = new Path(clustersIn + "/" + part.getName());
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, inPart, job);
+        Writable key = (Writable) reader.getKeyClass().newInstance();
+        Writable value = (Writable) reader.getValueClass().newInstance();
+        Path path = new Path(output + "/" + part.getName());
+        SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, IntWritable.class, VectorWritable.class);
+        while (reader.next(key, value)) {
+          Cluster cluster = (Cluster) value;
+          if (!(cluster instanceof DirichletCluster) || ((DirichletCluster) cluster).getTotalCount() > 0) {
+            System.out.println("C-" + cluster.getId() + ": " + ClusterBase.formatVector(cluster.getCenter(), null));
+            writer.append(new IntWritable(cluster.getId()), new VectorWritable(cluster.getCenter()));
+          }
+        }
+        writer.close();
+      }
+    }
+  }
+
+  /**
+   * Run the job using supplied arguments
+   * 
+   * @param input
+   *          the directory pathname for input points
+   * @param stateIn
+   *          the directory pathname for input state
+   * @param stateOut
+   *          the directory pathname for output state
+   * @param distanceMeasureClass
+   *          the class name of the DistanceMeasure class
+   * @param numReducers
+   *          the number of Reducers desired
+   */
+  public static void runIteration(String input, String stateIn, String stateOut, String distanceMeasureClass, int numReducers) {
+    Configurable client = new JobClient();
+    JobConf conf = new JobConf(CDbwDriver.class);
+
+    conf.setOutputKeyClass(IntWritable.class);
+    conf.setOutputValueClass(VectorWritable.class);
+    conf.setMapOutputKeyClass(IntWritable.class);
+    conf.setMapOutputValueClass(CDbwDistantPointWritable.class);
+
+    FileInputFormat.setInputPaths(conf, new Path(input));
+    Path outPath = new Path(stateOut);
+    FileOutputFormat.setOutputPath(conf, outPath);
+
+    conf.setMapperClass(CDbwMapper.class);
+    conf.setReducerClass(CDbwReducer.class);
+    conf.setNumReduceTasks(numReducers);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    conf.set(STATE_IN_KEY, stateIn);
+    conf.set(DISTANCE_MEASURE_KEY, distanceMeasureClass);
+
+    client.setConf(conf);
+    try {
+      JobClient.runJob(conf);
+    } catch (IOException e) {
+      log.warn(e.toString(), e);
+    }
+  }
+
+  /**
+   * Run the job using supplied arguments
+   * 
+   * @param input
+   *          the directory pathname for input points
+   * @param stateIn
+   *          the directory pathname for input state
+   * @param output
+   *          the directory pathname for output points
+   */
+  public static void runClustering(String input, String stateIn, String output) {
+    Configurable client = new JobClient();
+    JobConf conf = new JobConf(CDbwDriver.class);
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    FileInputFormat.setInputPaths(conf, new Path(input));
+    Path outPath = new Path(output);
+    FileOutputFormat.setOutputPath(conf, outPath);
+
+    conf.setMapperClass(DirichletMapper.class);
+    conf.setNumReduceTasks(0);
+
+    client.setConf(conf);
+    try {
+      JobClient.runJob(conf);
+    } catch (IOException e) {
+      log.warn(e.toString(), e);
+    }
+  }
+}

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java?rev=936489&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwMapper.java Wed Apr 21 20:35:22 2010
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.cdbw;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.VectorWritable;
+
+public class CDbwMapper extends MapReduceBase implements Mapper<IntWritable, VectorWritable, IntWritable, CDbwDistantPointWritable> {
+
+  private Map<Integer, List<VectorWritable>> representativePoints;
+
+  private Map<Integer, CDbwDistantPointWritable> mostDistantPoints = new HashMap<Integer, CDbwDistantPointWritable>();
+
+  private DistanceMeasure measure = new EuclideanDistanceMeasure();
+
+  private OutputCollector<IntWritable, CDbwDistantPointWritable> output = null;
+
+  @Override
+  public void map(IntWritable clusterId, VectorWritable point, OutputCollector<IntWritable, CDbwDistantPointWritable> output,
+      Reporter reporter) throws IOException {
+
+      this.output = output;
+
+    int key = clusterId.get();
+    CDbwDistantPointWritable currentMDP = mostDistantPoints.get(key);
+
+    List<VectorWritable> refPoints = representativePoints.get(key);
+    double totalDistance = 0.0;
+    for (VectorWritable refPoint : refPoints) {
+      totalDistance += measure.distance(refPoint.get(), point.get());
+    }
+    if (currentMDP == null || currentMDP.getDistance() < totalDistance) {
+      mostDistantPoints.put(key, new CDbwDistantPointWritable(totalDistance, new VectorWritable(point.get().clone())));
+    }
+  }
+
+  public void configure(Map<Integer, List<VectorWritable>> referencePoints, DistanceMeasure measure) {
+    this.representativePoints = referencePoints;
+    this.measure = measure;
+  }
+
+  public static Map<Integer, List<VectorWritable>> getReferencePoints(JobConf job) throws SecurityException,
+      IllegalArgumentException, NoSuchMethodException, InvocationTargetException {
+    String statePath = job.get(CDbwDriver.STATE_IN_KEY);
+    Map<Integer, List<VectorWritable>> representativePoints = new HashMap<Integer, List<VectorWritable>>();
+    try {
+      Path path = new Path(statePath);
+      FileSystem fs = FileSystem.get(path.toUri(), job);
+      FileStatus[] status = fs.listStatus(path, new OutputLogFilter());
+      for (FileStatus s : status) {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), job);
+        try {
+          IntWritable key = new IntWritable(0);
+          VectorWritable point = new VectorWritable();
+          while (reader.next(key, point)) {
+            List<VectorWritable> repPoints = representativePoints.get(key.get());
+            if (repPoints == null) {
+              repPoints = new ArrayList<VectorWritable>();
+              representativePoints.put(key.get(), repPoints);
+            }
+            repPoints.add(point);
+            point = new VectorWritable();          }
+        } finally {
+          reader.close();
+        }
+      }
+      return representativePoints;
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    try {
+      ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+      Class<?> cl = ccl.loadClass(job.get(CDbwDriver.DISTANCE_MEASURE_KEY));
+      measure = (DistanceMeasure) cl.newInstance();
+      representativePoints = getReferencePoints(job);
+    } catch (NumberFormatException e) {
+      throw new IllegalStateException(e);
+    } catch (SecurityException e) {
+      throw new IllegalStateException(e);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalStateException(e);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalStateException(e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalStateException(e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException(e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.MapReduceBase#close()
+   */
+  @Override
+  public void close() throws IOException {
+    for (Integer clusterId : mostDistantPoints.keySet()) {
+      output.collect(new IntWritable(clusterId), mostDistantPoints.get(clusterId));
+    }
+    super.close();
+  }
+}

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java?rev=936489&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/clustering/cdbw/CDbwReducer.java Wed Apr 21 20:35:22 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.cdbw;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VectorWritable;
+
+public class CDbwReducer extends MapReduceBase implements
+    Reducer<IntWritable, CDbwDistantPointWritable, IntWritable, VectorWritable> {
+
+  private Map<Integer, List<VectorWritable>> referencePoints;
+
+  private OutputCollector<IntWritable, VectorWritable> output;
+
+  @Override
+  public void reduce(IntWritable key, Iterator<CDbwDistantPointWritable> values,
+      OutputCollector<IntWritable, VectorWritable> output, Reporter reporter) throws IOException {
+    this.output = output;
+    // find the most distant point
+    CDbwDistantPointWritable mdp = null;
+    while (values.hasNext()) {
+      CDbwDistantPointWritable dpw = values.next();
+      if (mdp == null || mdp.getDistance() < dpw.getDistance()) {
+        mdp = new CDbwDistantPointWritable(dpw.getDistance(), dpw.getPoint());
+      }
+    }
+    output.collect(new IntWritable(key.get()), mdp.getPoint());
+  }
+
+  public void configure(Map<Integer, List<VectorWritable>> referencePoints) {
+    this.referencePoints = referencePoints;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.MapReduceBase#close()
+   */
+  @Override
+  public void close() throws IOException {
+    for (Integer clusterId : referencePoints.keySet()) {
+      for (VectorWritable vw : referencePoints.get(clusterId)) {
+        output.collect(new IntWritable(clusterId), vw);
+      }
+    }
+    super.close();
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    try {
+      referencePoints = CDbwMapper.getReferencePoints(job);
+    } catch (NumberFormatException e) {
+      throw new IllegalStateException(e);
+    } catch (SecurityException e) {
+      throw new IllegalStateException(e);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalStateException(e);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalStateException(e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+}

Added: lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java?rev=936489&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java (added)
+++ lucene/mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java Wed Apr 21 20:35:22 2010
@@ -0,0 +1,122 @@
+package org.apache.mahout.clustering.cdbw;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.clustering.ClusterBase;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.CanopyClusteringJob;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.dirichlet.DirichletDriver;
+import org.apache.mahout.clustering.dirichlet.models.L1ModelDistribution;
+import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.clustering.meanshift.MeanShiftCanopyJob;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public class TestCDbwEvaluator extends MahoutTestCase {
+
+  public static final double[][] reference = { { 1, 1 }, { 2, 1 }, { 1, 2 }, { 2, 2 }, { 3, 3 }, { 4, 4 }, { 5, 4 }, { 4, 5 },
+      { 5, 5 } };
+
+  private List<VectorWritable> sampleData;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    RandomUtils.useTestSeed();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    // Create testdata directory
+    ClusteringTestUtils.rmr("testdata");
+    File f = new File("testdata");
+    f.mkdir();
+    ClusteringTestUtils.rmr("output");
+    // Create test data
+    sampleData = TestKmeansClustering.getPointsWritable(reference);
+    ClusteringTestUtils.writePointsToFile(sampleData, "testdata/file1", fs, conf);
+  }
+
+  private void checkRefPoints(int numIterations) throws IOException {
+    File out = new File("output");
+    assertTrue("output is not Dir", out.isDirectory());
+    for (int i = 0; i <= numIterations; i++) {
+      out = new File("output/representativePoints-" + i);
+      assertTrue("rep-i is not a Dir", out.isDirectory());
+      System.out.println(out.getName() + ":");
+      File[] files = out.listFiles();
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.get(conf);
+      for (File file : files) {
+        if (!file.getName().startsWith(".")) {
+          SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(file.getAbsolutePath()), conf);
+          try {
+            IntWritable clusterId = new IntWritable(0);
+            VectorWritable point = new VectorWritable();
+            while (reader.next(clusterId, point)) {
+              System.out.println("\tC-" + clusterId + ": " + ClusterBase.formatVector(point.get(), null));
+            }
+          } finally {
+            reader.close();
+          }
+        }
+      }
+    }
+  }
+
+  public void testCanopy() throws Exception { // now run the Job
+    CanopyClusteringJob.runJob("testdata", "output", EuclideanDistanceMeasure.class.getName(), 3.1, 2.1);
+    int numIterations = 2;
+    CDbwDriver.runJob("output/canopies", "output/clusters", "output", EuclideanDistanceMeasure.class.getName(), numIterations, 1);
+    checkRefPoints(numIterations);
+  }
+
+  public void testKmeans() throws Exception {
+    // now run the Canopy job to prime kMeans canopies
+    CanopyDriver.runJob("testdata", "output/canopies", EuclideanDistanceMeasure.class.getName(), 3.1, 2.1);
+    // now run the KMeans job
+    KMeansDriver.runJob("testdata", "output/canopies", "output", EuclideanDistanceMeasure.class.getName(), 0.001, 10, 1);
+    int numIterations = 2;
+    CDbwDriver.runJob("output/clusters-1", "output/points", "output", EuclideanDistanceMeasure.class.getName(), numIterations, 1);
+    checkRefPoints(numIterations);
+  }
+
+  public void testFuzzyKmeans() throws Exception {
+    // now run the Canopy job to prime kMeans canopies
+    CanopyDriver.runJob("testdata", "output/canopies", EuclideanDistanceMeasure.class.getName(), 3.1, 2.1);
+    // now run the KMeans job
+    FuzzyKMeansDriver.runJob("testdata", "output/canopies", "output", EuclideanDistanceMeasure.class.getName(), 0.001, 10, 1, 1, 2);
+    int numIterations = 2;
+    CDbwDriver.runJob("output/clusters-3", "output/points", "output", EuclideanDistanceMeasure.class.getName(), numIterations, 1);
+    checkRefPoints(numIterations);
+  }
+
+  public void testMeanShift() throws Exception {
+    MeanShiftCanopyJob.runJob("testdata", "output", EuclideanDistanceMeasure.class.getName(), 2.1, 1.0, 0.001, 10);
+    int numIterations = 2;
+    CDbwDriver.runJob("output/canopies-1", "output/clusters", "output", EuclideanDistanceMeasure.class.getName(), numIterations, 1);
+    checkRefPoints(numIterations);
+  }
+
+  public void testDirichlet() throws Exception {
+    Vector prototype = new DenseVector(2);
+    DirichletDriver.runJob("testdata", "output", L1ModelDistribution.class.getName(), prototype.getClass().getName(), prototype
+        .size(), 15, 5, 1.0, 1);
+    int numIterations = 2;
+    CDbwDriver.runJob("output/state-5", "output/clusters", "output", EuclideanDistanceMeasure.class.getName(), numIterations, 1);
+    checkRefPoints(numIterations);
+  }
+
+}