You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/04 14:29:07 UTC
[05/53] [abbrv] [partial] mahout git commit: end of day 6-2-2018
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
new file mode 100644
index 0000000..fc71ecf
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Closeables;
+
+public final class TestFuzzyKmeansClustering extends MahoutTestCase {
+
+ private FileSystem fs;
+ private final DistanceMeasure measure = new EuclideanDistanceMeasure();
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = getConfiguration();
+ fs = FileSystem.get(conf);
+ }
+
+ private static Vector tweakValue(Vector point) {
+ return point.plus(0.1);
+ }
+
+ @Test
+ public void testFuzzyKMeansSeqJob() throws Exception {
+ List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+
+ Path pointsPath = getTestTempDirPath("points");
+ Path clustersPath = getTestTempDirPath("clusters");
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+
+ for (int k = 0; k < points.size(); k++) {
+ System.out.println("testKFuzzyKMeansMRJob k= " + k);
+ // pick k initial cluster centers at random
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+ conf,
+ new Path(clustersPath, "part-00000"),
+ Text.class,
+ SoftCluster.class);
+ try {
+ for (int i = 0; i < k + 1; i++) {
+ Vector vec = tweakValue(points.get(i).get());
+ SoftCluster cluster = new SoftCluster(vec, i, measure);
+ /* add the center so the centroid will be correct upon output */
+ cluster.observe(cluster.getCenter(), 1);
+ // writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n');
+ writer.append(new Text(cluster.getIdentifier()), cluster);
+ }
+ } finally {
+ Closeables.close(writer, false);
+ }
+
+ // now run the Job using the run() command line options.
+ Path output = getTestTempDirPath("output" + k);
+ /* FuzzyKMeansDriver.runJob(pointsPath,
+ clustersPath,
+ output,
+ EuclideanDistanceMeasure.class.getName(),
+ 0.001,
+ 2,
+ k + 1,
+ 2,
+ false,
+ true,
+ 0);
+ */
+ String[] args = {
+ optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+ optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION),
+ clustersPath.toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION),
+ output.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+ EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION),
+ "0.001",
+ optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
+ "2",
+ optKey(FuzzyKMeansDriver.M_OPTION),
+ "2.0",
+ optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+ optKey(DefaultOptionCreator.METHOD_OPTION),
+ DefaultOptionCreator.SEQUENTIAL_METHOD
+ };
+ FuzzyKMeansDriver.main(args);
+ long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-0"), conf);
+ assertTrue(count > 0);
+ }
+
+ }
+
+ @Test
+ public void testFuzzyKMeansMRJob() throws Exception {
+ List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+
+ Path pointsPath = getTestTempDirPath("points");
+ Path clustersPath = getTestTempDirPath("clusters");
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+
+ for (int k = 0; k < points.size(); k++) {
+ System.out.println("testKFuzzyKMeansMRJob k= " + k);
+ // pick k initial cluster centers at random
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+ conf,
+ new Path(clustersPath, "part-00000"),
+ Text.class,
+ SoftCluster.class);
+ try {
+ for (int i = 0; i < k + 1; i++) {
+ Vector vec = tweakValue(points.get(i).get());
+
+ SoftCluster cluster = new SoftCluster(vec, i, measure);
+ /* add the center so the centroid will be correct upon output */
+ cluster.observe(cluster.getCenter(), 1);
+ // writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n');
+ writer.append(new Text(cluster.getIdentifier()), cluster);
+
+ }
+ } finally {
+ Closeables.close(writer, false);
+ }
+
+ // now run the Job using the run() command line options.
+ Path output = getTestTempDirPath("output" + k);
+ /* FuzzyKMeansDriver.runJob(pointsPath,
+ clustersPath,
+ output,
+ EuclideanDistanceMeasure.class.getName(),
+ 0.001,
+ 2,
+ k + 1,
+ 2,
+ false,
+ true,
+ 0);
+ */
+ String[] args = {
+ optKey(DefaultOptionCreator.INPUT_OPTION),
+ pointsPath.toString(),
+ optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION),
+ clustersPath.toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION),
+ output.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+ EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION),
+ "0.001",
+ optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
+ "2",
+ optKey(FuzzyKMeansDriver.M_OPTION),
+ "2.0",
+ optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION)
+ };
+ ToolRunner.run(getConfiguration(), new FuzzyKMeansDriver(), args);
+ long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-00000"), conf);
+ assertTrue(count > 0);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
new file mode 100644
index 0000000..fdcfd64
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.iterator;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.fuzzykmeans.SoftCluster;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.CosineDistanceMeasure;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestClusterClassifier extends MahoutTestCase {
+
+ private static ClusterClassifier newDMClassifier() {
+ List<Cluster> models = Lists.newArrayList();
+ DistanceMeasure measure = new ManhattanDistanceMeasure();
+ models.add(new DistanceMeasureCluster(new DenseVector(2).assign(1), 0, measure));
+ models.add(new DistanceMeasureCluster(new DenseVector(2), 1, measure));
+ models.add(new DistanceMeasureCluster(new DenseVector(2).assign(-1), 2, measure));
+ return new ClusterClassifier(models, new KMeansClusteringPolicy());
+ }
+
+ private static ClusterClassifier newKlusterClassifier() {
+ List<Cluster> models = Lists.newArrayList();
+ DistanceMeasure measure = new ManhattanDistanceMeasure();
+ models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(1), 0, measure));
+ models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2), 1, measure));
+ models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(-1), 2, measure));
+ return new ClusterClassifier(models, new KMeansClusteringPolicy());
+ }
+
+ private static ClusterClassifier newCosineKlusterClassifier() {
+ List<Cluster> models = Lists.newArrayList();
+ DistanceMeasure measure = new CosineDistanceMeasure();
+ models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(1), 0, measure));
+ models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2), 1, measure));
+ models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(-1), 2, measure));
+ return new ClusterClassifier(models, new KMeansClusteringPolicy());
+ }
+
+ private static ClusterClassifier newSoftClusterClassifier() {
+ List<Cluster> models = Lists.newArrayList();
+ DistanceMeasure measure = new ManhattanDistanceMeasure();
+ models.add(new SoftCluster(new DenseVector(2).assign(1), 0, measure));
+ models.add(new SoftCluster(new DenseVector(2), 1, measure));
+ models.add(new SoftCluster(new DenseVector(2).assign(-1), 2, measure));
+ return new ClusterClassifier(models, new FuzzyKMeansClusteringPolicy());
+ }
+
+ private ClusterClassifier writeAndRead(ClusterClassifier classifier) throws IOException {
+ Path path = new Path(getTestTempDirPath(), "output");
+ classifier.writeToSeqFiles(path);
+ ClusterClassifier newClassifier = new ClusterClassifier();
+ newClassifier.readFromSeqFiles(getConfiguration(), path);
+ return newClassifier;
+ }
+
+ @Test
+ public void testDMClusterClassification() {
+ ClusterClassifier classifier = newDMClassifier();
+ Vector pdf = classifier.classify(new DenseVector(2));
+ assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null));
+ pdf = classifier.classify(new DenseVector(2).assign(2));
+ assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null));
+ }
+
+ @Test
+ public void testClusterClassification() {
+ ClusterClassifier classifier = newKlusterClassifier();
+ Vector pdf = classifier.classify(new DenseVector(2));
+ assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null));
+ pdf = classifier.classify(new DenseVector(2).assign(2));
+ assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null));
+ }
+
+ @Test
+ public void testSoftClusterClassification() {
+ ClusterClassifier classifier = newSoftClusterClassifier();
+ Vector pdf = classifier.classify(new DenseVector(2));
+ assertEquals("[0,0]", "[0.0,1.0,0.0]", AbstractCluster.formatVector(pdf, null));
+ pdf = classifier.classify(new DenseVector(2).assign(2));
+ assertEquals("[2,2]", "[0.735,0.184,0.082]", AbstractCluster.formatVector(pdf, null));
+ }
+
+ @Test
+ public void testDMClassifierSerialization() throws Exception {
+ ClusterClassifier classifier = newDMClassifier();
+ ClusterClassifier classifierOut = writeAndRead(classifier);
+ assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+ assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+ .getName());
+ }
+
+ @Test
+ public void testClusterClassifierSerialization() throws Exception {
+ ClusterClassifier classifier = newKlusterClassifier();
+ ClusterClassifier classifierOut = writeAndRead(classifier);
+ assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+ assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+ .getName());
+ }
+
+ @Test
+ public void testSoftClusterClassifierSerialization() throws Exception {
+ ClusterClassifier classifier = newSoftClusterClassifier();
+ ClusterClassifier classifierOut = writeAndRead(classifier);
+ assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+ assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+ .getName());
+ }
+
+ @Test
+ public void testClusterIteratorKMeans() {
+ List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
+ ClusterClassifier prior = newKlusterClassifier();
+ ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5);
+ assertEquals(3, posterior.getModels().size());
+ for (Cluster cluster : posterior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+ }
+
+ @Test
+ public void testClusterIteratorDirichlet() {
+ List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
+ ClusterClassifier prior = newKlusterClassifier();
+ ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5);
+ assertEquals(3, posterior.getModels().size());
+ for (Cluster cluster : posterior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+ }
+
+ @Test
+ public void testSeqFileClusterIteratorKMeans() throws IOException {
+ Path pointsPath = getTestTempDirPath("points");
+ Path priorPath = getTestTempDirPath("prior");
+ Path outPath = getTestTempDirPath("output");
+ Configuration conf = getConfiguration();
+ FileSystem fs = FileSystem.get(pointsPath.toUri(), conf);
+ List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+ ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+ Path path = new Path(priorPath, "priorClassifier");
+ ClusterClassifier prior = newKlusterClassifier();
+ prior.writeToSeqFiles(path);
+ assertEquals(3, prior.getModels().size());
+ System.out.println("Prior");
+ for (Cluster cluster : prior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+ ClusterIterator.iterateSeq(conf, pointsPath, path, outPath, 5);
+
+ for (int i = 1; i <= 4; i++) {
+ System.out.println("Classifier-" + i);
+ ClusterClassifier posterior = new ClusterClassifier();
+ String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+ posterior.readFromSeqFiles(conf, new Path(outPath, name));
+ assertEquals(3, posterior.getModels().size());
+ for (Cluster cluster : posterior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+
+ }
+ }
+
+ @Test
+ public void testMRFileClusterIteratorKMeans() throws Exception {
+ Path pointsPath = getTestTempDirPath("points");
+ Path priorPath = getTestTempDirPath("prior");
+ Path outPath = getTestTempDirPath("output");
+ Configuration conf = getConfiguration();
+ FileSystem fs = FileSystem.get(pointsPath.toUri(), conf);
+ List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+ ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+ Path path = new Path(priorPath, "priorClassifier");
+ ClusterClassifier prior = newKlusterClassifier();
+ prior.writeToSeqFiles(path);
+ ClusteringPolicy policy = new KMeansClusteringPolicy();
+ ClusterClassifier.writePolicy(policy, path);
+ assertEquals(3, prior.getModels().size());
+ System.out.println("Prior");
+ for (Cluster cluster : prior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+ ClusterIterator.iterateMR(conf, pointsPath, path, outPath, 5);
+
+ for (int i = 1; i <= 4; i++) {
+ System.out.println("Classifier-" + i);
+ ClusterClassifier posterior = new ClusterClassifier();
+ String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+ posterior.readFromSeqFiles(conf, new Path(outPath, name));
+ assertEquals(3, posterior.getModels().size());
+ for (Cluster cluster : posterior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+ }
+ }
+
+ @Test
+ public void testCosineKlusterClassification() {
+ ClusterClassifier classifier = newCosineKlusterClassifier();
+ Vector pdf = classifier.classify(new DenseVector(2));
+ assertEquals("[0,0]", "[0.333,0.333,0.333]", AbstractCluster.formatVector(pdf, null));
+ pdf = classifier.classify(new DenseVector(2).assign(2));
+ assertEquals("[2,2]", "[0.429,0.429,0.143]", AbstractCluster.formatVector(pdf, null));
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
new file mode 100644
index 0000000..94762e3
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+public final class TestKmeansClustering extends MahoutTestCase {
+
+ public static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+
+ private static final int[][] EXPECTED_NUM_POINTS = { {9}, {4, 5}, {4, 4, 1}, {1, 2, 1, 5}, {1, 1, 1, 2, 4},
+ {1, 1, 1, 1, 1, 4}, {1, 1, 1, 1, 1, 2, 2}, {1, 1, 1, 1, 1, 1, 2, 1}, {1, 1, 1, 1, 1, 1, 1, 1, 1}};
+
+ private FileSystem fs;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+   super.setUp();
+   // keep the file system of the test configuration; the tests write their input points through it
+   fs = FileSystem.get(getConfiguration());
+ }
+
+ /** Converts each row of {@code raw} into a {@link VectorWritable} around a {@link RandomAccessSparseVector}. */
+ public static List<VectorWritable> getPointsWritable(double[][] raw) {
+   List<VectorWritable> writables = Lists.newArrayList();
+   for (int row = 0; row < raw.length; row++) {
+     Vector vector = new RandomAccessSparseVector(raw[row].length);
+     writables.add(new VectorWritable(vector.assign(raw[row])));
+   }
+   return writables;
+ }
+
+ /** Converts each row of {@code raw} into a {@link VectorWritable} around a {@link DenseVector}. */
+ public static List<VectorWritable> getPointsWritableDenseVector(double[][] raw) {
+   List<VectorWritable> writables = Lists.newArrayList();
+   for (int row = 0; row < raw.length; row++) {
+     Vector vector = new DenseVector(raw[row].length);
+     writables.add(new VectorWritable(vector.assign(raw[row])));
+   }
+   return writables;
+ }
+
+ /** Converts each row of {@code raw} into a {@link SequentialAccessSparseVector} holding the row's values. */
+ public static List<Vector> getPoints(double[][] raw) {
+   List<Vector> vectors = Lists.newArrayList();
+   for (int row = 0; row < raw.length; row++) {
+     Vector vector = new SequentialAccessSparseVector(raw[row].length);
+     vectors.add(vector.assign(raw[row]));
+   }
+   return vectors;
+ }
+
+ /**
+ * Tests single-run convergence of
+ * {@link KMeansClusterer#runKMeansIteration(Iterable, Iterable, DistanceMeasure, double)}
+ * with a given distance threshold. The test is disabled: its body below is commented out.
+ */
+ /*@Test
+ public void testRunKMeansIterationConvergesInOneRunWithGivenDistanceThreshold() {
+ double[][] rawPoints = { {0, 0}, {0, 0.25}, {0, 0.75}, {0, 1}};
+ List<Vector> points = getPoints(rawPoints);
+
+ ManhattanDistanceMeasure distanceMeasure = new ManhattanDistanceMeasure();
+ List<Kluster> clusters = Arrays.asList(new Kluster(points.get(0), 0, distanceMeasure), new Kluster(points.get(3),
+ 3, distanceMeasure));
+
+ // To converge in a single run, the given distance threshold should be
+ // greater than or equal to 0.125,
+ // since 0.125 will be the distance between center and centroid for the
+ // initial two clusters after one run.
+ double distanceThreshold = 0.25;
+
+ boolean converged = KMeansClusterer.runKMeansIteration(points, clusters, distanceMeasure, distanceThreshold);
+
+ Vector cluster1Center = clusters.get(0).getCenter();
+ assertEquals(0, cluster1Center.get(0), EPSILON);
+ assertEquals(0.125, cluster1Center.get(1), EPSILON);
+
+ Vector cluster2Center = clusters.get(1).getCenter();
+ assertEquals(0, cluster2Center.get(0), EPSILON);
+ assertEquals(0.875, cluster2Center.get(1), EPSILON);
+
+ assertTrue("KMeans iteration should be converged after a single run", converged);
+ }*/
+
+ /**
+  * Story: User wishes to run kmeans job on reference data. Runs {@link KMeansDriver} with the sequential
+  * method for 2 to 9 prior clusters and compares the number of collected cluster keys with the expectation.
+  */
+ @Test
+ public void testKMeansSeqJob() throws Exception {
+   DistanceMeasure measure = new EuclideanDistanceMeasure();
+   List<VectorWritable> points = getPointsWritable(REFERENCE);
+   Path pointsPath = getTestTempDirPath("points");
+   Path clustersPath = getTestTempDirPath("clusters");
+   Configuration conf = getConfiguration();
+   ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+   ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+   for (int k = 1; k < points.size(); k++) {
+     System.out.println("testKMeansSeqJob k= " + k);
+     // use the first k + 1 points as the initial cluster centers (deterministic, not random)
+     Path path = new Path(clustersPath, "part-00000");
+     FileSystem clusterFs = FileSystem.get(path.toUri(), conf);
+     SequenceFile.Writer writer = new SequenceFile.Writer(clusterFs, conf, path, Text.class, Kluster.class);
+     try {
+       for (int i = 0; i < k + 1; i++) {
+         Kluster cluster = new Kluster(points.get(i).get(), i, measure);
+         // add the center so the centroid will be correct upon output
+         cluster.observe(cluster.getCenter(), 1);
+         writer.append(new Text(cluster.getIdentifier()), cluster);
+       }
+     } finally {
+       Closeables.close(writer, false);
+     }
+     // now run the Job
+     Path outputPath = getTestTempDirPath("output" + k);
+     String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+         optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+         optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+         optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+         optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+         optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+         optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION),
+         DefaultOptionCreator.SEQUENTIAL_METHOD};
+     assertEquals("driver exit code for k=" + k, 0, ToolRunner.run(conf, new KMeansDriver(), args));
+
+     // now compare the expected clusters with actual
+     Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+     int[] expect = EXPECTED_NUM_POINTS[k];
+     DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>();
+     // The key is the clusterId, the value is the weighted vector
+     for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+         new Path(clusteredPointsPath, "part-m-0"), conf)) {
+       collector.collect(record.getFirst(), record.getSecond());
+     }
+     assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+   }
+ }
+
+ /**
+  * Story: User wishes to run kmeans job on reference data (DenseVector test). Runs {@link KMeansDriver} with
+  * the sequential method for 2 to 9 prior clusters and compares the number of collected cluster keys.
+  */
+ @Test
+ public void testKMeansSeqJobDenseVector() throws Exception {
+   DistanceMeasure measure = new EuclideanDistanceMeasure();
+   List<VectorWritable> points = getPointsWritableDenseVector(REFERENCE);
+   Path pointsPath = getTestTempDirPath("points");
+   Path clustersPath = getTestTempDirPath("clusters");
+   Configuration conf = getConfiguration();
+   ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+   ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+   for (int k = 1; k < points.size(); k++) {
+     System.out.println("testKMeansSeqJobDenseVector k= " + k);
+     // use the first k + 1 points as the initial cluster centers (deterministic, not random)
+     Path path = new Path(clustersPath, "part-00000");
+     FileSystem clusterFs = FileSystem.get(path.toUri(), conf);
+     SequenceFile.Writer writer = new SequenceFile.Writer(clusterFs, conf, path, Text.class, Kluster.class);
+     try {
+       for (int i = 0; i < k + 1; i++) {
+         Kluster cluster = new Kluster(points.get(i).get(), i, measure);
+         // add the center so the centroid will be correct upon output
+         cluster.observe(cluster.getCenter(), 1);
+         writer.append(new Text(cluster.getIdentifier()), cluster);
+       }
+     } finally {
+       Closeables.close(writer, false);
+     }
+     // now run the Job
+     Path outputPath = getTestTempDirPath("output" + k);
+     String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+         optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+         optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+         optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+         optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+         optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+         optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION),
+         DefaultOptionCreator.SEQUENTIAL_METHOD};
+     assertEquals("driver exit code for k=" + k, 0, ToolRunner.run(conf, new KMeansDriver(), args));
+
+     // now compare the expected clusters with actual
+     Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+     int[] expect = EXPECTED_NUM_POINTS[k];
+     DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>();
+     // The key is the clusterId, the value is the weighted vector
+     for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+         new Path(clusteredPointsPath, "part-m-0"), conf)) {
+       collector.collect(record.getFirst(), record.getSecond());
+     }
+     assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+   }
+ }
+
+ /**
+  * Story: User wishes to run kmeans job on reference data. Runs {@link KMeansDriver} without a method
+  * option for 2, 5 and 8 prior clusters and compares the number of collected cluster keys with the expectation.
+  */
+ @Test
+ public void testKMeansMRJob() throws Exception {
+   DistanceMeasure measure = new EuclideanDistanceMeasure();
+   List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+   Path pointsPath = getTestTempDirPath("points");
+   Path clustersPath = getTestTempDirPath("clusters");
+   Configuration conf = getConfiguration();
+   ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+   ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+   for (int k = 1; k < points.size(); k += 3) {
+     System.out.println("testKMeansMRJob k= " + k);
+     // use the first k + 1 points as the initial cluster centers (deterministic, not random)
+     Path path = new Path(clustersPath, "part-00000");
+     // resolved from the cluster path; named so that it does not shadow the fs field
+     FileSystem clusterFs = FileSystem.get(path.toUri(), conf);
+     SequenceFile.Writer writer = new SequenceFile.Writer(clusterFs, conf, path, Text.class, Kluster.class);
+     try {
+       for (int i = 0; i < k + 1; i++) {
+         Kluster cluster = new Kluster(points.get(i).get(), i, measure);
+         // add the center so the centroid will be correct upon output
+         cluster.observe(cluster.getCenter(), 1);
+         writer.append(new Text(cluster.getIdentifier()), cluster);
+       }
+     } finally {
+       Closeables.close(writer, false);
+     }
+     // now run the Job
+     Path outputPath = getTestTempDirPath("output" + k);
+     String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+         optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+         optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+         optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+         optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+         optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+         optKey(DefaultOptionCreator.OVERWRITE_OPTION)};
+     assertEquals("driver exit code for k=" + k, 0, ToolRunner.run(getConfiguration(), new KMeansDriver(), args));
+
+     // now compare the expected clusters with actual
+     Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+     int[] expect = EXPECTED_NUM_POINTS[k];
+     DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>();
+     // The key is the clusterId, the value is the weighted vector
+     for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+         new Path(clusteredPointsPath, "part-m-00000"), conf)) {
+       collector.collect(record.getFirst(), record.getSecond());
+     }
+     assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+   }
+ }
+
+ /**
+ * Story: User wants to use canopy clustering to input the initial clusters
+ * for kmeans job.
+ */
+ @Test
+ public void testKMeansWithCanopyClusterInput() throws Exception {
+ List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+ Path pointsPath = getTestTempDirPath("points");
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+
+ Path outputPath = getTestTempDirPath("output");
+ // now run the Canopy job
+ CanopyDriver.run(conf, pointsPath, outputPath, new ManhattanDistanceMeasure(), 3.1, 2.1, false, 0.0, false);
+
+ DummyOutputCollector<Text, ClusterWritable> collector1 =
+ new DummyOutputCollector<>();
+
+ FileStatus[] outParts = FileSystem.get(conf).globStatus(
+ new Path(outputPath, "clusters-0-final/*-0*"));
+ for (FileStatus outPartStat : outParts) {
+ for (Pair<Text,ClusterWritable> record :
+ new SequenceFileIterable<Text,ClusterWritable>(
+ outPartStat.getPath(), conf)) {
+ collector1.collect(record.getFirst(), record.getSecond());
+ }
+ }
+
+ boolean got15 = false;
+ boolean got43 = false;
+ int count = 0;
+ for (Text k : collector1.getKeys()) {
+ count++;
+ List<ClusterWritable> vl = collector1.getValue(k);
+ assertEquals("non-singleton centroid!", 1, vl.size());
+ ClusterWritable clusterWritable = vl.get(0);
+ Vector v = clusterWritable.getValue().getCenter();
+ assertEquals("cetriod vector is wrong length", 2, v.size());
+ if ( (Math.abs(v.get(0) - 1.5) < EPSILON)
+ && (Math.abs(v.get(1) - 1.5) < EPSILON)
+ && !got15) {
+ got15 = true;
+ } else if ( (Math.abs(v.get(0) - 4.333333333333334) < EPSILON)
+ && (Math.abs(v.get(1) - 4.333333333333334) < EPSILON)
+ && !got43) {
+ got43 = true;
+ } else {
+ fail("got unexpected center: " + v + " [" + v.getClass().toString() + ']');
+ }
+ }
+ assertEquals("got unexpected number of centers", 2, count);
+
+ // now run the KMeans job
+ Path kmeansOutput = new Path(outputPath, "kmeans");
+ KMeansDriver.run(getConfiguration(), pointsPath, new Path(outputPath, "clusters-0-final"), kmeansOutput,
+ 0.001, 10, true, 0.0, false);
+
+ // now compare the expected clusters with actual
+ Path clusteredPointsPath = new Path(kmeansOutput, "clusteredPoints");
+ DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>();
+
+ // The key is the clusterId, the value is the weighted vector
+ for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+ new Path(clusteredPointsPath, "part-m-00000"), conf)) {
+ collector.collect(record.getFirst(), record.getSecond());
+ }
+
+ for (IntWritable k : collector.getKeys()) {
+ List<WeightedPropertyVectorWritable> wpvList = collector.getValue(k);
+ assertTrue("empty cluster!", !wpvList.isEmpty());
+ if (wpvList.get(0).getVector().get(0) <= 2.0) {
+ for (WeightedPropertyVectorWritable wv : wpvList) {
+ Vector v = wv.getVector();
+ int idx = v.maxValueIndex();
+ assertTrue("bad cluster!", v.get(idx) <= 2.0);
+ }
+ assertEquals("Wrong size cluster", 4, wpvList.size());
+ } else {
+ for (WeightedPropertyVectorWritable wv : wpvList) {
+ Vector v = wv.getVector();
+ int idx = v.minValueIndex();
+ assertTrue("bad cluster!", v.get(idx) > 2.0);
+ }
+ assertEquals("Wrong size cluster", 5, wpvList.size());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
new file mode 100644
index 0000000..5cb012a
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
@@ -0,0 +1,169 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestRandomSeedGenerator extends MahoutTestCase {
+
+ private static final double[][] RAW = {{1, 1}, {2, 1}, {1, 2}, {2, 2},
+ {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+
+ private FileSystem fs;
+
+ private static List<VectorWritable> getPoints() {
+ List<VectorWritable> points = Lists.newArrayList();
+ for (double[] fr : RAW) {
+ Vector vec = new RandomAccessSparseVector(fr.length);
+ vec.assign(fr);
+ points.add(new VectorWritable(vec));
+ }
+ return points;
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = getConfiguration();
+ fs = FileSystem.get(conf);
+ }
+
+ /** Story: test random seed generation generates 4 clusters with proper ids and data */
+ @Test
+ public void testRandomSeedGenerator() throws Exception {
+ List<VectorWritable> points = getPoints();
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+ job.setMapOutputValueClass(VectorWritable.class);
+ Path input = getTestTempFilePath("random-input");
+ Path output = getTestTempDirPath("random-output");
+ ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+
+ RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure());
+
+ int clusterCount = 0;
+ Collection<Integer> set = Sets.newHashSet();
+ for (ClusterWritable clusterWritable :
+ new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+ clusterCount++;
+ Cluster cluster = clusterWritable.getValue();
+ int id = cluster.getId();
+ assertTrue(set.add(id)); // Validate unique id's
+
+ Vector v = cluster.getCenter();
+ assertVectorEquals(RAW[id], v); // Validate values match
+ }
+
+ assertEquals(4, clusterCount); // Validate sample count
+ }
+
+ /** Be sure that the buildRandomSeeded works in the same way as RandomSeedGenerator.buildRandom */
+ @Test
+ public void testRandomSeedGeneratorSeeded() throws Exception {
+ List<VectorWritable> points = getPoints();
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+ job.setMapOutputValueClass(VectorWritable.class);
+ Path input = getTestTempFilePath("random-input");
+ Path output = getTestTempDirPath("random-output");
+ ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+
+ RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), 1L);
+
+ int clusterCount = 0;
+ Collection<Integer> set = Sets.newHashSet();
+ for (ClusterWritable clusterWritable :
+ new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+ clusterCount++;
+ Cluster cluster = clusterWritable.getValue();
+ int id = cluster.getId();
+ assertTrue(set.add(id)); // validate unique id's
+
+ Vector v = cluster.getCenter();
+ assertVectorEquals(RAW[id], v); // validate values match
+ }
+
+ assertEquals(4, clusterCount); // validate sample count
+ }
+
+ /** Test that initial clusters built with same random seed are reproduced */
+ @Test
+ public void testBuildRandomSeededSameInitalClusters() throws Exception {
+ List<VectorWritable> points = getPoints();
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+ job.setMapOutputValueClass(VectorWritable.class);
+ Path input = getTestTempFilePath("random-input");
+ Path output = getTestTempDirPath("random-output");
+ ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+ long randSeed=1;
+
+ RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), randSeed);
+
+ int[] clusterIDSeq = new int[4];
+
+ /** run through all clusters once and set sequence of IDs */
+ int clusterCount = 0;
+ for (ClusterWritable clusterWritable :
+ new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+ Cluster cluster = clusterWritable.getValue();
+ clusterIDSeq[clusterCount] = cluster.getId();
+ clusterCount++;
+ }
+
+ /* Rebuild cluster and run through again making sure all IDs are in the same random sequence
+ * Needs a better test because in this case passes when seeded with 1 and 2 fails with 1, 3
+ * passes when set to two */
+ RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), randSeed); clusterCount = 0;
+ for (ClusterWritable clusterWritable :
+ new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+ Cluster cluster = clusterWritable.getValue();
+ // Make sure cluster ids are in same random sequence
+ assertEquals(clusterIDSeq[clusterCount], cluster.getId());
+ clusterCount++;
+ }
+ }
+
+ private static void assertVectorEquals(double[] raw, Vector v) {
+ assertEquals(raw.length, v.size());
+ for (int i = 0; i < raw.length; i++) {
+ assertEquals(raw[i], v.getQuick(i), EPSILON);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
new file mode 100644
index 0000000..dd4360a
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.lda.cvb;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixUtils;
+import org.apache.mahout.math.function.DoubleFunction;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Exercises CVB0 training, both in memory and through {@link CVB0Driver}, on corpora sampled
+ * from a randomly generated structured model.
+ */
+public final class TestCVBModelTrainer extends MahoutTestCase {
+
+  private static final double ETA = 0.1;
+  private static final double ALPHA = 0.1;
+
+  /**
+   * Trains in-memory models with 1 up to {@code 2 * numGeneratingTopics - 1} topics and prints
+   * the perplexity each one reports. Nothing is asserted.
+   */
+  @Test
+  public void testInMemoryCVB0() throws Exception {
+    // one single-letter term per letter 'a'..'z'
+    String[] terms = new String[26];
+    int letter = 0;
+    while (letter < terms.length) {
+      terms[letter] = String.valueOf((char) (letter + 'a'));
+      letter++;
+    }
+
+    int numGeneratingTopics = 3;
+    int numTerms = 26;
+    DoubleFunction inverseSquare = new DoubleFunction() {
+      @Override
+      public double apply(double d) {
+        return 1.0 / Math.pow(d + 1.0, 2);
+      }
+    };
+    Matrix model = ClusteringTestUtils.randomStructuredModel(numGeneratingTopics, numTerms, inverseSquare);
+
+    int numDocs = 100;
+    int numSamples = 20;
+    int numTopicsPerDoc = 1;
+    Matrix corpus = ClusteringTestUtils.sampledCorpus(model, RandomUtils.getRandom(),
+        numDocs, numSamples, numTopicsPerDoc);
+
+    List<Double> bestPerTopicCount = Lists.newArrayList();
+    int numTrials = 1;
+    int maxTopics = 2 * numGeneratingTopics;
+    for (int topics = 1; topics < maxTopics; topics++) {
+      double[] trialPerplexities = new double[numTrials];
+      int trial = 0;
+      while (trial < numTrials) {
+        InMemoryCollapsedVariationalBayes0 trainer =
+            new InMemoryCollapsedVariationalBayes0(corpus, terms, topics, ALPHA, ETA, 2, 1, 0);
+        trainer.setVerbose(true);
+        trialPerplexities[trial] = trainer.iterateUntilConvergence(0, 5, 0, 0.2);
+        System.out.println(trialPerplexities[trial]);
+        trial++;
+      }
+      // after sorting, index 0 holds the smallest perplexity seen for this topic count
+      Arrays.sort(trialPerplexities);
+      System.out.println(Arrays.toString(trialPerplexities));
+      bestPerTopicCount.add(trialPerplexities[0]);
+    }
+    System.out.println(Joiner.on(",").join(bestPerTopicCount));
+  }
+
+  /**
+   * Runs the driver for {@code numGeneratingTopics - 1} through {@code numGeneratingTopics + 1}
+   * topics and asserts that the topic count with the lowest perplexity is 4.
+   */
+  @Test
+  public void testRandomStructuredModelViaMR() throws Exception {
+    int numGeneratingTopics = 3;
+    int numTerms = 9;
+    DoubleFunction inverseCube = new DoubleFunction() {
+      @Override
+      public double apply(double d) {
+        return 1.0 / Math.pow(d + 1.0, 3);
+      }
+    };
+    Matrix model = ClusteringTestUtils.randomStructuredModel(numGeneratingTopics, numTerms, inverseCube);
+
+    int numDocs = 500;
+    int numSamples = 10;
+    int numTopicsPerDoc = 1;
+    Matrix corpus = ClusteringTestUtils.sampledCorpus(model, RandomUtils.getRandom(1234),
+        numDocs, numSamples, numTopicsPerDoc);
+
+    Path corpusPath = getTestTempDirPath("corpus");
+    Configuration configuration = getConfiguration();
+    MatrixUtils.write(corpusPath, configuration, corpus);
+
+    int numIterations = 5;
+    List<Double> perplexities = Lists.newArrayList();
+    int startTopic = numGeneratingTopics - 1;
+    for (int topics = startTopic; topics < numGeneratingTopics + 2; topics++) {
+      Path modelStatePath = getTestTempDirPath("topicTemp" + topics);
+      Configuration conf = getConfiguration();
+      CVB0Driver driver = new CVB0Driver();
+      driver.run(conf, corpusPath, null, topics, numTerms,
+          ALPHA, ETA, numIterations, 1, 0, null, null, modelStatePath, 1234, 0.2f, 2,
+          1, 3, 1, false);
+      perplexities.add(lowestPerplexity(conf, modelStatePath));
+    }
+
+    // list position p corresponds to a run with (p + startTopic) topics
+    int bestTopic = -1;
+    double lowestPerplexity = Double.MAX_VALUE;
+    int position = 0;
+    for (double perplexity : perplexities) {
+      if (perplexity < lowestPerplexity) {
+        lowestPerplexity = perplexity;
+        bestTopic = position + startTopic;
+      }
+      position++;
+    }
+    assertEquals("The optimal number of topics is not that of the generating distribution", 4, bestTopic);
+    System.out.println("Perplexities: " + Joiner.on(", ").join(perplexities));
+  }
+
+  /**
+   * Reads the perplexity stored for iteration 2, 3, ... until
+   * {@link CVB0Driver#readPerplexity} returns NaN.
+   *
+   * @return the smallest perplexity read, or {@link Double#MAX_VALUE} when the first read is NaN
+   */
+  private static double lowestPerplexity(Configuration conf, Path topicModelTemp)
+      throws IOException {
+    double lowest = Double.MAX_VALUE;
+    for (int iteration = 2; ; iteration++) {
+      double current = CVB0Driver.readPerplexity(conf, topicModelTemp, iteration);
+      if (Double.isNaN(current)) {
+        break;
+      }
+      lowest = Math.min(current, lowest);
+    }
+    return lowest;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java
new file mode 100644
index 0000000..cd52bbd
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
+import org.junit.Test;
+
+/**
+ * <p>Tests the affinity matrix input M/R task.</p>
+ *
+ * <p>The tricky item with this task is that the format of the input
+ * must be correct; it must take the form of a graph input, and for the
+ * current implementation, the input must be symmetric, e.g. the weight
+ * from node A to B = the weight from node B to A. This is not explicitly
+ * enforced within the task itself (since, as of the time these tests were
+ * written, we have not yet decided on a final rule regarding the
+ * symmetry/non-symmetry of the affinity matrix, so we are unofficially
+ * enforcing symmetry). Input looks something like this:</p>
+ *
+ * <pre>0, 0, 0
+ * 0, 1, 10
+ * 0, 2, 20
+ * ...
+ * 1, 0, 10
+ * 2, 0, 20
+ * ...</pre>
+ *
+ * <p>The mapper's task is simply to convert each line of text into a
+ * DistributedRowMatrix entry, allowing the reducer to join each entry
+ * of the same row into a VectorWritable.</p>
+ *
+ * <p>Exceptions are thrown in cases of bad input format: if there are
+ * more or fewer than 3 numbers per line, or any of the numbers are missing.</p>
+ */
+public class TestAffinityMatrixInputJob extends MahoutTestCase {
+
+ private static final String [] RAW = {"0,0,0", "0,1,5", "0,2,10", "1,0,5", "1,1,0",
+ "1,2,20", "2,0,10", "2,1,20", "2,2,0"};
+ private static final int RAW_DIMENSIONS = 3;
+
+ @Test
+ public void testAffinityMatrixInputMapper() throws Exception {
+ AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper();
+ Configuration conf = getConfiguration();
+ conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+
+ // set up the dummy writer and the M/R context
+ DummyRecordWriter<IntWritable, MatrixEntryWritable> writer =
+ new DummyRecordWriter<>();
+ Mapper<LongWritable, Text, IntWritable, MatrixEntryWritable>.Context
+ context = DummyRecordWriter.build(mapper, conf, writer);
+
+ // loop through all the points and test each one is converted
+ // successfully to a DistributedRowMatrix.MatrixEntry
+ for (String s : RAW) {
+ mapper.map(new LongWritable(), new Text(s), context);
+ }
+
+ // test the data was successfully constructed
+ assertEquals("Number of map results", RAW_DIMENSIONS, writer.getData().size());
+ Set<IntWritable> keys = writer.getData().keySet();
+ for (IntWritable i : keys) {
+ List<MatrixEntryWritable> row = writer.getData().get(i);
+ assertEquals("Number of items in row", RAW_DIMENSIONS, row.size());
+ }
+ }
+
+ @Test
+ public void testAffinitymatrixInputReducer() throws Exception {
+ AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper();
+ Configuration conf = getConfiguration();
+ conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+
+ // set up the dummy writer and the M/R context
+ DummyRecordWriter<IntWritable, MatrixEntryWritable> mapWriter =
+ new DummyRecordWriter<>();
+ Mapper<LongWritable, Text, IntWritable, MatrixEntryWritable>.Context
+ mapContext = DummyRecordWriter.build(mapper, conf, mapWriter);
+
+ // loop through all the points and test each one is converted
+ // successfully to a DistributedRowMatrix.MatrixEntry
+ for (String s : RAW) {
+ mapper.map(new LongWritable(), new Text(s), mapContext);
+ }
+ // store the data for checking later
+ Map<IntWritable, List<MatrixEntryWritable>> map = mapWriter.getData();
+
+ // now reduce the data
+ AffinityMatrixInputReducer reducer = new AffinityMatrixInputReducer();
+ DummyRecordWriter<IntWritable, VectorWritable> redWriter =
+ new DummyRecordWriter<>();
+ Reducer<IntWritable, MatrixEntryWritable,
+ IntWritable, VectorWritable>.Context redContext = DummyRecordWriter
+ .build(reducer, conf, redWriter, IntWritable.class, MatrixEntryWritable.class);
+ for (IntWritable key : mapWriter.getKeys()) {
+ reducer.reduce(key, mapWriter.getValue(key), redContext);
+ }
+
+ // check that all the elements are correctly ordered
+ assertEquals("Number of reduce results", RAW_DIMENSIONS, redWriter.getData().size());
+ for (IntWritable row : redWriter.getKeys()) {
+ List<VectorWritable> list = redWriter.getValue(row);
+ assertEquals("Should only be one vector", 1, list.size());
+ // check that the elements in the array are correctly ordered
+ Vector v = list.get(0).get();
+ for (Vector.Element e : v.all()) {
+ // find this value in the original map
+ MatrixEntryWritable toCompare = new MatrixEntryWritable();
+ toCompare.setRow(-1);
+ toCompare.setCol(e.index());
+ toCompare.setVal(e.get());
+ assertTrue("This entry was correctly placed in its row", map.get(row).contains(toCompare));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java
new file mode 100644
index 0000000..c256890
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob.MatrixDiagonalizeMapper;
+import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob.MatrixDiagonalizeReducer;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+/**
+ * <p>The MatrixDiagonalize task is pretty simple: given a matrix,
+ * it sums the elements of the row, and sticks the sum in position (i, i)
+ * of a new matrix of identical dimensions to the original.</p>
+ */
+public class TestMatrixDiagonalizeJob extends MahoutTestCase {
+
+  private static final double[][] RAW = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
+  private static final int RAW_DIMENSIONS = 3;
+
+  /** Adds up the elements of {@code row}, front to back. */
+  private static double rowSum(double [] row) {
+    double total = 0;
+    for (int col = 0; col < row.length; col++) {
+      total += row[col];
+    }
+    return total;
+  }
+
+  /**
+   * Maps every row of {@code RAW} and checks that {@code RAW_DIMENSIONS} values were emitted
+   * under the single {@link NullWritable} key.
+   */
+  @Test
+  public void testMatrixDiagonalizeMapper() throws Exception {
+    MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper();
+    Configuration conf = getConfiguration();
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+
+    // the dummy writer captures what the mapper emits through the context
+    DummyRecordWriter<NullWritable, IntDoublePairWritable> mapOutput = new DummyRecordWriter<>();
+    Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable>.Context mapContext =
+        DummyRecordWriter.build(mapper, conf, mapOutput);
+
+    // one map call per matrix row, keyed by the row index
+    for (int row = 0; row < RAW_DIMENSIONS; row++) {
+      RandomAccessSparseVector rowVector = new RandomAccessSparseVector(RAW_DIMENSIONS);
+      rowVector.assign(RAW[row]);
+      mapper.map(new IntWritable(row), new VectorWritable(rowVector), mapContext);
+    }
+
+    int emitted = mapOutput.getValue(NullWritable.get()).size();
+    assertEquals("Number of map results", RAW_DIMENSIONS, emitted);
+  }
+
+  /**
+   * Maps every row of {@code RAW}, reduces the mapper output once, and checks that the single
+   * resulting vector holds at position i the sum of row i.
+   */
+  @Test
+  public void testMatrixDiagonalizeReducer() throws Exception {
+    MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper();
+    Configuration conf = getConfiguration();
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+
+    // map phase, captured by a dummy writer
+    DummyRecordWriter<NullWritable, IntDoublePairWritable> mapOutput = new DummyRecordWriter<>();
+    Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable>.Context mapContext =
+        DummyRecordWriter.build(mapper, conf, mapOutput);
+    for (int row = 0; row < RAW_DIMENSIONS; row++) {
+      RandomAccessSparseVector rowVector = new RandomAccessSparseVector(RAW_DIMENSIONS);
+      rowVector.assign(RAW[row]);
+      mapper.map(new IntWritable(row), new VectorWritable(rowVector), mapContext);
+    }
+
+    // reduce phase: everything was emitted under one key, so a single reduce call suffices
+    MatrixDiagonalizeReducer reducer = new MatrixDiagonalizeReducer();
+    DummyRecordWriter<NullWritable, VectorWritable> reduceOutput = new DummyRecordWriter<>();
+    Reducer<NullWritable, IntDoublePairWritable, NullWritable, VectorWritable>.Context reduceContext =
+        DummyRecordWriter.build(reducer, conf, reduceOutput, NullWritable.class, IntDoublePairWritable.class);
+    reducer.reduce(NullWritable.get(), mapOutput.getValue(NullWritable.get()), reduceContext);
+
+    // exactly one vector must come out
+    List<VectorWritable> vectors = reduceOutput.getValue(NullWritable.get());
+    assertEquals("Only a single resulting vector", 1, vectors.size());
+
+    Vector diagonal = vectors.get(0).get();
+    int length = diagonal.size();
+    for (int i = 0; i < length; i++) {
+      double expected = rowSum(RAW[i]);
+      assertEquals("Element sum is correct", expected, diagonal.get(i),0.01);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java
new file mode 100644
index 0000000..c971572
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.spectral.UnitVectorizerJob.UnitVectorizerMapper;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+/**
+ * Tests {@link UnitVectorizerMapper}: every row passed through the mapper must come out as
+ * exactly one vector whose L2 norm is 1.
+ */
+public class TestUnitVectorizerJob extends MahoutTestCase {
+
+  private static final double [][] RAW = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
+
+  /** Largest accepted deviation of an output vector's L2 norm from 1. */
+  private static final double NORM_TOLERANCE = 0.000001;
+
+  /**
+   * Maps every row of {@code RAW}, keyed by its row index, and checks that each key received
+   * a single vector of unit L2 norm.
+   */
+  @Test
+  public void testUnitVectorizerMapper() throws Exception {
+    UnitVectorizerMapper mapper = new UnitVectorizerMapper();
+    Configuration conf = getConfiguration();
+
+    // set up the dummy writers
+    DummyRecordWriter<IntWritable, VectorWritable> writer = new DummyRecordWriter<>();
+    Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context
+        context = DummyRecordWriter.build(mapper, conf, writer);
+
+    // perform the mapping
+    for (int i = 0; i < RAW.length; i++) {
+      Vector vector = new RandomAccessSparseVector(RAW[i].length);
+      vector.assign(RAW[i]);
+      mapper.map(new IntWritable(i), new VectorWritable(vector), context);
+    }
+
+    // check the results
+    assertEquals("Number of map results", RAW.length, writer.getData().size());
+    for (int i = 0; i < RAW.length; i++) {
+      IntWritable key = new IntWritable(i);
+      List<VectorWritable> list = writer.getValue(key);
+      assertEquals("Only one element per row", 1, list.size());
+      Vector v = list.get(0).get();
+      // assertEquals with a delta reports the actual norm on failure; the message now names
+      // the quantity that is really checked (the L2 norm, not the element sum)
+      assertEquals("Row " + i + " should have an L2 norm of 1", 1.0, v.norm(2), NORM_TOLERANCE);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java
new file mode 100644
index 0000000..e09bbe4
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.net.URI;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+/** Tests saving vectors to, and loading them from, the cache through {@link VectorCache}. */
+public class TestVectorCache extends MahoutTestCase {
+
+  private static final double[] VECTOR = { 1, 2, 3, 4 };
+
+  /** Saves a vector through {@code VectorCache.save} and reads it straight back from the sequence file. */
+  @Test
+  public void testSave() throws Exception {
+    Configuration conf = getConfiguration();
+    Writable key = new IntWritable(0);
+    Vector value = new DenseVector(VECTOR);
+    Path path = getTestTempDirPath("output");
+
+    // write the vector out
+    VectorCache.save(key, value, path, conf, true, true);
+
+    // can we read it from here?
+    SequenceFileValueIterator<VectorWritable> iterator = new SequenceFileValueIterator<>(path, true, conf);
+    try {
+      // JUnit takes (message, expected, actual): the original vector is the expected value
+      assertEquals("Saved vector is not identical to original", value, iterator.next().get());
+    } finally {
+      Closeables.close(iterator, true);
+    }
+  }
+
+  /** Writes a vector to a sequence file by hand, registers it as a cache file, and loads it back. */
+  @Test
+  public void testLoad() throws Exception {
+    // save a vector manually
+    Configuration conf = getConfiguration();
+    Writable key = new IntWritable(0);
+    Vector value = new DenseVector(VECTOR);
+    Path path = getTestTempDirPath("output");
+
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    // write the vector
+    path = fs.makeQualified(path);
+    fs.deleteOnExit(path);
+    HadoopUtil.delete(conf, path);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
+    try {
+      writer.append(key, new VectorWritable(value));
+    } finally {
+      Closeables.close(writer, false);
+    }
+    DistributedCache.setCacheFiles(new URI[] {path.toUri()}, conf);
+
+    // load it and compare against the original (JUnit takes the expected value first)
+    Vector result = VectorCache.load(conf);
+    assertNotNull("Vector is null", result);
+    assertEquals("Loaded vector is not identical to original", value, result);
+  }
+
+  /** Round trip: saves with {@code VectorCache.save} and loads with {@code VectorCache.load} from the same conf. */
+  @Test
+  public void testAll() throws Exception {
+    Configuration conf = getConfiguration();
+    Vector v = new DenseVector(VECTOR);
+    Path toSave = getTestTempDirPath("output");
+    Writable key = new IntWritable(0);
+
+    // save it
+    VectorCache.save(key, v, toSave, conf);
+
+    // now, load it back
+    Vector v2 = VectorCache.load(conf);
+
+    // are they the same? (expected value first, then the loaded one)
+    assertNotNull("Vector is null", v2);
+    assertEquals("Vectors are not identical", v, v2);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java
new file mode 100644
index 0000000..8cd52f4
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.spectral.VectorMatrixMultiplicationJob.VectorMatrixMultiplicationMapper;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+/**
+ * <p>Feeds a small matrix through {@link VectorMatrixMultiplicationMapper} and checks that output
+ * element (i, j) equals sqrt(VECTOR[i]) * sqrt(VECTOR[j]) * MATRIX[i][j].</p>
+ */
+public class TestVectorMatrixMultiplicationJob extends MahoutTestCase {
+
+  private static final double[][] MATRIX = { {1, 1}, {2, 3} };
+  private static final double[] VECTOR = {9, 16};
+
+  @Test
+  public void testVectorMatrixMultiplicationMapper() throws Exception {
+    VectorMatrixMultiplicationMapper mapper = new VectorMatrixMultiplicationMapper();
+    Configuration conf = getConfiguration();
+
+    // build the vector, the dummy writer and the mapper context, then hand the vector to the mapper
+    Vector multiplier = new DenseVector(VECTOR);
+    DummyRecordWriter<IntWritable, VectorWritable> recorder = new DummyRecordWriter<>();
+    Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context context =
+        DummyRecordWriter.build(mapper, conf, recorder);
+    mapper.setup(multiplier);
+
+    // map the matrix one row at a time, keyed by row index
+    for (int row = 0; row < MATRIX.length; row++) {
+      double[] rowValues = MATRIX[row];
+      Vector input = new RandomAccessSparseVector(rowValues.length);
+      input.assign(rowValues);
+      mapper.map(new IntWritable(row), new VectorWritable(input), context);
+    }
+
+    // one output vector per row key, compared element by element
+    assertEquals("Number of map results", MATRIX.length, recorder.getData().size());
+    for (int row = 0; row < MATRIX.length; row++) {
+      List<VectorWritable> emitted = recorder.getValue(new IntWritable(row));
+      assertEquals("Only one vector per key", 1, emitted.size());
+      Vector product = emitted.get(0).get();
+      for (int col = 0; col < MATRIX[row].length; col++) {
+        double expected = Math.sqrt(VECTOR[row]) * Math.sqrt(VECTOR[col]) * MATRIX[row][col];
+        assertEquals("Product matrix elements", expected, product.get(col), EPSILON);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java
new file mode 100644
index 0000000..5c73bbc
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.kmeans;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.spectral.kmeans.EigenSeedGenerator;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestEigenSeedGenerator extends MahoutTestCase {
+
+ private
+ static final double[][] RAW = {{1, 0, 0}, {1, 0, 0}, {0, 1, 0}, {0, 1, 0},
+ {0, 1, 0}, {0, 0, 1}, {0, 0, 1}};
+
+ private FileSystem fs;
+
+ private static List<VectorWritable> getPoints() {
+ List<VectorWritable> points = Lists.newArrayList();
+ for (double[] fr : RAW) {
+ Vector vec = new RandomAccessSparseVector(fr.length);
+ vec.assign(fr);
+ points.add(new VectorWritable(vec));
+ }
+ return points;
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = getConfiguration();
+ fs = FileSystem.get(conf);
+ }
+
+ @Test
+ public void testEigenSeedGenerator() throws Exception {
+ List<VectorWritable> points = getPoints();
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+ job.setMapOutputValueClass(VectorWritable.class);
+ Path input = getTestTempFilePath("eigen-input");
+ Path output = getTestTempDirPath("eigen-output");
+ ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+
+ EigenSeedGenerator.buildFromEigens(conf, input, output, 3, new ManhattanDistanceMeasure());
+
+ int clusterCount = 0;
+ Collection<Integer> set = new HashSet<>();
+ Vector v[] = new Vector[3];
+ for (ClusterWritable clusterWritable :
+ new SequenceFileValueIterable<ClusterWritable>(
+ new Path(output, "part-eigenSeed"), true, conf)) {
+ Cluster cluster = clusterWritable.getValue();
+ int id = cluster.getId();
+ assertTrue(set.add(id)); // validate unique id's
+ v[id] = cluster.getCenter();
+ clusterCount++;
+ }
+ assertEquals(3, clusterCount); // validate sample count
+ // validate pair-wise orthogonality
+ assertEquals(0, v[0].dot(v[1]), 1E-10);
+ assertEquals(0, v[1].dot(v[2]), 1E-10);
+ assertEquals(0, v[0].dot(v[2]), 1E-10);
+ }
+
+}