Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/04 14:29:07 UTC

[05/53] [abbrv] [partial] mahout git commit: end of day 6-2-2018

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
new file mode 100644
index 0000000..fc71ecf
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Closeables;
+
+public final class TestFuzzyKmeansClustering extends MahoutTestCase {
+
+  private FileSystem fs;
+  private final DistanceMeasure measure = new EuclideanDistanceMeasure();
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+
+  private static Vector tweakValue(Vector point) {
+    return point.plus(0.1);
+  }
+
+  @Test
+  public void testFuzzyKMeansSeqJob() throws Exception {
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("testKFuzzyKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(clustersPath, "part-00000"),
+                                                           Text.class,
+                                                           SoftCluster.class);
+      try {
+        for (int i = 0; i < k + 1; i++) {
+          Vector vec = tweakValue(points.get(i).get());
+          SoftCluster cluster = new SoftCluster(vec, i, measure);
+          /* add the center so the centroid will be correct upon output */
+          cluster.observe(cluster.getCenter(), 1);
+          // writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n');
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        Closeables.close(writer, false);
+      }
+
+      // now run the Job using the run() command line options.
+      Path output = getTestTempDirPath("output" + k);
+      /*      FuzzyKMeansDriver.runJob(pointsPath,
+                                     clustersPath,
+                                     output,
+                                     EuclideanDistanceMeasure.class.getName(),
+                                     0.001,
+                                     2,
+                                     k + 1,
+                                     2,
+                                     false,
+                                     true,
+                                     0);
+      */
+      String[] args = {
+          optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION),
+          clustersPath.toString(),
+          optKey(DefaultOptionCreator.OUTPUT_OPTION),
+          output.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+          EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION),
+          "0.001",
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
+          "2",
+          optKey(FuzzyKMeansDriver.M_OPTION),
+          "2.0",
+          optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION),
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+          optKey(DefaultOptionCreator.METHOD_OPTION),
+          DefaultOptionCreator.SEQUENTIAL_METHOD
+      };
+      FuzzyKMeansDriver.main(args);
+      long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-0"), conf);
+      assertTrue(count > 0);
+    }
+
+  }
+
+  @Test
+  public void testFuzzyKMeansMRJob() throws Exception {
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("testKFuzzyKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(clustersPath, "part-00000"),
+                                                           Text.class,
+                                                           SoftCluster.class);
+      try {
+        for (int i = 0; i < k + 1; i++) {
+          Vector vec = tweakValue(points.get(i).get());
+
+          SoftCluster cluster = new SoftCluster(vec, i, measure);
+          /* add the center so the centroid will be correct upon output */
+          cluster.observe(cluster.getCenter(), 1);
+          // writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n');
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+
+        }
+      } finally {
+        Closeables.close(writer, false);
+      }
+
+      // now run the Job using the run() command line options.
+      Path output = getTestTempDirPath("output" + k);
+      /*      FuzzyKMeansDriver.runJob(pointsPath,
+                                     clustersPath,
+                                     output,
+                                     EuclideanDistanceMeasure.class.getName(),
+                                     0.001,
+                                     2,
+                                     k + 1,
+                                     2,
+                                     false,
+                                     true,
+                                     0);
+      */
+      String[] args = {
+          optKey(DefaultOptionCreator.INPUT_OPTION),
+          pointsPath.toString(),
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION),
+          clustersPath.toString(),
+          optKey(DefaultOptionCreator.OUTPUT_OPTION),
+          output.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+          EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION),
+          "0.001",
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
+          "2",
+          optKey(FuzzyKMeansDriver.M_OPTION),
+          "2.0",
+          optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION),
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION)
+      };
+      ToolRunner.run(getConfiguration(), new FuzzyKMeansDriver(), args);
+      long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-00000"), conf);
+      assertTrue(count > 0);
+    }
+
+  }
+
+}
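
A side note on the M_OPTION value of 2.0 passed above: in standard fuzzy k-means,
the fuzziness exponent m > 1 controls how sharply a point's membership concentrates
on its nearest cluster. A minimal, self-contained sketch of the textbook membership
update follows; the names are illustrative, not Mahout API, and real implementations
special-case a zero distance.

    import java.util.Arrays;

    /** Illustrative only: standard fuzzy k-means membership weights for one point. */
    public final class FuzzyMembershipSketch {
      /** d[i] is the distance from the point to cluster i; m > 1 is the fuzziness. */
      static double[] memberships(double[] d, double m) {
        double[] u = new double[d.length];
        double exponent = 2.0 / (m - 1.0);
        for (int i = 0; i < d.length; i++) {
          double sum = 0.0;
          for (int k = 0; k < d.length; k++) {
            sum += Math.pow(d[i] / d[k], exponent);  // u_i = 1 / sum_k (d_i/d_k)^(2/(m-1))
          }
          u[i] = 1.0 / sum;
        }
        return u;
      }

      public static void main(String[] args) {
        // With m = 2.0 (the M_OPTION value above), closer clusters get most of the weight.
        System.out.println(Arrays.toString(memberships(new double[] {1.0, 2.0, 4.0}, 2.0)));
      }
    }

As m approaches 1 the memberships approach hard k-means assignments; larger m spreads
weight more evenly across clusters.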

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
new file mode 100644
index 0000000..fdcfd64
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.iterator;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.fuzzykmeans.SoftCluster;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.CosineDistanceMeasure;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestClusterClassifier extends MahoutTestCase {
+  
+  private static ClusterClassifier newDMClassifier() {
+    List<Cluster> models = Lists.newArrayList();
+    DistanceMeasure measure = new ManhattanDistanceMeasure();
+    models.add(new DistanceMeasureCluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new DistanceMeasureCluster(new DenseVector(2), 1, measure));
+    models.add(new DistanceMeasureCluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new KMeansClusteringPolicy());
+  }
+  
+  private static ClusterClassifier newKlusterClassifier() {
+    List<Cluster> models = Lists.newArrayList();
+    DistanceMeasure measure = new ManhattanDistanceMeasure();
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2), 1, measure));
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new KMeansClusteringPolicy());
+  }
+  
+  private static ClusterClassifier newCosineKlusterClassifier() {
+    List<Cluster> models = Lists.newArrayList();
+    DistanceMeasure measure = new CosineDistanceMeasure();
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2), 1, measure));
+    models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new KMeansClusteringPolicy());
+  }
+
+  private static ClusterClassifier newSoftClusterClassifier() {
+    List<Cluster> models = Lists.newArrayList();
+    DistanceMeasure measure = new ManhattanDistanceMeasure();
+    models.add(new SoftCluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new SoftCluster(new DenseVector(2), 1, measure));
+    models.add(new SoftCluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new FuzzyKMeansClusteringPolicy());
+  }
+  
+  private ClusterClassifier writeAndRead(ClusterClassifier classifier) throws IOException {
+    Path path = new Path(getTestTempDirPath(), "output");
+    classifier.writeToSeqFiles(path);
+    ClusterClassifier newClassifier = new ClusterClassifier();
+    newClassifier.readFromSeqFiles(getConfiguration(), path);
+    return newClassifier;
+  }
+  
+  @Test
+  public void testDMClusterClassification() {
+    ClusterClassifier classifier = newDMClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null));
+  }
+  
+  @Test
+  public void testClusterClassification() {
+    ClusterClassifier classifier = newKlusterClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null));
+  }
+  
+  @Test
+  public void testSoftClusterClassification() {
+    ClusterClassifier classifier = newSoftClusterClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.0,1.0,0.0]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.735,0.184,0.082]", AbstractCluster.formatVector(pdf, null));
+  }
+  
+  @Test
+  public void testDMClassifierSerialization() throws Exception {
+    ClusterClassifier classifier = newDMClassifier();
+    ClusterClassifier classifierOut = writeAndRead(classifier);
+    assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+    assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+        .getName());
+  }
+  
+  @Test
+  public void testClusterClassifierSerialization() throws Exception {
+    ClusterClassifier classifier = newKlusterClassifier();
+    ClusterClassifier classifierOut = writeAndRead(classifier);
+    assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+    assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+        .getName());
+  }
+  
+  @Test
+  public void testSoftClusterClassifierSerialization() throws Exception {
+    ClusterClassifier classifier = newSoftClusterClassifier();
+    ClusterClassifier classifierOut = writeAndRead(classifier);
+    assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+    assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+        .getName());
+  }
+  
+  @Test
+  public void testClusterIteratorKMeans() {
+    List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
+    ClusterClassifier prior = newKlusterClassifier();
+    ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5);
+    assertEquals(3, posterior.getModels().size());
+    for (Cluster cluster : posterior.getModels()) {
+      System.out.println(cluster.asFormatString(null));
+    }
+  }
+  
+  @Test
+  public void testClusterIteratorDirichlet() {
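+    // NOTE: this currently mirrors testClusterIteratorKMeans above (same Kluster prior and
+    // KMeans policy); a Dirichlet-specific prior would be needed to exercise Dirichlet iteration.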
+    List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
+    ClusterClassifier prior = newKlusterClassifier();
+    ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5);
+    assertEquals(3, posterior.getModels().size());
+    for (Cluster cluster : posterior.getModels()) {
+      System.out.println(cluster.asFormatString(null));
+    }
+  }
+  
+  @Test
+  public void testSeqFileClusterIteratorKMeans() throws IOException {
+    Path pointsPath = getTestTempDirPath("points");
+    Path priorPath = getTestTempDirPath("prior");
+    Path outPath = getTestTempDirPath("output");
+    Configuration conf = getConfiguration();
+    FileSystem fs = FileSystem.get(pointsPath.toUri(), conf);
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+    Path path = new Path(priorPath, "priorClassifier");
+    ClusterClassifier prior = newKlusterClassifier();
+    prior.writeToSeqFiles(path);
+    assertEquals(3, prior.getModels().size());
+    System.out.println("Prior");
+    for (Cluster cluster : prior.getModels()) {
+      System.out.println(cluster.asFormatString(null));
+    }
+    ClusterIterator.iterateSeq(conf, pointsPath, path, outPath, 5);
+    
+    for (int i = 1; i <= 4; i++) {
+      System.out.println("Classifier-" + i);
+      ClusterClassifier posterior = new ClusterClassifier();
+      String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+      posterior.readFromSeqFiles(conf, new Path(outPath, name));
+      assertEquals(3, posterior.getModels().size());
+      for (Cluster cluster : posterior.getModels()) {
+        System.out.println(cluster.asFormatString(null));
+      }
+      
+    }
+  }
+  
+  @Test
+  public void testMRFileClusterIteratorKMeans() throws Exception {
+    Path pointsPath = getTestTempDirPath("points");
+    Path priorPath = getTestTempDirPath("prior");
+    Path outPath = getTestTempDirPath("output");
+    Configuration conf = getConfiguration();
+    FileSystem fs = FileSystem.get(pointsPath.toUri(), conf);
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+    Path path = new Path(priorPath, "priorClassifier");
+    ClusterClassifier prior = newKlusterClassifier();
+    prior.writeToSeqFiles(path);
+    ClusteringPolicy policy = new KMeansClusteringPolicy();
+    ClusterClassifier.writePolicy(policy, path);
+    assertEquals(3, prior.getModels().size());
+    System.out.println("Prior");
+    for (Cluster cluster : prior.getModels()) {
+      System.out.println(cluster.asFormatString(null));
+    }
+    ClusterIterator.iterateMR(conf, pointsPath, path, outPath, 5);
+    
+    for (int i = 1; i <= 4; i++) {
+      System.out.println("Classifier-" + i);
+      ClusterClassifier posterior = new ClusterClassifier();
+      String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+      posterior.readFromSeqFiles(conf, new Path(outPath, name));
+      assertEquals(3, posterior.getModels().size());
+      for (Cluster cluster : posterior.getModels()) {
+        System.out.println(cluster.asFormatString(null));
+      }     
+    }
+  }
+  
+  @Test
+  public void testCosineKlusterClassification() {
+    ClusterClassifier classifier = newCosineKlusterClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.333,0.333,0.333]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.429,0.429,0.143]", AbstractCluster.formatVector(pdf, null));
+  }
+}
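
The pdf values asserted above are consistent with each cluster weighting a point by
1 / (1 + distance) and the classifier normalizing the weights to sum to 1. A short
worked check of the "[0.2,0.6,0.2]" assertion under that assumption (plain Java,
not Mahout API):

    /** Illustrative arithmetic for the asserted pdf values. */
    public final class PdfSketch {
      public static void main(String[] args) {
        // Manhattan distances from the point (0,0) to centers (1,1), (0,0), (-1,-1):
        double[] d = {2.0, 0.0, 2.0};
        double[] w = new double[d.length];
        double total = 0.0;
        for (int i = 0; i < d.length; i++) {
          w[i] = 1.0 / (1.0 + d[i]);  // weight each cluster by inverse (1 + distance)
          total += w[i];
        }
        for (double v : w) {
          System.out.printf("%.3f ", v / total);  // prints 0.200 0.600 0.200
        }
      }
    }

The same arithmetic reproduces "[0.493,0.296,0.211]" for the point (2,2), whose
Manhattan distances to the three centers are 2, 4 and 6.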

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
new file mode 100644
index 0000000..94762e3
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+public final class TestKmeansClustering extends MahoutTestCase {
+  
+  public static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+  
+  private static final int[][] EXPECTED_NUM_POINTS = { {9}, {4, 5}, {4, 4, 1}, {1, 2, 1, 5}, {1, 1, 1, 2, 4},
+      {1, 1, 1, 1, 1, 4}, {1, 1, 1, 1, 1, 2, 2}, {1, 1, 1, 1, 1, 1, 2, 1}, {1, 1, 1, 1, 1, 1, 1, 1, 1}};
+  
+  private FileSystem fs;
+  
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+  
+  public static List<VectorWritable> getPointsWritable(double[][] raw) {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+  
+  public static List<VectorWritable> getPointsWritableDenseVector(double[][] raw) {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new DenseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+  
+  public static List<Vector> getPoints(double[][] raw) {
+    List<Vector> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new SequentialAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(vec);
+    }
+    return points;
+  }
+  
+  /**
+   * Tests
+   * {@link KMeansClusterer#runKMeansIteration(Iterable, Iterable, DistanceMeasure, double)}
+   * for single-run convergence with a given distance threshold.
+   */
+  /*@Test
+  public void testRunKMeansIterationConvergesInOneRunWithGivenDistanceThreshold() {
+    double[][] rawPoints = { {0, 0}, {0, 0.25}, {0, 0.75}, {0, 1}};
+    List<Vector> points = getPoints(rawPoints);
+
+    ManhattanDistanceMeasure distanceMeasure = new ManhattanDistanceMeasure();
+    List<Kluster> clusters = Arrays.asList(new Kluster(points.get(0), 0, distanceMeasure), new Kluster(points.get(3),
+        3, distanceMeasure));
+
+    // To converge in a single run, the given distance threshold should be
+    // greater than or equal to 0.125,
+    // since 0.125 will be the distance between center and centroid for the
+    // initial two clusters after one run.
+    double distanceThreshold = 0.25;
+
+    boolean converged = KMeansClusterer.runKMeansIteration(points, clusters, distanceMeasure, distanceThreshold);
+
+    Vector cluster1Center = clusters.get(0).getCenter();
+    assertEquals(0, cluster1Center.get(0), EPSILON);
+    assertEquals(0.125, cluster1Center.get(1), EPSILON);
+
+    Vector cluster2Center = clusters.get(1).getCenter();
+    assertEquals(0, cluster2Center.get(0), EPSILON);
+    assertEquals(0.875, cluster2Center.get(1), EPSILON);
+
+    assertTrue("KMeans iteration should be converged after a single run", converged);
+  }*/
+
+  /** Story: User wishes to run kmeans job on reference data */
+  @Test
+  public void testKMeansSeqJob() throws Exception {
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+    for (int k = 1; k < points.size(); k++) {
+      System.out.println("testKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      Path path = new Path(clustersPath, "part-00000");
+      FileSystem fs = FileSystem.get(path.toUri(), conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
+      try {
+        for (int i = 0; i < k + 1; i++) {
+          Vector vec = points.get(i).get();
+          
+          Kluster cluster = new Kluster(vec, i, measure);
+          // add the center so the centroid will be correct upon output
+          cluster.observe(cluster.getCenter(), 1);
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        Closeables.close(writer, false);
+      }
+      // now run the Job
+      Path outputPath = getTestTempDirPath("output" + k);
+      String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+          optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION),
+          DefaultOptionCreator.SEQUENTIAL_METHOD};
+      ToolRunner.run(conf, new KMeansDriver(), args);
+      
+      // now compare the expected clusters with actual
+      Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+      int[] expect = EXPECTED_NUM_POINTS[k];
+      DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>();
+      // The key is the clusterId, the value is the weighted vector
+      for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+          new Path(clusteredPointsPath, "part-m-0"), conf)) {
+        collector.collect(record.getFirst(), record.getSecond());
+      }
+      assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+    }
+  }
+  
+  /** Story: User wishes to run kmeans job on reference data (DenseVector test) */
+  @Test
+  public void testKMeansSeqJobDenseVector() throws Exception {
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    List<VectorWritable> points = getPointsWritableDenseVector(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+    for (int k = 1; k < points.size(); k++) {
+      System.out.println("testKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      Path path = new Path(clustersPath, "part-00000");
+      FileSystem fs = FileSystem.get(path.toUri(), conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
+      try {
+        for (int i = 0; i < k + 1; i++) {
+          Vector vec = points.get(i).get();
+          
+          Kluster cluster = new Kluster(vec, i, measure);
+          // add the center so the centroid will be correct upon output
+          cluster.observe(cluster.getCenter(), 1);
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        Closeables.close(writer, false);
+      }
+      // now run the Job
+      Path outputPath = getTestTempDirPath("output" + k);
+      String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+          optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION),
+          DefaultOptionCreator.SEQUENTIAL_METHOD};
+      ToolRunner.run(conf, new KMeansDriver(), args);
+      
+      // now compare the expected clusters with actual
+      Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+      int[] expect = EXPECTED_NUM_POINTS[k];
+      DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>();
+      // The key is the clusterId, the value is the weighted vector
+      for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+          new Path(clusteredPointsPath, "part-m-0"), conf)) {
+        collector.collect(record.getFirst(), record.getSecond());
+      }
+      assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+    }
+  }
+  
+  /** Story: User wishes to run kmeans job on reference data */
+  @Test
+  public void testKMeansMRJob() throws Exception {
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+    for (int k = 1; k < points.size(); k += 3) {
+      System.out.println("testKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      Path path = new Path(clustersPath, "part-00000");
+      FileSystem fs = FileSystem.get(path.toUri(), conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
+      
+      try {
+        for (int i = 0; i < k + 1; i++) {
+          Vector vec = points.get(i).get();
+          
+          Kluster cluster = new Kluster(vec, i, measure);
+          // add the center so the centroid will be correct upon output
+          cluster.observe(cluster.getCenter(), 1);
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        Closeables.close(writer, false);
+      }
+      // now run the Job
+      Path outputPath = getTestTempDirPath("output" + k);
+      String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+          optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION)};
+      ToolRunner.run(getConfiguration(), new KMeansDriver(), args);
+      
+      // now compare the expected clusters with actual
+      Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+      // assertEquals("output dir files?", 4, outFiles.length);
+      int[] expect = EXPECTED_NUM_POINTS[k];
+      DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>();
+      // The key is the clusterId, the value is the weighted vector
+      for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+          new Path(clusteredPointsPath, "part-m-00000"), conf)) {
+        collector.collect(record.getFirst(), record.getSecond());
+      }
+      assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+    }
+  }
+  
+  /**
+   * Story: User wants to use canopy clustering to input the initial clusters
+   * for kmeans job.
+   */
+  @Test
+  public void testKMeansWithCanopyClusterInput() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+    
+    Path outputPath = getTestTempDirPath("output");
+    // now run the Canopy job
+    CanopyDriver.run(conf, pointsPath, outputPath, new ManhattanDistanceMeasure(), 3.1, 2.1, false, 0.0, false);
+    
+    DummyOutputCollector<Text, ClusterWritable> collector1 =
+        new DummyOutputCollector<>();
+
+    FileStatus[] outParts = FileSystem.get(conf).globStatus(
+                    new Path(outputPath, "clusters-0-final/*-0*"));
+    for (FileStatus outPartStat : outParts) {
+      for (Pair<Text,ClusterWritable> record :
+               new SequenceFileIterable<Text,ClusterWritable>(
+                 outPartStat.getPath(), conf)) {
+          collector1.collect(record.getFirst(), record.getSecond());
+      }
+    }
+
+    boolean got15 = false;
+    boolean got43 = false;
+    int count = 0;
+    for (Text k : collector1.getKeys()) {
+      count++;
+      List<ClusterWritable> vl = collector1.getValue(k);
+      assertEquals("non-singleton centroid!", 1, vl.size());
+      ClusterWritable clusterWritable = vl.get(0);
+      Vector v = clusterWritable.getValue().getCenter();
+      assertEquals("cetriod vector is wrong length", 2, v.size());
+      if ( (Math.abs(v.get(0) - 1.5) < EPSILON) 
+                  && (Math.abs(v.get(1) - 1.5) < EPSILON)
+                  && !got15) {
+        got15 = true;
+      } else if ( (Math.abs(v.get(0) - 4.333333333333334) < EPSILON) 
+                  && (Math.abs(v.get(1) - 4.333333333333334) < EPSILON)
+                  && !got43) {
+        got43 = true;
+      } else {
+        fail("got unexpected center: " + v + " [" + v.getClass().toString() + ']');
+      }
+    }
+    assertEquals("got unexpected number of centers", 2, count);
+
+    // now run the KMeans job
+    Path kmeansOutput = new Path(outputPath, "kmeans");
+    KMeansDriver.run(getConfiguration(), pointsPath, new Path(outputPath, "clusters-0-final"), kmeansOutput,
+        0.001, 10, true, 0.0, false);
+    
+    // now compare the expected clusters with actual
+    Path clusteredPointsPath = new Path(kmeansOutput, "clusteredPoints");
+    DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>();
+    
+    // The key is the clusterId, the value is the weighted vector
+    for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+        new Path(clusteredPointsPath, "part-m-00000"), conf)) {
+      collector.collect(record.getFirst(), record.getSecond());
+    }
+    
+    for (IntWritable k : collector.getKeys()) {
+      List<WeightedPropertyVectorWritable> wpvList = collector.getValue(k);
+      assertTrue("empty cluster!", !wpvList.isEmpty());
+      if (wpvList.get(0).getVector().get(0) <= 2.0) {
+        for (WeightedPropertyVectorWritable wv : wpvList) {
+          Vector v = wv.getVector();
+          int idx = v.maxValueIndex();
+          assertTrue("bad cluster!", v.get(idx) <= 2.0);
+        }
+        assertEquals("Wrong size cluster", 4, wpvList.size());
+      } else {
+        for (WeightedPropertyVectorWritable wv : wpvList) {
+          Vector v = wv.getVector();
+          int idx = v.minValueIndex();
+          assertTrue("bad cluster!", v.get(idx) > 2.0);
+        }
+        assertEquals("Wrong size cluster", 5, wpvList.size());
+      }
+    }
+  }
+}
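
The commented-out convergence test above describes a single k-means iteration: assign
each point to its nearest cluster, recompute the centroids, and declare convergence
when no center moves more than the distance threshold. A self-contained sketch of
that loop over 2-D points, using Manhattan distance and illustrative names (not the
KMeansClusterer internals):

    import java.util.Arrays;
    import java.util.List;

    /** Illustrative: one Lloyd-style k-means iteration over 2-D points. */
    public final class KMeansIterationSketch {
      /** Returns true when every non-empty cluster's center moves at most delta. */
      static boolean iterate(List<double[]> points, double[][] centers, double delta) {
        double[][] sums = new double[centers.length][2];
        int[] counts = new int[centers.length];
        for (double[] p : points) {
          int best = 0;
          double bestDist = Double.MAX_VALUE;
          for (int c = 0; c < centers.length; c++) {
            double dist = Math.abs(p[0] - centers[c][0]) + Math.abs(p[1] - centers[c][1]);
            if (dist < bestDist) { bestDist = dist; best = c; }
          }
          sums[best][0] += p[0]; sums[best][1] += p[1]; counts[best]++;
        }
        boolean converged = true;
        for (int c = 0; c < centers.length; c++) {
          if (counts[c] == 0) { continue; }  // empty cluster: keep the old center
          double nx = sums[c][0] / counts[c];
          double ny = sums[c][1] / counts[c];
          if (Math.abs(nx - centers[c][0]) + Math.abs(ny - centers[c][1]) > delta) {
            converged = false;
          }
          centers[c][0] = nx; centers[c][1] = ny;
        }
        return converged;
      }

      public static void main(String[] args) {
        // The example from the commented-out test: centers move to (0,0.125) and
        // (0,0.875), a shift of 0.125 each, so delta = 0.25 converges in one run.
        List<double[]> pts = Arrays.asList(new double[] {0, 0}, new double[] {0, 0.25},
                                           new double[] {0, 0.75}, new double[] {0, 1});
        double[][] centers = { {0, 0}, {0, 1} };
        System.out.println(iterate(pts, centers, 0.25));  // true
      }
    }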

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
new file mode 100644
index 0000000..5cb012a
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestRandomSeedGenerator extends MahoutTestCase {
+  
+  private static final double[][] RAW = {{1, 1}, {2, 1}, {1, 2}, {2, 2},
+    {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+  
+  private FileSystem fs;
+  
+  private static List<VectorWritable> getPoints() {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : RAW) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+  
+  /** Story: random seed generation produces 4 clusters with proper ids and data */
+  @Test
+  public void testRandomSeedGenerator() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    Path input = getTestTempFilePath("random-input");
+    Path output = getTestTempDirPath("random-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+    
+    RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure());
+
+    int clusterCount = 0;
+    Collection<Integer> set = Sets.newHashSet();
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+      clusterCount++;
+      Cluster cluster = clusterWritable.getValue();
+      int id = cluster.getId();
+      assertTrue(set.add(id)); // Validate unique id's
+      
+      Vector v = cluster.getCenter();
+      assertVectorEquals(RAW[id], v); // Validate values match
+    }
+
+    assertEquals(4, clusterCount); // Validate sample count
+  }
+  
+  /** Be sure that the buildRandomSeeded works in the same way as RandomSeedGenerator.buildRandom */
+  @Test
+  public void testRandomSeedGeneratorSeeded() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    Path input = getTestTempFilePath("random-input");
+    Path output = getTestTempDirPath("random-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+    
+    RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), 1L);
+
+    int clusterCount = 0;
+    Collection<Integer> set = Sets.newHashSet();
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+      clusterCount++;
+      Cluster cluster = clusterWritable.getValue();
+      int id = cluster.getId();
+      assertTrue(set.add(id)); // validate unique id's
+      
+      Vector v = cluster.getCenter();
+      assertVectorEquals(RAW[id], v); // validate values match
+    }
+
+    assertEquals(4, clusterCount); // validate sample count
+  }
+  
+  /** Test that initial clusters built with the same random seed are reproduced */
+  @Test
+  public void testBuildRandomSeededSameInitialClusters() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    Path input = getTestTempFilePath("random-input");
+    Path output = getTestTempDirPath("random-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+    long randSeed = 1;
+    
+    RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), randSeed);
+    
+    int[] clusterIDSeq = new int[4];
+    
+    // run through all clusters once and record the sequence of IDs
+    int clusterCount = 0;
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {      
+      Cluster cluster = clusterWritable.getValue();
+      clusterIDSeq[clusterCount] = cluster.getId();
+      clusterCount++; 
+    }
+    
+    /* Rebuild the clusters and run through again, making sure all IDs appear in the same
+     * random sequence. Needs a better test: in this case it passes when seeded with 1 and 2,
+     * fails with 1 and 3, and passes when set to two. */
+    RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), randSeed);
+    clusterCount = 0;
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {       
+      Cluster cluster = clusterWritable.getValue();
+      // Make sure cluster ids are in same random sequence
+      assertEquals(clusterIDSeq[clusterCount], cluster.getId());
+      clusterCount++;
+    }
+  }
+  
+  private static void assertVectorEquals(double[] raw, Vector v) {
+    assertEquals(raw.length, v.size());
+    for (int i = 0; i < raw.length; i++) {
+      assertEquals(raw[i], v.getQuick(i), EPSILON);
+    }
+  }
+}
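
Both seeded tests above rest on the basic property that a pseudo-random generator
initialized with a fixed seed yields the same sequence, so the same points are chosen
as initial clusters on every run. A tiny demonstration with java.util.Random
(illustrative; RandomSeedGenerator's actual sampling differs):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;

    /** Illustrative: a fixed seed makes a random choice of k points reproducible. */
    public final class SeededSelectionSketch {
      static List<Integer> pickK(int n, int k, long seed) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
          ids.add(i);
        }
        Collections.shuffle(ids, new Random(seed));  // deterministic for a given seed
        return ids.subList(0, k);
      }

      public static void main(String[] args) {
        // Same seed, same selection -- the property testRandomSeedGeneratorSeeded relies on.
        System.out.println(pickK(9, 4, 1L).equals(pickK(9, 4, 1L)));  // true
      }
    }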

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
new file mode 100644
index 0000000..dd4360a
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.lda.cvb;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixUtils;
+import org.apache.mahout.math.function.DoubleFunction;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public final class TestCVBModelTrainer extends MahoutTestCase {
+
+  private static final double ETA = 0.1;
+  private static final double ALPHA = 0.1;
+
+  @Test
+  public void testInMemoryCVB0() throws Exception {
+    String[] terms = new String[26];
+    for (int i=0; i<terms.length; i++) {
+      terms[i] = String.valueOf((char) (i + 'a'));
+    }
+    int numGeneratingTopics = 3;
+    int numTerms = 26;
+    Matrix matrix = ClusteringTestUtils.randomStructuredModel(numGeneratingTopics, numTerms, new DoubleFunction() {
+      @Override public double apply(double d) {
+        return 1.0 / Math.pow(d + 1.0, 2);
+      }
+    });
+
+    int numDocs = 100;
+    int numSamples = 20;
+    int numTopicsPerDoc = 1;
+
+    Matrix sampledCorpus = ClusteringTestUtils.sampledCorpus(matrix, RandomUtils.getRandom(),
+                                                             numDocs, numSamples, numTopicsPerDoc);
+
+    List<Double> perplexities = Lists.newArrayList();
+    int numTrials = 1;
+    for (int numTestTopics = 1; numTestTopics < 2 * numGeneratingTopics; numTestTopics++) {
+      double[] perps = new double[numTrials];
+      for (int trial = 0; trial < numTrials; trial++) {
+        InMemoryCollapsedVariationalBayes0 cvb =
+          new InMemoryCollapsedVariationalBayes0(sampledCorpus, terms, numTestTopics, ALPHA, ETA, 2, 1, 0);
+        cvb.setVerbose(true);
+        perps[trial] = cvb.iterateUntilConvergence(0, 5, 0, 0.2);
+        System.out.println(perps[trial]);
+      }
+      Arrays.sort(perps);
+      System.out.println(Arrays.toString(perps));
+      perplexities.add(perps[0]);
+    }
+    System.out.println(Joiner.on(",").join(perplexities));
+  }
+
+  @Test
+  public void testRandomStructuredModelViaMR() throws Exception {
+    int numGeneratingTopics = 3;
+    int numTerms = 9;
+    Matrix matrix = ClusteringTestUtils.randomStructuredModel(numGeneratingTopics, numTerms, new DoubleFunction() {
+      @Override
+      public double apply(double d) {
+        return 1.0 / Math.pow(d + 1.0, 3);
+      }
+    });
+
+    int numDocs = 500;
+    int numSamples = 10;
+    int numTopicsPerDoc = 1;
+
+    Matrix sampledCorpus = ClusteringTestUtils.sampledCorpus(matrix, RandomUtils.getRandom(1234),
+                                                             numDocs, numSamples, numTopicsPerDoc);
+
+    Path sampleCorpusPath = getTestTempDirPath("corpus");
+    Configuration configuration = getConfiguration();
+    MatrixUtils.write(sampleCorpusPath, configuration, sampledCorpus);
+    int numIterations = 5;
+    List<Double> perplexities = Lists.newArrayList();
+    int startTopic = numGeneratingTopics - 1;
+    int numTestTopics = startTopic;
+    while (numTestTopics < numGeneratingTopics + 2) {
+      Path topicModelStateTempPath = getTestTempDirPath("topicTemp" + numTestTopics);
+      Configuration conf = getConfiguration();
+      CVB0Driver cvb0Driver = new CVB0Driver();
+      cvb0Driver.run(conf, sampleCorpusPath, null, numTestTopics, numTerms,
+          ALPHA, ETA, numIterations, 1, 0, null, null, topicModelStateTempPath, 1234, 0.2f, 2,
+          1, 3, 1, false);
+      perplexities.add(lowestPerplexity(conf, topicModelStateTempPath));
+      numTestTopics++;
+    }
+    int bestTopic = -1;
+    double lowestPerplexity = Double.MAX_VALUE;
+    for (int t = 0; t < perplexities.size(); t++) {
+      if (perplexities.get(t) < lowestPerplexity) {
+        lowestPerplexity = perplexities.get(t);
+        bestTopic = t + startTopic;
+      }
+    }
+    assertEquals("The optimal number of topics is not that of the generating distribution", 4, bestTopic);
+    System.out.println("Perplexities: " + Joiner.on(", ").join(perplexities));
+  }
+
+  private static double lowestPerplexity(Configuration conf, Path topicModelTemp)
+      throws IOException {
+    double lowest = Double.MAX_VALUE;
+    double current;
+    int iteration = 2;
+    while (!Double.isNaN(current = CVB0Driver.readPerplexity(conf, topicModelTemp, iteration))) {
+      lowest = Math.min(current, lowest);
+      iteration++;
+    }
+    return lowest;
+  }
+
+}
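
lowestPerplexity above scans the perplexities that CVB0Driver records per iteration
and keeps the minimum, and testRandomStructuredModelViaMR then selects the topic
count whose run achieved the lowest value. For reference, perplexity is conventionally
the exponentiated negative mean log-likelihood per token; a minimal sketch of that
definition (the driver's exact normalization may differ):

    /** Illustrative: perplexity as exponentiated negative mean log-likelihood per token. */
    public final class PerplexitySketch {
      static double perplexity(double[] tokenLogProbs) {
        double sum = 0.0;
        for (double lp : tokenLogProbs) {
          sum += lp;
        }
        return Math.exp(-sum / tokenLogProbs.length);
      }

      public static void main(String[] args) {
        // A model assigning probability 1/4 to every token has perplexity ~4.0.
        double lp = Math.log(0.25);
        System.out.println(perplexity(new double[] {lp, lp, lp}));
      }
    }

Lower perplexity means the model assigns higher probability to the held-out tokens,
which is why the test treats the minimum as the best topic count.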

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java
new file mode 100644
index 0000000..cd52bbd
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
+import org.junit.Test;
+
+/**
+ * <p>Tests the affinity matrix input M/R task.</p>
+ * 
+ * <p>The tricky item with this task is that the format of the input
+ * must be correct; it must take the form of a graph input, and for the
+ * current implementation, the input must be symmetric, i.e. the weight
+ * from node A to B equals the weight from node B to A. This is not explicitly
+ * enforced within the task itself (since, as of the time these tests were 
+ * written, we have not yet decided on a final rule regarding the 
+ * symmetry/non-symmetry of the affinity matrix, so we are unofficially 
+ * enforcing symmetry). Input looks something like this:</p>
+ * 
+ * <pre>0, 0, 0
+ * 0, 1, 10
+ * 0, 2, 20
+ * ...
+ * 1, 0, 10
+ * 2, 0, 20
+ * ...</pre>
+ * 
+ * <p>The mapper's task is simply to convert each line of text into a
+ * DistributedRowMatrix entry, allowing the reducer to join each entry
+ * of the same row into a VectorWritable.</p>
+ * 
+ * <p>Exceptions are thrown in cases of bad input format: if there are
+ * more or fewer than 3 numbers per line, or any of the numbers are missing.
+ */
+public class TestAffinityMatrixInputJob extends MahoutTestCase {
+  
+  private static final String[] RAW = {"0,0,0", "0,1,5", "0,2,10", "1,0,5", "1,1,0",
+                                       "1,2,20", "2,0,10", "2,1,20", "2,2,0"};
+  private static final int RAW_DIMENSIONS = 3;
+
+  @Test
+  public void testAffinityMatrixInputMapper() throws Exception {
+    AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper();
+    Configuration conf = getConfiguration();
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+    
+    // set up the dummy writer and the M/R context
+    DummyRecordWriter<IntWritable, MatrixEntryWritable> writer =
+      new DummyRecordWriter<>();
+    Mapper<LongWritable, Text, IntWritable, MatrixEntryWritable>.Context 
+      context = DummyRecordWriter.build(mapper, conf, writer);
+
+    // loop through all the points and test each one is converted
+    // successfully to a DistributedRowMatrix.MatrixEntry
+    for (String s : RAW) {
+      mapper.map(new LongWritable(), new Text(s), context);
+    }
+
+    // test the data was successfully constructed
+    assertEquals("Number of map results", RAW_DIMENSIONS, writer.getData().size());
+    Set<IntWritable> keys = writer.getData().keySet();
+    for (IntWritable i : keys) {
+      List<MatrixEntryWritable> row = writer.getData().get(i);
+      assertEquals("Number of items in row", RAW_DIMENSIONS, row.size());
+    }
+  }
+  
+  @Test
+  public void testAffinityMatrixInputReducer() throws Exception {
+    AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper();
+    Configuration conf = getConfiguration();
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+    
+    // set up the dummy writer and the M/R context
+    DummyRecordWriter<IntWritable, MatrixEntryWritable> mapWriter =
+      new DummyRecordWriter<>();
+    Mapper<LongWritable, Text, IntWritable, MatrixEntryWritable>.Context
+      mapContext = DummyRecordWriter.build(mapper, conf, mapWriter);
+
+    // loop through all the points and test each one is converted
+    // successfully to a DistributedRowMatrix.MatrixEntry
+    for (String s : RAW) {
+      mapper.map(new LongWritable(), new Text(s), mapContext);
+    }
+    // store the data for checking later
+    Map<IntWritable, List<MatrixEntryWritable>> map = mapWriter.getData();
+
+    // now reduce the data
+    AffinityMatrixInputReducer reducer = new AffinityMatrixInputReducer();
+    DummyRecordWriter<IntWritable, VectorWritable> redWriter = 
+      new DummyRecordWriter<>();
+    Reducer<IntWritable, MatrixEntryWritable,
+      IntWritable, VectorWritable>.Context redContext = DummyRecordWriter
+      .build(reducer, conf, redWriter, IntWritable.class, MatrixEntryWritable.class);
+    for (IntWritable key : mapWriter.getKeys()) {
+      reducer.reduce(key, mapWriter.getValue(key), redContext);
+    }
+    
+    // check that the expected number of rows was produced
+    assertEquals("Number of reduce results", RAW_DIMENSIONS, redWriter.getData().size());
+    for (IntWritable row : redWriter.getKeys()) {
+      List<VectorWritable> list = redWriter.getValue(row);
+      assertEquals("Should only be one vector", 1, list.size());
+      // check that the elements in the array are correctly ordered
+      Vector v = list.get(0).get();
+      for (Vector.Element e : v.all()) {
+        // find this value in the original map
+        MatrixEntryWritable toCompare = new MatrixEntryWritable();
+        toCompare.setRow(-1);
+        toCompare.setCol(e.index());
+        toCompare.setVal(e.get());
+        assertTrue("This entry was correctly placed in its row", map.get(row).contains(toCompare));
+      }
+    }
+  }
+}
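
The contract described in the javadoc above reduces to: each "i,j,w" line becomes one matrix entry, and all entries sharing a row index are gathered into one vector. A minimal sketch of that parse-and-group step in plain Java, with no Hadoop machinery (the class and method names here are illustrative, not Mahout's):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class AffinityLines {
      /** Groups "i,j,w" lines into per-row lists of {col, weight} pairs. */
      static Map<Integer, List<double[]>> byRow(String[] lines) {
        Map<Integer, List<double[]>> rows = new HashMap<>();
        for (String line : lines) {
          String[] parts = line.split(",");
          if (parts.length != 3) {
            throw new IllegalArgumentException("Expected 'i,j,w' but got: " + line);
          }
          int i = Integer.parseInt(parts[0].trim());
          int j = Integer.parseInt(parts[1].trim());
          double w = Double.parseDouble(parts[2].trim());
          rows.computeIfAbsent(i, k -> new ArrayList<>()).add(new double[] {j, w});
        }
        return rows;
      }

      public static void main(String[] args) {
        String[] raw = {"0,0,0", "0,1,5", "0,2,10", "1,0,5", "1,1,0", "1,2,20"};
        System.out.println(byRow(raw).get(0).size()); // prints 3: row 0 has 3 entries
      }
    }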

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java
new file mode 100644
index 0000000..c256890
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob.MatrixDiagonalizeMapper;
+import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob.MatrixDiagonalizeReducer;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+/**
+ * <p>The MatrixDiagonalize task is pretty simple: given a matrix, it
+ * sums the elements of each row and places each sum at position (i, i)
+ * of a new matrix with the same dimensions as the original.</p>
+ */
+public class TestMatrixDiagonalizeJob extends MahoutTestCase {
+  
+  private static final double[][] RAW = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
+  private static final int RAW_DIMENSIONS = 3;
+  
+  private static double rowSum(double [] row) {
+    double sum = 0;
+    for (double r : row) {
+      sum += r;
+    }
+    return sum;
+  }
+
+  @Test
+  public void testMatrixDiagonalizeMapper() throws Exception {
+    MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper();
+    Configuration conf = getConfiguration();
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+    
+    // set up the dummy writers
+    DummyRecordWriter<NullWritable, IntDoublePairWritable> writer =
+      new DummyRecordWriter<>();
+    Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable>.Context 
+      context = DummyRecordWriter.build(mapper, conf, writer);
+    
+    // perform the mapping
+    for (int i = 0; i < RAW_DIMENSIONS; i++) {
+      RandomAccessSparseVector toAdd = new RandomAccessSparseVector(RAW_DIMENSIONS);
+      toAdd.assign(RAW[i]);
+      mapper.map(new IntWritable(i), new VectorWritable(toAdd), context);
+    }
+    
+    // check the number of results
+    assertEquals("Number of map results", RAW_DIMENSIONS,
+        writer.getValue(NullWritable.get()).size());
+  }
+  
+  @Test
+  public void testMatrixDiagonalizeReducer() throws Exception {
+    MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper();
+    Configuration conf = getConfiguration();
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+    
+    // set up the dummy writers
+    DummyRecordWriter<NullWritable, IntDoublePairWritable> mapWriter = 
+      new DummyRecordWriter<>();
+    Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable>.Context 
+      mapContext = DummyRecordWriter.build(mapper, conf, mapWriter);
+    
+    // perform the mapping
+    for (int i = 0; i < RAW_DIMENSIONS; i++) {
+      RandomAccessSparseVector toAdd = new RandomAccessSparseVector(RAW_DIMENSIONS);
+      toAdd.assign(RAW[i]);
+      mapper.map(new IntWritable(i), new VectorWritable(toAdd), mapContext);
+    }
+    
+    // now perform the reduction
+    MatrixDiagonalizeReducer reducer = new MatrixDiagonalizeReducer();
+    DummyRecordWriter<NullWritable, VectorWritable> redWriter = new
+      DummyRecordWriter<>();
+    Reducer<NullWritable, IntDoublePairWritable, NullWritable, VectorWritable>.Context
+      redContext = DummyRecordWriter.build(reducer, conf, redWriter, 
+      NullWritable.class, IntDoublePairWritable.class);
+    
+    // only need one reduction
+    reducer.reduce(NullWritable.get(), mapWriter.getValue(NullWritable.get()), redContext);
+    
+    // first, make sure there's only one result
+    List<VectorWritable> list = redWriter.getValue(NullWritable.get());
+    assertEquals("Only a single resulting vector", 1, list.size());
+    Vector v = list.get(0).get();
+    for (int i = 0; i < v.size(); i++) {
+      assertEquals("Element sum is correct", rowSum(RAW[i]), v.get(i),0.01);
+    }
+  }
+}
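
Stripped of the MapReduce plumbing, the operation under test is: D(i, i) = the sum of row i of A, with zeros everywhere else. A plain-Java sketch of the row-sum step, assuming a dense square input (names here are illustrative):

    public class Diagonalize {
      /** Returns the degree vector d, where d[i] is the sum of row i of a. */
      static double[] rowSums(double[][] a) {
        double[] d = new double[a.length];
        for (int i = 0; i < a.length; i++) {
          for (double v : a[i]) {
            d[i] += v;
          }
        }
        return d;
      }

      public static void main(String[] args) {
        double[][] a = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
        // prints 6.0 15.0 24.0, the diagonal values the reducer test expects
        for (double v : rowSums(a)) {
          System.out.print(v + " ");
        }
      }
    }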

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java
new file mode 100644
index 0000000..c971572
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.spectral.UnitVectorizerJob.UnitVectorizerMapper;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+public class TestUnitVectorizerJob extends MahoutTestCase {
+
+  private static final double[][] RAW = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
+
+  @Test
+  public void testUnitVectorizerMapper() throws Exception {
+    UnitVectorizerMapper mapper = new UnitVectorizerMapper();
+    Configuration conf = getConfiguration();
+    
+    // set up the dummy writers
+    DummyRecordWriter<IntWritable, VectorWritable> writer = new
+      DummyRecordWriter<>();
+    Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context 
+      context = DummyRecordWriter.build(mapper, conf, writer);
+    
+    // perform the mapping
+    for (int i = 0; i < RAW.length; i++) {
+      Vector vector = new RandomAccessSparseVector(RAW[i].length);
+      vector.assign(RAW[i]);
+      mapper.map(new IntWritable(i), new VectorWritable(vector), context);
+    }
+    
+    // check the results
+    assertEquals("Number of map results", RAW.length, writer.getData().size());
+    for (int i = 0; i < RAW.length; i++) {
+      IntWritable key = new IntWritable(i);
+      List<VectorWritable> list = writer.getValue(key);
+      assertEquals("Only one element per row", 1, list.size());
+      Vector v = list.get(0).get();
+      assertTrue("Unit vector sum is 1 or differs by 0.0001", Math.abs(v.norm(2) - 1) < 0.000001);
+    }
+  } 
+}
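
The property asserted above, that each output row has L2 norm 1, pins down what the mapper does: it scales each row vector by the reciprocal of its L2 norm. A minimal sketch of that normalization (illustrative, not Mahout's mapper code):

    public class UnitVectorize {
      /** Scales x to unit L2 norm; returns x unchanged if it is the zero vector. */
      static double[] normalize(double[] x) {
        double norm = 0;
        for (double v : x) {
          norm += v * v;
        }
        norm = Math.sqrt(norm);
        if (norm == 0) {
          return x;
        }
        double[] out = new double[x.length];
        for (int i = 0; i < x.length; i++) {
          out[i] = x[i] / norm;
        }
        return out;
      }

      public static void main(String[] args) {
        double[] u = normalize(new double[] {3, 4});
        System.out.println(u[0] + ", " + u[1]); // prints 0.6, 0.8; the norm is 1
      }
    }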

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java
new file mode 100644
index 0000000..e09bbe4
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.net.URI;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+public class TestVectorCache extends MahoutTestCase {
+
+  private static final double[] VECTOR = { 1, 2, 3, 4 };
+  
+  @Test
+  public void testSave() throws Exception {
+    Configuration conf = getConfiguration();
+    Writable key = new IntWritable(0);
+    Vector value = new DenseVector(VECTOR);
+    Path path = getTestTempDirPath("output");
+    
+    // write the vector out
+    VectorCache.save(key, value, path, conf, true, true);
+    
+    // can we read it from here?
+    SequenceFileValueIterator<VectorWritable> iterator =
+        new SequenceFileValueIterator<>(path, true, conf);
+    try {
+      VectorWritable old = iterator.next();
+      // test if the values are identical
+      assertEquals("Saved vector is identical to original", old.get(), value);
+    } finally {
+      Closeables.close(iterator, true);
+    }
+  }
+  
+  @Test
+  public void testLoad() throws Exception {
+    // save a vector manually
+    Configuration conf = getConfiguration();
+    Writable key = new IntWritable(0);
+    Vector value = new DenseVector(VECTOR);
+    Path path = getTestTempDirPath("output");
+
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    // write the vector
+    path = fs.makeQualified(path);
+    fs.deleteOnExit(path);
+    HadoopUtil.delete(conf, path);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
+    try {
+      writer.append(key, new VectorWritable(value));
+    } finally {
+      Closeables.close(writer, false);
+    }
+    DistributedCache.setCacheFiles(new URI[] {path.toUri()}, conf);
+
+    // load it
+    Vector result = VectorCache.load(conf);
+    
+    // are they the same?
+    assertNotNull("Vector is null", result);
+    assertEquals("Loaded vector is not identical to original", result, value);
+  }
+  
+  @Test
+  public void testAll() throws Exception {
+    Configuration conf = getConfiguration();
+    Vector v = new DenseVector(VECTOR);
+    Path toSave = getTestTempDirPath("output");
+    Writable key = new IntWritable(0);
+    
+    // save it
+    VectorCache.save(key, v, toSave, conf);
+    
+    // now, load it back
+    Vector v2 = VectorCache.load(conf);
+    
+    // are they the same?
+    assertNotNull("Vector is null", v2);
+    assertEquals("Vectors are not identical", v2, v);
+  }
+}
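
The round trip these three tests exercise is: append one (key, VectorWritable) pair to a SequenceFile, register that file with the DistributedCache, and read the vector back on the other side. A hedged sketch of just the read-back half, reusing the same SequenceFileValueIterator that testSave uses (the path argument is hypothetical):

    import com.google.common.io.Closeables;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.VectorWritable;

    public class ReadCachedVector {
      /** Reads the first vector from a SequenceFile of VectorWritable values. */
      static Vector readFirst(Configuration conf, Path path) throws Exception {
        SequenceFileValueIterator<VectorWritable> it =
            new SequenceFileValueIterator<>(path, true, conf);
        try {
          return it.hasNext() ? it.next().get() : null;
        } finally {
          Closeables.close(it, true);
        }
      }
    }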

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java
new file mode 100644
index 0000000..8cd52f4
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.spectral.VectorMatrixMultiplicationJob.VectorMatrixMultiplicationMapper;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+/**
+ * <p>Tests that the mapper scales the matrix by the diagonal vector d
+ * handed to setup(): entry (i, j) becomes sqrt(d[i]) * a[i][j] * sqrt(d[j]).</p>
+ */
+public class TestVectorMatrixMultiplicationJob extends MahoutTestCase {
+  
+  private static final double[][] MATRIX = { {1, 1}, {2, 3} };
+  private static final double[] VECTOR = {9, 16};
+  
+  @Test
+  public void testVectorMatrixMultiplicationMapper() throws Exception {
+    VectorMatrixMultiplicationMapper mapper = new VectorMatrixMultiplicationMapper();
+    Configuration conf = getConfiguration();
+    
+    // set up all the parameters for the job
+    Vector toSave = new DenseVector(VECTOR);
+    DummyRecordWriter<IntWritable, VectorWritable> writer = new 
+      DummyRecordWriter<>();
+    Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context
+      context = DummyRecordWriter.build(mapper, conf, writer);
+    mapper.setup(toSave);
+    
+    // run the job
+    for (int i = 0; i < MATRIX.length; i++) {
+      Vector v = new RandomAccessSparseVector(MATRIX[i].length);
+      v.assign(MATRIX[i]);
+      mapper.map(new IntWritable(i), new VectorWritable(v), context);
+    }
+    
+    // check the results
+    assertEquals("Number of map results", MATRIX.length, writer.getData().size());
+    for (int i = 0; i < MATRIX.length; i++) {
+      List<VectorWritable> list = writer.getValue(new IntWritable(i));
+      assertEquals("Only one vector per key", 1, list.size());
+      Vector v = list.get(0).get();
+      for (int j = 0; j < MATRIX[i].length; j++) {
+        double total = Math.sqrt(VECTOR[i]) * Math.sqrt(VECTOR[j]) * MATRIX[i][j];
+        assertEquals("Product matrix elements", total, v.get(j),EPSILON);
+      }
+    }
+  }
+}
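
The expected value computed in the loop above fixes the mapper's semantics: with d the vector handed to setup(), output entry (i, j) is sqrt(d[i]) * a[i][j] * sqrt(d[j]), i.e. the matrix scaled on both sides by the square roots of the diagonal vector. A standalone sketch of that computation (names here are illustrative):

    public class DiagonalScale {
      /** Returns b with b[i][j] = sqrt(d[i]) * a[i][j] * sqrt(d[j]). */
      static double[][] scale(double[][] a, double[] d) {
        double[][] b = new double[a.length][];
        for (int i = 0; i < a.length; i++) {
          b[i] = new double[a[i].length];
          for (int j = 0; j < a[i].length; j++) {
            b[i][j] = Math.sqrt(d[i]) * a[i][j] * Math.sqrt(d[j]);
          }
        }
        return b;
      }

      public static void main(String[] args) {
        double[][] a = { {1, 1}, {2, 3} };
        double[] d = {9, 16};
        // prints [[9.0, 12.0], [24.0, 48.0]], matching the test's expectations
        System.out.println(java.util.Arrays.deepToString(scale(a, d)));
      }
    }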

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java
new file mode 100644
index 0000000..5c73bbc
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.kmeans;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.spectral.kmeans.EigenSeedGenerator;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestEigenSeedGenerator extends MahoutTestCase {
+
+  private static final double[][] RAW = {{1, 0, 0}, {1, 0, 0}, {0, 1, 0},
+                                         {0, 1, 0}, {0, 1, 0}, {0, 0, 1},
+                                         {0, 0, 1}};
+
+  private FileSystem fs;
+
+  private static List<VectorWritable> getPoints() {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : RAW) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void testEigenSeedGenerator() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    Path input = getTestTempFilePath("eigen-input");
+    Path output = getTestTempDirPath("eigen-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+
+    EigenSeedGenerator.buildFromEigens(conf, input, output, 3, new ManhattanDistanceMeasure());
+
+    int clusterCount = 0;
+    Collection<Integer> set = new HashSet<>();
+    Vector[] v = new Vector[3];
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(
+             new Path(output, "part-eigenSeed"), true, conf)) {
+      Cluster cluster = clusterWritable.getValue();
+      int id = cluster.getId();
+      assertTrue(set.add(id)); // validate unique ids
+      v[id] = cluster.getCenter();
+      clusterCount++;
+    }
+    assertEquals(3, clusterCount); // validate sample count
+    // validate pair-wise orthogonality
+    assertEquals(0, v[0].dot(v[1]), 1E-10);
+    assertEquals(0, v[1].dot(v[2]), 1E-10);
+    assertEquals(0, v[0].dot(v[2]), 1E-10);
+  }
+
+}
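
The last three assertions check pairwise orthogonality of the seed centers, i.e. v[i] dot v[j] = 0 for i != j, which holds here because every point in RAW lies on a distinct coordinate axis, so distinct centers never share a nonzero component. A small sketch of that check over plain arrays (an illustrative helper, not part of the test):

    public class OrthogonalityCheck {
      static double dot(double[] x, double[] y) {
        double sum = 0;
        for (int i = 0; i < x.length; i++) {
          sum += x[i] * y[i];
        }
        return sum;
      }

      /** True if every distinct pair of vectors has a near-zero dot product. */
      static boolean pairwiseOrthogonal(double[][] vs, double eps) {
        for (int i = 0; i < vs.length; i++) {
          for (int j = i + 1; j < vs.length; j++) {
            if (Math.abs(dot(vs[i], vs[j])) > eps) {
              return false;
            }
          }
        }
        return true;
      }
    }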