You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:36 UTC

[05/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy into mahout-hdfs and mahout-mr, closes apache/mahout#86

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java b/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
new file mode 100644
index 0000000..35de87e
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+@Deprecated
+public final class TestCanopyCreation extends MahoutTestCase {
+
+  private static final double[][] RAW = { { 1, 1 }, { 2, 1 }, { 1, 2 },
+      { 2, 2 }, { 3, 3 }, { 4, 4 }, { 5, 4 }, { 4, 5 }, { 5, 5 } };
+
+  private List<Canopy> referenceManhattan;
+
+  private final DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
+
+  private List<Vector> manhattanCentroids;
+
+  private List<Canopy> referenceEuclidean;
+
+  private final DistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();
+
+  private List<Vector> euclideanCentroids;
+
+  private FileSystem fs;
+
+  private static List<VectorWritable> getPointsWritable() {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : RAW) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+  private static List<Vector> getPoints() {
+    List<Vector> points = Lists.newArrayList();
+    for (double[] fr : RAW) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(vec);
+    }
+    return points;
+  }
+
+  /**
+   * Print the canopies to the transcript
+   * 
+   * @param canopies
+   *          a List<Canopy>
+   */
+  private static void printCanopies(Iterable<Canopy> canopies) {
+    for (Canopy canopy : canopies) {
+      System.out.println(canopy.asFormatString(null));
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(getConfiguration());
+    referenceManhattan = CanopyClusterer.createCanopies(getPoints(),
+        manhattanDistanceMeasure, 3.1, 2.1);
+    manhattanCentroids = CanopyClusterer.getCenters(referenceManhattan);
+    referenceEuclidean = CanopyClusterer.createCanopies(getPoints(),
+        euclideanDistanceMeasure, 3.1, 2.1);
+    euclideanCentroids = CanopyClusterer.getCenters(referenceEuclidean);
+  }
+
+  /**
+   * Story: User can cluster points using a ManhattanDistanceMeasure and a
+   * reference implementation
+   */
+  @Test
+  public void testReferenceManhattan() throws Exception {
+    // see setUp for cluster creation
+    printCanopies(referenceManhattan);
+    assertEquals("number of canopies", 3, referenceManhattan.size());
+    for (int canopyIx = 0; canopyIx < referenceManhattan.size(); canopyIx++) {
+      Canopy testCanopy = referenceManhattan.get(canopyIx);
+      int[] expectedNumPoints = { 4, 4, 3 };
+      double[][] expectedCentroids = { { 1.5, 1.5 }, { 4.0, 4.0 },
+          { 4.666666666666667, 4.6666666666666667 } };
+      assertEquals("canopy points " + canopyIx, testCanopy.getNumObservations(),
+                   expectedNumPoints[canopyIx]);
+      double[] refCentroid = expectedCentroids[canopyIx];
+      Vector testCentroid = testCanopy.computeCentroid();
+      for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
+        assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
+            refCentroid[pointIx], testCentroid.get(pointIx), EPSILON);
+      }
+    }
+  }
+
+  /**
+   * Story: User can cluster points using a EuclideanDistanceMeasure and a
+   * reference implementation
+   */
+  @Test
+  public void testReferenceEuclidean() throws Exception {
+    // see setUp for cluster creation
+    printCanopies(referenceEuclidean);
+    assertEquals("number of canopies", 3, referenceEuclidean.size());
+    int[] expectedNumPoints = { 5, 5, 3 };
+    double[][] expectedCentroids = { { 1.8, 1.8 }, { 4.2, 4.2 },
+        { 4.666666666666667, 4.666666666666667 } };
+    for (int canopyIx = 0; canopyIx < referenceEuclidean.size(); canopyIx++) {
+      Canopy testCanopy = referenceEuclidean.get(canopyIx);
+      assertEquals("canopy points " + canopyIx, testCanopy.getNumObservations(),
+                   expectedNumPoints[canopyIx]);
+      double[] refCentroid = expectedCentroids[canopyIx];
+      Vector testCentroid = testCanopy.computeCentroid();
+      for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
+        assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
+            refCentroid[pointIx], testCentroid.get(pointIx), EPSILON);
+      }
+    }
+  }
+
+  /**
+   * Story: User can produce initial canopy centers using a
+   * ManhattanDistanceMeasure and a CanopyMapper which clusters input points to
+   * produce an output set of canopy centroid points.
+   */
+  @Test
+  public void testCanopyMapperManhattan() throws Exception {
+    CanopyMapper mapper = new CanopyMapper();
+    Configuration conf = getConfiguration();
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, manhattanDistanceMeasure
+        .getClass().getName());
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "0");
+    DummyRecordWriter<Text, VectorWritable> writer = new DummyRecordWriter<Text, VectorWritable>();
+    Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context = DummyRecordWriter
+        .build(mapper, conf, writer);
+    mapper.setup(context);
+
+    List<VectorWritable> points = getPointsWritable();
+    // map the data
+    for (VectorWritable point : points) {
+      mapper.map(new Text(), point, context);
+    }
+    mapper.cleanup(context);
+    assertEquals("Number of map results", 1, writer.getData().size());
+    // now verify the output
+    List<VectorWritable> data = writer.getValue(new Text("centroid"));
+    assertEquals("Number of centroids", 3, data.size());
+    for (int i = 0; i < data.size(); i++) {
+      assertEquals("Centroid error",
+          manhattanCentroids.get(i).asFormatString(), data.get(i).get()
+              .asFormatString());
+    }
+  }
+
+  /**
+   * Story: User can produce initial canopy centers using a
+   * EuclideanDistanceMeasure and a CanopyMapper/Combiner which clusters input
+   * points to produce an output set of canopy centroid points.
+   */
+  @Test
+  public void testCanopyMapperEuclidean() throws Exception {
+    CanopyMapper mapper = new CanopyMapper();
+    Configuration conf = getConfiguration();
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, euclideanDistanceMeasure
+        .getClass().getName());
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "0");
+    DummyRecordWriter<Text, VectorWritable> writer = new DummyRecordWriter<Text, VectorWritable>();
+    Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context = DummyRecordWriter
+        .build(mapper, conf, writer);
+    mapper.setup(context);
+
+    List<VectorWritable> points = getPointsWritable();
+    // map the data
+    for (VectorWritable point : points) {
+      mapper.map(new Text(), point, context);
+    }
+    mapper.cleanup(context);
+    assertEquals("Number of map results", 1, writer.getData().size());
+    // now verify the output
+    List<VectorWritable> data = writer.getValue(new Text("centroid"));
+    assertEquals("Number of centroids", 3, data.size());
+    for (int i = 0; i < data.size(); i++) {
+      assertEquals("Centroid error",
+          euclideanCentroids.get(i).asFormatString(), data.get(i).get()
+              .asFormatString());
+    }
+  }
+
+  /**
+   * Story: User can produce final canopy centers using a
+   * ManhattanDistanceMeasure and a CanopyReducer which clusters input centroid
+   * points to produce an output set of final canopy centroid points.
+   */
+  @Test
+  public void testCanopyReducerManhattan() throws Exception {
+    CanopyReducer reducer = new CanopyReducer();
+    Configuration conf = getConfiguration();
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+        "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "0");
+    DummyRecordWriter<Text, ClusterWritable> writer = new DummyRecordWriter<Text, ClusterWritable>();
+    Reducer<Text, VectorWritable, Text, ClusterWritable>.Context context = DummyRecordWriter
+        .build(reducer, conf, writer, Text.class, VectorWritable.class);
+    reducer.setup(context);
+
+    List<VectorWritable> points = getPointsWritable();
+    reducer.reduce(new Text("centroid"), points, context);
+    Iterable<Text> keys = writer.getKeysInInsertionOrder();
+    assertEquals("Number of centroids", 3, Iterables.size(keys));
+    int i = 0;
+    for (Text key : keys) {
+      List<ClusterWritable> data = writer.getValue(key);
+      ClusterWritable clusterWritable = data.get(0);
+      Canopy canopy = (Canopy) clusterWritable.getValue();
+      assertEquals(manhattanCentroids.get(i).asFormatString() + " is not equal to "
+          + canopy.computeCentroid().asFormatString(),
+          manhattanCentroids.get(i), canopy.computeCentroid());
+      i++;
+    }
+  }
+
+  /**
+   * Story: User can produce final canopy centers using a
+   * EuclideanDistanceMeasure and a CanopyReducer which clusters input centroid
+   * points to produce an output set of final canopy centroid points.
+   */
+  @Test
+  public void testCanopyReducerEuclidean() throws Exception {
+    CanopyReducer reducer = new CanopyReducer();
+    Configuration conf = getConfiguration();
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "0");
+    DummyRecordWriter<Text, ClusterWritable> writer = new DummyRecordWriter<Text, ClusterWritable>();
+    Reducer<Text, VectorWritable, Text, ClusterWritable>.Context context =
+        DummyRecordWriter.build(reducer, conf, writer, Text.class, VectorWritable.class);
+    reducer.setup(context);
+
+    List<VectorWritable> points = getPointsWritable();
+    reducer.reduce(new Text("centroid"), points, context);
+    Iterable<Text> keys = writer.getKeysInInsertionOrder();
+    assertEquals("Number of centroids", 3, Iterables.size(keys));
+    int i = 0;
+    for (Text key : keys) {
+      List<ClusterWritable> data = writer.getValue(key);
+      ClusterWritable clusterWritable = data.get(0);
+      Canopy canopy = (Canopy) clusterWritable.getValue();
+      assertEquals(euclideanCentroids.get(i).asFormatString() + " is not equal to "
+          + canopy.computeCentroid().asFormatString(),
+          euclideanCentroids.get(i), canopy.computeCentroid());
+      i++;
+    }
+  }
+
+  /**
+   * Story: User can produce final canopy centers using a Hadoop map/reduce job
+   * and a ManhattanDistanceMeasure.
+   */
+  @Test
+  public void testCanopyGenManhattanMR() throws Exception {
+    List<VectorWritable> points = getPointsWritable();
+    Configuration config = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file1"), fs, config);
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file2"), fs, config);
+    // now run the Canopy Driver
+    Path output = getTestTempDirPath("output");
+    CanopyDriver.run(config, getTestTempDirPath("testdata"), output,
+        manhattanDistanceMeasure, 3.1, 2.1, false, 0.0, false);
+
+    // verify output from sequence file
+    Path path = new Path(output, "clusters-0-final/part-r-00000");
+    FileSystem fs = FileSystem.get(path.toUri(), config);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, config);
+    try {
+      Writable key = new Text();
+      ClusterWritable clusterWritable = new ClusterWritable();
+      assertTrue("more to come", reader.next(key, clusterWritable));
+      assertEquals("1st key", "C-0", key.toString());
+
+      List<Pair<Double,Double>> refCenters = Lists.newArrayList();
+      refCenters.add(new Pair<Double,Double>(1.5,1.5));
+      refCenters.add(new Pair<Double,Double>(4.333333333333334,4.333333333333334));
+      Pair<Double,Double> c = new Pair<Double,Double>(clusterWritable.getValue() .getCenter().get(0),
+      clusterWritable.getValue().getCenter().get(1));
+      assertTrue("center "+c+" not found", findAndRemove(c, refCenters, EPSILON));
+      assertTrue("more to come", reader.next(key, clusterWritable));
+      assertEquals("2nd key", "C-1", key.toString());
+      c = new Pair<Double,Double>(clusterWritable.getValue().getCenter().get(0),
+          clusterWritable.getValue().getCenter().get(1));
+      assertTrue("center " + c + " not found", findAndRemove(c, refCenters, EPSILON));
+      assertFalse("more to come", reader.next(key, clusterWritable));
+    } finally {
+      Closeables.close(reader, true);
+    }
+  }
+
+  static boolean findAndRemove(Pair<Double, Double> target, Collection<Pair<Double, Double>> list, double epsilon) {
+    for (Pair<Double,Double> curr : list) {
+      if ( (Math.abs(target.getFirst() - curr.getFirst()) < epsilon) 
+           && (Math.abs(target.getSecond() - curr.getSecond()) < epsilon) ) {
+        list.remove(curr);
+        return true;
+      } 
+    }
+    return false;
+  }
+
+  /**
+   * Story: User can produce final canopy centers using a Hadoop map/reduce job
+   * and a EuclideanDistanceMeasure.
+   */
+  @Test
+  public void testCanopyGenEuclideanMR() throws Exception {
+    List<VectorWritable> points = getPointsWritable();
+    Configuration config = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file1"), fs, config);
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file2"), fs, config);
+    // now run the Canopy Driver
+    Path output = getTestTempDirPath("output");
+    CanopyDriver.run(config, getTestTempDirPath("testdata"), output,
+        euclideanDistanceMeasure, 3.1, 2.1, false, 0.0, false);
+
+    // verify output from sequence file
+    Path path = new Path(output, "clusters-0-final/part-r-00000");
+    FileSystem fs = FileSystem.get(path.toUri(), config);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, config);
+    try {
+      Writable key = new Text();
+      ClusterWritable clusterWritable = new ClusterWritable();
+      assertTrue("more to come", reader.next(key, clusterWritable));
+      assertEquals("1st key", "C-0", key.toString());
+
+      List<Pair<Double,Double>> refCenters = Lists.newArrayList();
+      refCenters.add(new Pair<Double,Double>(1.8,1.8));
+      refCenters.add(new Pair<Double,Double>(4.433333333333334, 4.433333333333334));
+      Pair<Double,Double> c = new Pair<Double,Double>(clusterWritable.getValue().getCenter().get(0),
+                                                      clusterWritable.getValue().getCenter().get(1));
+      assertTrue("center "+c+" not found", findAndRemove(c, refCenters, EPSILON));
+      assertTrue("more to come", reader.next(key, clusterWritable));
+      assertEquals("2nd key", "C-1", key.toString());
+      c = new Pair<Double,Double>(clusterWritable.getValue().getCenter().get(0),
+                                  clusterWritable.getValue().getCenter().get(1));
+      assertTrue("center "+c+" not found", findAndRemove(c, refCenters, EPSILON));
+      assertFalse("more to come", reader.next(key, clusterWritable));
+    } finally {
+      Closeables.close(reader, true);
+    }
+  }
+
+  /** Story: User can cluster points using sequential execution */
+  @Test
+  public void testClusteringManhattanSeq() throws Exception {
+    List<VectorWritable> points = getPointsWritable();
+    Configuration config = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file1"), fs, config);
+    // now run the Canopy Driver in sequential mode
+    Path output = getTestTempDirPath("output");
+    CanopyDriver.run(config, getTestTempDirPath("testdata"), output,
+        manhattanDistanceMeasure, 3.1, 2.1, true, 0.0, true);
+
+    // verify output from sequence file
+    Path path = new Path(output, "clusters-0-final/part-r-00000");
+    int ix = 0;
+    for (ClusterWritable clusterWritable : new SequenceFileValueIterable<ClusterWritable>(path, true,
+        config)) {
+      assertEquals("Center [" + ix + ']', manhattanCentroids.get(ix), clusterWritable.getValue()
+          .getCenter());
+      ix++;
+    }
+
+    path = new Path(output, "clusteredPoints/part-m-0");
+    long count = HadoopUtil.countRecords(path, config);
+    assertEquals("number of points", points.size(), count);
+  }
+
+  /** Story: User can cluster points using sequential execution */
+  @Test
+  public void testClusteringEuclideanSeq() throws Exception {
+    List<VectorWritable> points = getPointsWritable();
+    Configuration config = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file1"), fs, config);
+    // now run the Canopy Driver in sequential mode
+    Path output = getTestTempDirPath("output");
+    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+        getTestTempDirPath("testdata").toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+        EuclideanDistanceMeasure.class.getName(),
+        optKey(DefaultOptionCreator.T1_OPTION), "3.1",
+        optKey(DefaultOptionCreator.T2_OPTION), "2.1",
+        optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+        optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+        optKey(DefaultOptionCreator.METHOD_OPTION),
+        DefaultOptionCreator.SEQUENTIAL_METHOD };
+    ToolRunner.run(config, new CanopyDriver(), args);
+
+    // verify output from sequence file
+    Path path = new Path(output, "clusters-0-final/part-r-00000");
+
+    int ix = 0;
+    for (ClusterWritable clusterWritable : new SequenceFileValueIterable<ClusterWritable>(path, true,
+        config)) {
+      assertEquals("Center [" + ix + ']', euclideanCentroids.get(ix), clusterWritable.getValue()
+          .getCenter());
+      ix++;
+    }
+
+    path = new Path(output, "clusteredPoints/part-m-0");
+    long count = HadoopUtil.countRecords(path, config);
+    assertEquals("number of points", points.size(), count);
+  }
+  
+  /** Story: User can remove outliers while clustering points using sequential execution */
+  @Test
+  public void testClusteringEuclideanWithOutlierRemovalSeq() throws Exception {
+    List<VectorWritable> points = getPointsWritable();
+    Configuration config = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file1"), fs, config);
+    // now run the Canopy Driver in sequential mode
+    Path output = getTestTempDirPath("output");
+    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+        getTestTempDirPath("testdata").toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+        EuclideanDistanceMeasure.class.getName(),
+        optKey(DefaultOptionCreator.T1_OPTION), "3.1",
+        optKey(DefaultOptionCreator.T2_OPTION), "2.1",
+        optKey(DefaultOptionCreator.OUTLIER_THRESHOLD), "0.5",
+        optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+        optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+        optKey(DefaultOptionCreator.METHOD_OPTION),
+        DefaultOptionCreator.SEQUENTIAL_METHOD };
+    ToolRunner.run(config, new CanopyDriver(), args);
+
+    // verify output from sequence file
+    Path path = new Path(output, "clusters-0-final/part-r-00000");
+
+    int ix = 0;
+    for (ClusterWritable clusterWritable : new SequenceFileValueIterable<ClusterWritable>(path, true,
+        config)) {
+      assertEquals("Center [" + ix + ']', euclideanCentroids.get(ix), clusterWritable.getValue()
+          .getCenter());
+      ix++;
+    }
+
+    path = new Path(output, "clusteredPoints/part-m-0");
+    long count = HadoopUtil.countRecords(path, config);
+    int expectedPointsHavingPDFGreaterThanThreshold = 6;
+    assertEquals("number of points", expectedPointsHavingPDFGreaterThanThreshold, count);
+  }
+
+
+  /**
+   * Story: User can produce final point clustering using a Hadoop map/reduce
+   * job and a ManhattanDistanceMeasure.
+   */
+  @Test
+  public void testClusteringManhattanMR() throws Exception {
+    List<VectorWritable> points = getPointsWritable();
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, 
+        getTestTempFilePath("testdata/file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, 
+        getTestTempFilePath("testdata/file2"), fs, conf);
+    // now run the Job
+    Path output = getTestTempDirPath("output");
+    CanopyDriver.run(conf, getTestTempDirPath("testdata"), output,
+        manhattanDistanceMeasure, 3.1, 2.1, true, 0.0, false);
+    Path path = new Path(output, "clusteredPoints/part-m-00000");
+    long count = HadoopUtil.countRecords(path, conf);
+    assertEquals("number of points", points.size(), count);
+  }
+
+  /**
+   * Story: User can produce final point clustering using a Hadoop map/reduce
+   * job and a EuclideanDistanceMeasure.
+   */
+  @Test
+  public void testClusteringEuclideanMR() throws Exception {
+    List<VectorWritable> points = getPointsWritable();
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, 
+        getTestTempFilePath("testdata/file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, 
+        getTestTempFilePath("testdata/file2"), fs, conf);
+    // now run the Job using the run() command. Others can use runJob().
+    Path output = getTestTempDirPath("output");
+    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+        getTestTempDirPath("testdata").toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+        EuclideanDistanceMeasure.class.getName(),
+        optKey(DefaultOptionCreator.T1_OPTION), "3.1",
+        optKey(DefaultOptionCreator.T2_OPTION), "2.1",
+        optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+        optKey(DefaultOptionCreator.OVERWRITE_OPTION) };
+    ToolRunner.run(getConfiguration(), new CanopyDriver(), args);
+    Path path = new Path(output, "clusteredPoints/part-m-00000");
+    long count = HadoopUtil.countRecords(path, conf);
+    assertEquals("number of points", points.size(), count);
+  }
+  
+  /**
+   * Story: User can produce final point clustering using a Hadoop map/reduce
+   * job and a EuclideanDistanceMeasure and outlier removal threshold.
+   */
+  @Test
+  public void testClusteringEuclideanWithOutlierRemovalMR() throws Exception {
+    List<VectorWritable> points = getPointsWritable();
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, 
+        getTestTempFilePath("testdata/file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, 
+        getTestTempFilePath("testdata/file2"), fs, conf);
+    // now run the Job using the run() command. Others can use runJob().
+    Path output = getTestTempDirPath("output");
+    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+        getTestTempDirPath("testdata").toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+        EuclideanDistanceMeasure.class.getName(),
+        optKey(DefaultOptionCreator.T1_OPTION), "3.1",
+        optKey(DefaultOptionCreator.T2_OPTION), "2.1",
+        optKey(DefaultOptionCreator.OUTLIER_THRESHOLD), "0.7",
+        optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+        optKey(DefaultOptionCreator.OVERWRITE_OPTION) };
+    ToolRunner.run(getConfiguration(), new CanopyDriver(), args);
+    Path path = new Path(output, "clusteredPoints/part-m-00000");
+    long count = HadoopUtil.countRecords(path, conf);
+    int expectedPointsAfterOutlierRemoval = 8;
+    assertEquals("number of points", expectedPointsAfterOutlierRemoval, count);
+  }
+
+
+  /**
+   * Story: User can set T3 and T4 values to be used by the reducer for its T1
+   * and T2 thresholds
+   */
+  @Test
+  public void testCanopyReducerT3T4Configuration() throws Exception {
+    CanopyReducer reducer = new CanopyReducer();
+    Configuration conf = getConfiguration();
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+        "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(1.1));
+    conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(0.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "0");
+    DummyRecordWriter<Text, ClusterWritable> writer = new DummyRecordWriter<Text, ClusterWritable>();
+    Reducer<Text, VectorWritable, Text, ClusterWritable>.Context context = DummyRecordWriter
+        .build(reducer, conf, writer, Text.class, VectorWritable.class);
+    reducer.setup(context);
+    assertEquals(1.1, reducer.getCanopyClusterer().getT1(), EPSILON);
+    assertEquals(0.1, reducer.getCanopyClusterer().getT2(), EPSILON);
+  }
+
+  /**
+   * Story: User can specify a clustering limit that prevents output of small
+   * clusters
+   */
+  @Test
+  public void testCanopyMapperClusterFilter() throws Exception {
+    CanopyMapper mapper = new CanopyMapper();
+    Configuration conf = getConfiguration();
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, manhattanDistanceMeasure
+        .getClass().getName());
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "3");
+    DummyRecordWriter<Text, VectorWritable> writer = new DummyRecordWriter<Text, VectorWritable>();
+    Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context = DummyRecordWriter
+        .build(mapper, conf, writer);
+    mapper.setup(context);
+
+    List<VectorWritable> points = getPointsWritable();
+    // map the data
+    for (VectorWritable point : points) {
+      mapper.map(new Text(), point, context);
+    }
+    mapper.cleanup(context);
+    assertEquals("Number of map results", 1, writer.getData().size());
+    // now verify the output
+    List<VectorWritable> data = writer.getValue(new Text("centroid"));
+    assertEquals("Number of centroids", 2, data.size());
+  }
+
+  /**
+   * Story: User can specify a cluster filter that limits the minimum size of
+   * canopies produced by the reducer
+   */
+  @Test
+  public void testCanopyReducerClusterFilter() throws Exception {
+    CanopyReducer reducer = new CanopyReducer();
+    Configuration conf = getConfiguration();
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+        "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+    conf.set(CanopyConfigKeys.CF_KEY, "3");
+    DummyRecordWriter<Text, ClusterWritable> writer = new DummyRecordWriter<Text, ClusterWritable>();
+    Reducer<Text, VectorWritable, Text, ClusterWritable>.Context context = DummyRecordWriter
+        .build(reducer, conf, writer, Text.class, VectorWritable.class);
+    reducer.setup(context);
+
+    List<VectorWritable> points = getPointsWritable();
+    reducer.reduce(new Text("centroid"), points, context);
+    Set<Text> keys = writer.getKeys();
+    assertEquals("Number of centroids", 2, keys.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java b/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java
new file mode 100644
index 0000000..cbf0e55
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.classify;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.iterator.CanopyClusteringPolicy;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ClusterClassificationDriverTest extends MahoutTestCase {
+
+  private static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {4, 4},
+      {5, 4}, {4, 5}, {5, 5}, {9, 9}, {8, 8}};
+
+  private FileSystem fs;
+  private Path clusteringOutputPath;
+  private Configuration conf;
+  private Path pointsPath;
+  private Path classifiedOutputPath;
+  private List<Vector> firstCluster;
+  private List<Vector> secondCluster;
+  private List<Vector> thirdCluster;
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+    firstCluster = Lists.newArrayList();
+    secondCluster = Lists.newArrayList();
+    thirdCluster = Lists.newArrayList();
+
+  }
+
+  private static List<VectorWritable> getPointsWritable(double[][] raw) {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+  @Test
+  public void testVectorClassificationWithOutlierRemovalMR() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+    pointsPath = getTestTempDirPath("points");
+    clusteringOutputPath = getTestTempDirPath("output");
+    classifiedOutputPath = getTestTempDirPath("classifiedClusters");
+    HadoopUtil.delete(conf, classifiedOutputPath);
+
+    conf = getConfiguration();
+
+    ClusteringTestUtils.writePointsToFile(points, true,
+        new Path(pointsPath, "file1"), fs, conf);
+    runClustering(pointsPath, conf, false);
+    runClassificationWithOutlierRemoval(false);
+    collectVectorsForAssertion();
+    assertVectorsWithOutlierRemoval();
+  }
+
+  @Test
+  public void testVectorClassificationWithoutOutlierRemoval() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+    pointsPath = getTestTempDirPath("points");
+    clusteringOutputPath = getTestTempDirPath("output");
+    classifiedOutputPath = getTestTempDirPath("classify");
+
+    conf = getConfiguration();
+
+    ClusteringTestUtils.writePointsToFile(points,
+        new Path(pointsPath, "file1"), fs, conf);
+    runClustering(pointsPath, conf, true);
+    runClassificationWithoutOutlierRemoval();
+    collectVectorsForAssertion();
+    assertVectorsWithoutOutlierRemoval();
+  }
+
+  @Test
+  public void testVectorClassificationWithOutlierRemoval() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+    pointsPath = getTestTempDirPath("points");
+    clusteringOutputPath = getTestTempDirPath("output");
+    classifiedOutputPath = getTestTempDirPath("classify");
+
+    conf = getConfiguration();
+
+    ClusteringTestUtils.writePointsToFile(points,
+        new Path(pointsPath, "file1"), fs, conf);
+    runClustering(pointsPath, conf, true);
+    runClassificationWithOutlierRemoval(true);
+    collectVectorsForAssertion();
+    assertVectorsWithOutlierRemoval();
+  }
+
+  private void runClustering(Path pointsPath, Configuration conf,
+      Boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    CanopyDriver.run(conf, pointsPath, clusteringOutputPath,
+        new ManhattanDistanceMeasure(), 3.1, 2.1, false, 0.0, runSequential);
+    Path finalClustersPath = new Path(clusteringOutputPath, "clusters-0-final");
+    ClusterClassifier.writePolicy(new CanopyClusteringPolicy(),
+        finalClustersPath);
+  }
+
+  private void runClassificationWithoutOutlierRemoval()
+    throws IOException, InterruptedException, ClassNotFoundException {
+    ClusterClassificationDriver.run(getConfiguration(), pointsPath, clusteringOutputPath, classifiedOutputPath, 0.0, true, true);
+  }
+
+  private void runClassificationWithOutlierRemoval(boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    ClusterClassificationDriver.run(getConfiguration(), pointsPath, clusteringOutputPath, classifiedOutputPath, 0.73, true, runSequential);
+  }
+
+  private void collectVectorsForAssertion() throws IOException {
+    Path[] partFilePaths = FileUtil.stat2Paths(fs
+        .globStatus(classifiedOutputPath));
+    FileStatus[] listStatus = fs.listStatus(partFilePaths,
+        PathFilters.partFilter());
+    for (FileStatus partFile : listStatus) {
+      SequenceFile.Reader classifiedVectors = new SequenceFile.Reader(fs,
+          partFile.getPath(), conf);
+      Writable clusterIdAsKey = new IntWritable();
+      WeightedPropertyVectorWritable point = new WeightedPropertyVectorWritable();
+      while (classifiedVectors.next(clusterIdAsKey, point)) {
+        collectVector(clusterIdAsKey.toString(), point.getVector());
+      }
+    }
+  }
+
+  private void collectVector(String clusterId, Vector vector) {
+    if ("0".equals(clusterId)) {
+      firstCluster.add(vector);
+    } else if ("1".equals(clusterId)) {
+      secondCluster.add(vector);
+    } else if ("2".equals(clusterId)) {
+      thirdCluster.add(vector);
+    }
+  }
+
+  private void assertVectorsWithOutlierRemoval() {
+    checkClustersWithOutlierRemoval();
+  }
+
+  private void assertVectorsWithoutOutlierRemoval() {
+    assertFirstClusterWithoutOutlierRemoval();
+    assertSecondClusterWithoutOutlierRemoval();
+    assertThirdClusterWithoutOutlierRemoval();
+  }
+
+  private void assertThirdClusterWithoutOutlierRemoval() {
+    Assert.assertEquals(2, thirdCluster.size());
+    for (Vector vector : thirdCluster) {
+      Assert.assertTrue(ArrayUtils.contains(new String[] {"{0:9.0,1:9.0}",
+          "{0:8.0,1:8.0}"}, vector.asFormatString()));
+    }
+  }
+
+  private void assertSecondClusterWithoutOutlierRemoval() {
+    Assert.assertEquals(4, secondCluster.size());
+    for (Vector vector : secondCluster) {
+      Assert.assertTrue(ArrayUtils.contains(new String[] {"{0:4.0,1:4.0}",
+          "{0:5.0,1:4.0}", "{0:4.0,1:5.0}", "{0:5.0,1:5.0}"},
+          vector.asFormatString()));
+    }
+  }
+
+  private void assertFirstClusterWithoutOutlierRemoval() {
+    Assert.assertEquals(3, firstCluster.size());
+    for (Vector vector : firstCluster) {
+      Assert.assertTrue(ArrayUtils.contains(new String[] {"{0:1.0,1:1.0}",
+          "{0:2.0,1:1.0}", "{0:1.0,1:2.0}"}, vector.asFormatString()));
+    }
+  }
+
+  private void checkClustersWithOutlierRemoval() {
+    Set<String> reference = Sets.newHashSet("{0:9.0,1:9.0}", "{0:1.0,1:1.0}");
+
+    List<List<Vector>> clusters = Lists.newArrayList();
+    clusters.add(firstCluster);
+    clusters.add(secondCluster);
+    clusters.add(thirdCluster);
+
+    int singletonCnt = 0;
+    int emptyCnt = 0;
+    for (List<Vector> vList : clusters) {
+      if (vList.isEmpty()) {
+        emptyCnt++;
+      } else {
+        singletonCnt++;
+        assertEquals("expecting only singleton clusters; got size=" + vList.size(), 1, vList.size());
+        if (vList.get(0).getClass().equals(NamedVector.class)) {
+          Assert.assertTrue("not expecting cluster:" + ((NamedVector) vList.get(0)).getDelegate().asFormatString(),
+                  reference.contains(((NamedVector)  vList.get(0)).getDelegate().asFormatString()));
+          reference.remove(((NamedVector)vList.get(0)).getDelegate().asFormatString());
+        } else if (vList.get(0).getClass().equals(RandomAccessSparseVector.class)) {
+          Assert.assertTrue("not expecting cluster:" + vList.get(0).asFormatString(),
+                  reference.contains(vList.get(0).asFormatString()));
+          reference.remove(vList.get(0).asFormatString());
+        }
+      }
+    }
+    Assert.assertEquals("Different number of empty clusters than expected!", 1, emptyCnt);
+    Assert.assertEquals("Different number of singletons than expected!", 2, singletonCnt);
+    Assert.assertEquals("Didn't match all reference clusters!", 0, reference.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java b/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
new file mode 100644
index 0000000..fc71ecf
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Closeables;
+
+public final class TestFuzzyKmeansClustering extends MahoutTestCase {
+
+  private FileSystem fs;
+  private final DistanceMeasure measure = new EuclideanDistanceMeasure();
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+
+  private static Vector tweakValue(Vector point) {
+    return point.plus(0.1);
+  }
+
+  @Test
+  public void testFuzzyKMeansSeqJob() throws Exception {
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("testKFuzzyKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(clustersPath, "part-00000"),
+                                                           Text.class,
+                                                           SoftCluster.class);
+      try {
+        for (int i = 0; i < k + 1; i++) {
+          Vector vec = tweakValue(points.get(i).get());
+          SoftCluster cluster = new SoftCluster(vec, i, measure);
+          /* add the center so the centroid will be correct upon output */
+          cluster.observe(cluster.getCenter(), 1);
+          // writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n');
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        Closeables.close(writer, false);
+      }
+
+      // now run the Job using the run() command line options.
+      Path output = getTestTempDirPath("output" + k);
+      /*      FuzzyKMeansDriver.runJob(pointsPath,
+                                     clustersPath,
+                                     output,
+                                     EuclideanDistanceMeasure.class.getName(),
+                                     0.001,
+                                     2,
+                                     k + 1,
+                                     2,
+                                     false,
+                                     true,
+                                     0);
+      */
+      String[] args = {
+          optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION),
+          clustersPath.toString(),
+          optKey(DefaultOptionCreator.OUTPUT_OPTION),
+          output.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+          EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION),
+          "0.001",
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
+          "2",
+          optKey(FuzzyKMeansDriver.M_OPTION),
+          "2.0",
+          optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION),
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+          optKey(DefaultOptionCreator.METHOD_OPTION),
+          DefaultOptionCreator.SEQUENTIAL_METHOD
+      };
+      FuzzyKMeansDriver.main(args);
+      long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-0"), conf);
+      assertTrue(count > 0);
+    }
+
+  }
+
+  @Test
+  public void testFuzzyKMeansMRJob() throws Exception {
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("testKFuzzyKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(clustersPath, "part-00000"),
+                                                           Text.class,
+                                                           SoftCluster.class);
+      try {
+        for (int i = 0; i < k + 1; i++) {
+          Vector vec = tweakValue(points.get(i).get());
+
+          SoftCluster cluster = new SoftCluster(vec, i, measure);
+          /* add the center so the centroid will be correct upon output */
+          cluster.observe(cluster.getCenter(), 1);
+          // writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n');
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+
+        }
+      } finally {
+        Closeables.close(writer, false);
+      }
+
+      // now run the Job using the run() command line options.
+      Path output = getTestTempDirPath("output" + k);
+      /*      FuzzyKMeansDriver.runJob(pointsPath,
+                                     clustersPath,
+                                     output,
+                                     EuclideanDistanceMeasure.class.getName(),
+                                     0.001,
+                                     2,
+                                     k + 1,
+                                     2,
+                                     false,
+                                     true,
+                                     0);
+      */
+      String[] args = {
+          optKey(DefaultOptionCreator.INPUT_OPTION),
+          pointsPath.toString(),
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION),
+          clustersPath.toString(),
+          optKey(DefaultOptionCreator.OUTPUT_OPTION),
+          output.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+          EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION),
+          "0.001",
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
+          "2",
+          optKey(FuzzyKMeansDriver.M_OPTION),
+          "2.0",
+          optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION),
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION)
+      };
+      ToolRunner.run(getConfiguration(), new FuzzyKMeansDriver(), args);
+      long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-00000"), conf);
+      assertTrue(count > 0);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java b/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
new file mode 100644
index 0000000..fdcfd64
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.iterator;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.fuzzykmeans.SoftCluster;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.CosineDistanceMeasure;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestClusterClassifier extends MahoutTestCase {
+  
+  private static ClusterClassifier newDMClassifier() {
+    List<Cluster> models = Lists.newArrayList();
+    DistanceMeasure measure = new ManhattanDistanceMeasure();
+    models.add(new DistanceMeasureCluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new DistanceMeasureCluster(new DenseVector(2), 1, measure));
+    models.add(new DistanceMeasureCluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new KMeansClusteringPolicy());
+  }
+  
+  private static ClusterClassifier newKlusterClassifier() {
+    List<Cluster> models = Lists.newArrayList();
+    DistanceMeasure measure = new ManhattanDistanceMeasure();
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2), 1, measure));
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new KMeansClusteringPolicy());
+  }
+  
+  private static ClusterClassifier newCosineKlusterClassifier() {
+    List<Cluster> models = Lists.newArrayList();
+    DistanceMeasure measure = new CosineDistanceMeasure();
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2), 1, measure));
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new KMeansClusteringPolicy());
+  }
+
+  private static ClusterClassifier newSoftClusterClassifier() {
+    List<Cluster> models = Lists.newArrayList();
+    DistanceMeasure measure = new ManhattanDistanceMeasure();
+    models.add(new SoftCluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new SoftCluster(new DenseVector(2), 1, measure));
+    models.add(new SoftCluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new FuzzyKMeansClusteringPolicy());
+  }
+  
+  private ClusterClassifier writeAndRead(ClusterClassifier classifier) throws IOException {
+    Path path = new Path(getTestTempDirPath(), "output");
+    classifier.writeToSeqFiles(path);
+    ClusterClassifier newClassifier = new ClusterClassifier();
+    newClassifier.readFromSeqFiles(getConfiguration(), path);
+    return newClassifier;
+  }
+  
+  @Test
+  public void testDMClusterClassification() {
+    ClusterClassifier classifier = newDMClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null));
+  }
+  
+  @Test
+  public void testClusterClassification() {
+    ClusterClassifier classifier = newKlusterClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null));
+  }
+  
+  @Test
+  public void testSoftClusterClassification() {
+    ClusterClassifier classifier = newSoftClusterClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.0,1.0,0.0]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.735,0.184,0.082]", AbstractCluster.formatVector(pdf, null));
+  }
+  
+  @Test
+  public void testDMClassifierSerialization() throws Exception {
+    ClusterClassifier classifier = newDMClassifier();
+    ClusterClassifier classifierOut = writeAndRead(classifier);
+    assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+    assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+        .getName());
+  }
+  
+  @Test
+  public void testClusterClassifierSerialization() throws Exception {
+    ClusterClassifier classifier = newKlusterClassifier();
+    ClusterClassifier classifierOut = writeAndRead(classifier);
+    assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+    assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+        .getName());
+  }
+  
+  @Test
+  public void testSoftClusterClassifierSerialization() throws Exception {
+    ClusterClassifier classifier = newSoftClusterClassifier();
+    ClusterClassifier classifierOut = writeAndRead(classifier);
+    assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+    assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+        .getName());
+  }
+  
+  @Test
+  public void testClusterIteratorKMeans() {
+    List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
+    ClusterClassifier prior = newKlusterClassifier();
+    ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5);
+    assertEquals(3, posterior.getModels().size());
+    for (Cluster cluster : posterior.getModels()) {
+      System.out.println(cluster.asFormatString(null));
+    }
+  }
+  
+  @Test
+  public void testClusterIteratorDirichlet() {
+    List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
+    ClusterClassifier prior = newKlusterClassifier();
+    ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5);
+    assertEquals(3, posterior.getModels().size());
+    for (Cluster cluster : posterior.getModels()) {
+      System.out.println(cluster.asFormatString(null));
+    }
+  }
+  
+  @Test
+  public void testSeqFileClusterIteratorKMeans() throws IOException {
+    Path pointsPath = getTestTempDirPath("points");
+    Path priorPath = getTestTempDirPath("prior");
+    Path outPath = getTestTempDirPath("output");
+    Configuration conf = getConfiguration();
+    FileSystem fs = FileSystem.get(pointsPath.toUri(), conf);
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+    Path path = new Path(priorPath, "priorClassifier");
+    ClusterClassifier prior = newKlusterClassifier();
+    prior.writeToSeqFiles(path);
+    assertEquals(3, prior.getModels().size());
+    System.out.println("Prior");
+    for (Cluster cluster : prior.getModels()) {
+      System.out.println(cluster.asFormatString(null));
+    }
+    ClusterIterator.iterateSeq(conf, pointsPath, path, outPath, 5);
+    
+    for (int i = 1; i <= 4; i++) {
+      System.out.println("Classifier-" + i);
+      ClusterClassifier posterior = new ClusterClassifier();
+      String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+      posterior.readFromSeqFiles(conf, new Path(outPath, name));
+      assertEquals(3, posterior.getModels().size());
+      for (Cluster cluster : posterior.getModels()) {
+        System.out.println(cluster.asFormatString(null));
+      }
+      
+    }
+  }
+  
+  @Test
+  public void testMRFileClusterIteratorKMeans() throws Exception {
+    Path pointsPath = getTestTempDirPath("points");
+    Path priorPath = getTestTempDirPath("prior");
+    Path outPath = getTestTempDirPath("output");
+    Configuration conf = getConfiguration();
+    FileSystem fs = FileSystem.get(pointsPath.toUri(), conf);
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+    Path path = new Path(priorPath, "priorClassifier");
+    ClusterClassifier prior = newKlusterClassifier();
+    prior.writeToSeqFiles(path);
+    ClusteringPolicy policy = new KMeansClusteringPolicy();
+    ClusterClassifier.writePolicy(policy, path);
+    assertEquals(3, prior.getModels().size());
+    System.out.println("Prior");
+    for (Cluster cluster : prior.getModels()) {
+      System.out.println(cluster.asFormatString(null));
+    }
+    ClusterIterator.iterateMR(conf, pointsPath, path, outPath, 5);
+    
+    for (int i = 1; i <= 4; i++) {
+      System.out.println("Classifier-" + i);
+      ClusterClassifier posterior = new ClusterClassifier();
+      String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+      posterior.readFromSeqFiles(conf, new Path(outPath, name));
+      assertEquals(3, posterior.getModels().size());
+      for (Cluster cluster : posterior.getModels()) {
+        System.out.println(cluster.asFormatString(null));
+      }     
+    }
+  }
+  
+  @Test
+  public void testCosineKlusterClassification() {
+    ClusterClassifier classifier = newCosineKlusterClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.333,0.333,0.333]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.429,0.429,0.143]", AbstractCluster.formatVector(pdf, null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
new file mode 100644
index 0000000..5666765
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+public final class TestKmeansClustering extends MahoutTestCase {
+  
+  public static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+  
+  private static final int[][] EXPECTED_NUM_POINTS = { {9}, {4, 5}, {4, 4, 1}, {1, 2, 1, 5}, {1, 1, 1, 2, 4},
+      {1, 1, 1, 1, 1, 4}, {1, 1, 1, 1, 1, 2, 2}, {1, 1, 1, 1, 1, 1, 2, 1}, {1, 1, 1, 1, 1, 1, 1, 1, 1}};
+  
+  private FileSystem fs;
+  
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+  
+  public static List<VectorWritable> getPointsWritable(double[][] raw) {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+  
+  public static List<VectorWritable> getPointsWritableDenseVector(double[][] raw) {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new DenseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+  
+  public static List<Vector> getPoints(double[][] raw) {
+    List<Vector> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new SequentialAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(vec);
+    }
+    return points;
+  }
+  
+  /**
+   * Tests
+   * {@link KMeansClusterer#runKMeansIteration(Iterable, Iterable, DistanceMeasure, double)}
+   * ) single run convergence with a given distance threshold.
+   */
+  /*@Test
+  public void testRunKMeansIterationConvergesInOneRunWithGivenDistanceThreshold() {
+    double[][] rawPoints = { {0, 0}, {0, 0.25}, {0, 0.75}, {0, 1}};
+    List<Vector> points = getPoints(rawPoints);
+
+    ManhattanDistanceMeasure distanceMeasure = new ManhattanDistanceMeasure();
+    List<Kluster> clusters = Arrays.asList(new Kluster(points.get(0), 0, distanceMeasure), new Kluster(points.get(3),
+        3, distanceMeasure));
+
+    // To converge in a single run, the given distance threshold should be
+    // greater than or equal to 0.125,
+    // since 0.125 will be the distance between center and centroid for the
+    // initial two clusters after one run.
+    double distanceThreshold = 0.25;
+
+    boolean converged = KMeansClusterer.runKMeansIteration(points, clusters, distanceMeasure, distanceThreshold);
+
+    Vector cluster1Center = clusters.get(0).getCenter();
+    assertEquals(0, cluster1Center.get(0), EPSILON);
+    assertEquals(0.125, cluster1Center.get(1), EPSILON);
+
+    Vector cluster2Center = clusters.get(1).getCenter();
+    assertEquals(0, cluster2Center.get(0), EPSILON);
+    assertEquals(0.875, cluster2Center.get(1), EPSILON);
+
+    assertTrue("KMeans iteration should be converged after a single run", converged);
+  }*/
+
+  /** Story: User wishes to run kmeans job on reference data */
+  @Test
+  public void testKMeansSeqJob() throws Exception {
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+    for (int k = 1; k < points.size(); k++) {
+      System.out.println("testKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      Path path = new Path(clustersPath, "part-00000");
+      FileSystem fs = FileSystem.get(path.toUri(), conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
+      try {
+        for (int i = 0; i < k + 1; i++) {
+          Vector vec = points.get(i).get();
+          
+          Kluster cluster = new Kluster(vec, i, measure);
+          // add the center so the centroid will be correct upon output
+          cluster.observe(cluster.getCenter(), 1);
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        Closeables.close(writer, false);
+      }
+      // now run the Job
+      Path outputPath = getTestTempDirPath("output" + k);
+      String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+          optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION),
+          DefaultOptionCreator.SEQUENTIAL_METHOD};
+      ToolRunner.run(conf, new KMeansDriver(), args);
+      
+      // now compare the expected clusters with actual
+      Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+      int[] expect = EXPECTED_NUM_POINTS[k];
+      DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>();
+      // The key is the clusterId, the value is the weighted vector
+      for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+          new Path(clusteredPointsPath, "part-m-0"), conf)) {
+        collector.collect(record.getFirst(), record.getSecond());
+      }
+      assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+    }
+  }
+  
+  /** Story: User wishes to run kmeans job on reference data (DenseVector test) */
+  @Test
+  public void testKMeansSeqJobDenseVector() throws Exception {
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    List<VectorWritable> points = getPointsWritableDenseVector(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+    for (int k = 1; k < points.size(); k++) {
+      System.out.println("testKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      Path path = new Path(clustersPath, "part-00000");
+      FileSystem fs = FileSystem.get(path.toUri(), conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
+      try {
+        for (int i = 0; i < k + 1; i++) {
+          Vector vec = points.get(i).get();
+          
+          Kluster cluster = new Kluster(vec, i, measure);
+          // add the center so the centroid will be correct upon output
+          cluster.observe(cluster.getCenter(), 1);
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        Closeables.close(writer, false);
+      }
+      // now run the Job
+      Path outputPath = getTestTempDirPath("output" + k);
+      String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+          optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION),
+          DefaultOptionCreator.SEQUENTIAL_METHOD};
+      ToolRunner.run(conf, new KMeansDriver(), args);
+      
+      // now compare the expected clusters with actual
+      Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+      int[] expect = EXPECTED_NUM_POINTS[k];
+      DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>();
+      // The key is the clusterId, the value is the weighted vector
+      for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+          new Path(clusteredPointsPath, "part-m-0"), conf)) {
+        collector.collect(record.getFirst(), record.getSecond());
+      }
+      assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+    }
+  }
+  
+  /** Story: User wishes to run kmeans job on reference data */
+  @Test
+  public void testKMeansMRJob() throws Exception {
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+    for (int k = 1; k < points.size(); k += 3) {
+      System.out.println("testKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      Path path = new Path(clustersPath, "part-00000");
+      FileSystem fs = FileSystem.get(path.toUri(), conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
+      
+      try {
+        for (int i = 0; i < k + 1; i++) {
+          Vector vec = points.get(i).get();
+          
+          Kluster cluster = new Kluster(vec, i, measure);
+          // add the center so the centroid will be correct upon output
+          cluster.observe(cluster.getCenter(), 1);
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        Closeables.close(writer, false);
+      }
+      // now run the Job
+      Path outputPath = getTestTempDirPath("output" + k);
+      String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+          optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION)};
+      ToolRunner.run(getConfiguration(), new KMeansDriver(), args);
+      
+      // now compare the expected clusters with actual
+      Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+      // assertEquals("output dir files?", 4, outFiles.length);
+      int[] expect = EXPECTED_NUM_POINTS[k];
+      DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>();
+      // The key is the clusterId, the value is the weighted vector
+      for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+          new Path(clusteredPointsPath, "part-m-00000"), conf)) {
+        collector.collect(record.getFirst(), record.getSecond());
+      }
+      assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+    }
+  }
+  
+  /**
+   * Story: User wants to use canopy clustering to input the initial clusters
+   * for kmeans job.
+   */
+  @Test
+  public void testKMeansWithCanopyClusterInput() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+    
+    Path outputPath = getTestTempDirPath("output");
+    // now run the Canopy job
+    CanopyDriver.run(conf, pointsPath, outputPath, new ManhattanDistanceMeasure(), 3.1, 2.1, false, 0.0, false);
+    
+    DummyOutputCollector<Text, ClusterWritable> collector1 =
+        new DummyOutputCollector<Text, ClusterWritable>();
+
+    FileStatus[] outParts = FileSystem.get(conf).globStatus(
+                    new Path(outputPath, "clusters-0-final/*-0*"));
+    for (FileStatus outPartStat : outParts) {
+      for (Pair<Text,ClusterWritable> record :
+               new SequenceFileIterable<Text,ClusterWritable>(
+                 outPartStat.getPath(), conf)) {
+          collector1.collect(record.getFirst(), record.getSecond());
+      }
+    }
+
+    boolean got15 = false;
+    boolean got43 = false;
+    int count = 0;
+    for (Text k : collector1.getKeys()) {
+      count++;
+      List<ClusterWritable> vl = collector1.getValue(k);
+      assertEquals("non-singleton centroid!", 1, vl.size());
+      ClusterWritable clusterWritable = vl.get(0);
+      Vector v = clusterWritable.getValue().getCenter();
+      assertEquals("cetriod vector is wrong length", 2, v.size());
+      if ( (Math.abs(v.get(0) - 1.5) < EPSILON) 
+                  && (Math.abs(v.get(1) - 1.5) < EPSILON)
+                  && !got15) {
+        got15 = true;
+      } else if ( (Math.abs(v.get(0) - 4.333333333333334) < EPSILON) 
+                  && (Math.abs(v.get(1) - 4.333333333333334) < EPSILON)
+                  && !got43) {
+        got43 = true;
+      } else {
+        fail("got unexpected center: " + v + " [" + v.getClass().toString() + ']');
+      }
+    }
+    assertEquals("got unexpected number of centers", 2, count);
+
+    // now run the KMeans job
+    Path kmeansOutput = new Path(outputPath, "kmeans");
+	  KMeansDriver.run(getConfiguration(), pointsPath, new Path(outputPath, "clusters-0-final"), kmeansOutput,
+      0.001, 10, true, 0.0, false);
+    
+    // now compare the expected clusters with actual
+    Path clusteredPointsPath = new Path(kmeansOutput, "clusteredPoints");
+    DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>();
+    
+    // The key is the clusterId, the value is the weighted vector
+    for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+        new Path(clusteredPointsPath, "part-m-00000"), conf)) {
+      collector.collect(record.getFirst(), record.getSecond());
+    }
+    
+    for (IntWritable k : collector.getKeys()) {
+      List<WeightedPropertyVectorWritable> wpvList = collector.getValue(k);
+      assertTrue("empty cluster!", !wpvList.isEmpty());
+      if (wpvList.get(0).getVector().get(0) <= 2.0) {
+        for (WeightedPropertyVectorWritable wv : wpvList) {
+          Vector v = wv.getVector();
+          int idx = v.maxValueIndex();
+          assertTrue("bad cluster!", v.get(idx) <= 2.0);
+        }
+        assertEquals("Wrong size cluster", 4, wpvList.size());
+      } else {
+        for (WeightedPropertyVectorWritable wv : wpvList) {
+          Vector v = wv.getVector();
+          int idx = v.minValueIndex();
+          assertTrue("bad cluster!", v.get(idx) > 2.0);
+        }
+        assertEquals("Wrong size cluster", 5, wpvList.size());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
new file mode 100644
index 0000000..5cb012a
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
@@ -0,0 +1,169 @@
+   /**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestRandomSeedGenerator extends MahoutTestCase {
+  
+  private static final double[][] RAW = {{1, 1}, {2, 1}, {1, 2}, {2, 2},
+    {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+  
+  private FileSystem fs;
+  
+  private static List<VectorWritable> getPoints() {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : RAW) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+  
+  /** Story: test random seed generation generates 4 clusters with proper ids and data */
+  @Test
+  public void testRandomSeedGenerator() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    Path input = getTestTempFilePath("random-input");
+    Path output = getTestTempDirPath("random-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+    
+    RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure());
+
+    int clusterCount = 0;
+    Collection<Integer> set = Sets.newHashSet();
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+      clusterCount++;
+      Cluster cluster = clusterWritable.getValue();
+      int id = cluster.getId();
+      assertTrue(set.add(id)); // Validate unique id's
+      
+      Vector v = cluster.getCenter();
+      assertVectorEquals(RAW[id], v); // Validate values match
+    }
+
+    assertEquals(4, clusterCount); // Validate sample count
+  }
+  
+  /** Be sure that the buildRandomSeeded works in the same way as RandomSeedGenerator.buildRandom */
+  @Test
+  public void testRandomSeedGeneratorSeeded() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    Path input = getTestTempFilePath("random-input");
+    Path output = getTestTempDirPath("random-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+    
+    RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), 1L);
+
+    int clusterCount = 0;
+    Collection<Integer> set = Sets.newHashSet();
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+      clusterCount++;
+      Cluster cluster = clusterWritable.getValue();
+      int id = cluster.getId();
+      assertTrue(set.add(id)); // validate unique id's
+      
+      Vector v = cluster.getCenter();
+      assertVectorEquals(RAW[id], v); // validate values match
+    }
+
+    assertEquals(4, clusterCount); // validate sample count
+  }
+  
+  /** Test that initial clusters built with same random seed are reproduced  */
+ @Test
+ public void testBuildRandomSeededSameInitalClusters() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    Path input = getTestTempFilePath("random-input");
+    Path output = getTestTempDirPath("random-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+    long randSeed=1;
+    
+    RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), randSeed);
+    
+    int[] clusterIDSeq = new int[4];
+    
+    /** run through all clusters once and set sequence of IDs  */  
+    int clusterCount = 0;
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {      
+      Cluster cluster = clusterWritable.getValue();
+      clusterIDSeq[clusterCount] = cluster.getId();
+      clusterCount++; 
+    }
+    
+    /* Rebuild cluster and run through again making sure all IDs are in the same random sequence
+     * Needs a better test because in this case passes when seeded with 1 and 2  fails with 1, 3
+     * passes when set to two */
+    RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), randSeed);     clusterCount = 0;    
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {       
+      Cluster cluster = clusterWritable.getValue();
+      // Make sure cluster ids are in same random sequence
+      assertEquals(clusterIDSeq[clusterCount], cluster.getId());
+      clusterCount++;
+    }
+ }
+  
+  private static void assertVectorEquals(double[] raw, Vector v) {
+    assertEquals(raw.length, v.size());
+    for (int i = 0; i < raw.length; i++) {
+      assertEquals(raw[i], v.getQuick(i), EPSILON);
+    }
+  }
+}