Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:36 UTC
[05/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy
into mahout-hdfs and mahout-mr, closes apache/mahout#86
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java b/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
new file mode 100644
index 0000000..35de87e
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+@Deprecated
+public final class TestCanopyCreation extends MahoutTestCase {
+
+ private static final double[][] RAW = { { 1, 1 }, { 2, 1 }, { 1, 2 },
+ { 2, 2 }, { 3, 3 }, { 4, 4 }, { 5, 4 }, { 4, 5 }, { 5, 5 } };
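+  // nine 2-d points: a tight block around (1.5, 1.5) and a looser block around (4.5, 4.5)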
+
+ private List<Canopy> referenceManhattan;
+
+ private final DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
+
+ private List<Vector> manhattanCentroids;
+
+ private List<Canopy> referenceEuclidean;
+
+ private final DistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();
+
+ private List<Vector> euclideanCentroids;
+
+ private FileSystem fs;
+
+ private static List<VectorWritable> getPointsWritable() {
+ List<VectorWritable> points = Lists.newArrayList();
+ for (double[] fr : RAW) {
+ Vector vec = new RandomAccessSparseVector(fr.length);
+ vec.assign(fr);
+ points.add(new VectorWritable(vec));
+ }
+ return points;
+ }
+
+ private static List<Vector> getPoints() {
+ List<Vector> points = Lists.newArrayList();
+ for (double[] fr : RAW) {
+ Vector vec = new RandomAccessSparseVector(fr.length);
+ vec.assign(fr);
+ points.add(vec);
+ }
+ return points;
+ }
+
+  /**
+   * Print the canopies to the test transcript.
+   *
+   * @param canopies
+   *          an {@code Iterable<Canopy>} to print
+   */
+ private static void printCanopies(Iterable<Canopy> canopies) {
+ for (Canopy canopy : canopies) {
+ System.out.println(canopy.asFormatString(null));
+ }
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ fs = FileSystem.get(getConfiguration());
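+    // Canopy thresholds: T1 = 3.1 (loose, outer) and T2 = 2.1 (tight, inner); canopy requires T1 > T2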
+ referenceManhattan = CanopyClusterer.createCanopies(getPoints(),
+ manhattanDistanceMeasure, 3.1, 2.1);
+ manhattanCentroids = CanopyClusterer.getCenters(referenceManhattan);
+ referenceEuclidean = CanopyClusterer.createCanopies(getPoints(),
+ euclideanDistanceMeasure, 3.1, 2.1);
+ euclideanCentroids = CanopyClusterer.getCenters(referenceEuclidean);
+ }
+
+ /**
+ * Story: User can cluster points using a ManhattanDistanceMeasure and a
+ * reference implementation
+ */
+ @Test
+ public void testReferenceManhattan() throws Exception {
+ // see setUp for cluster creation
+ printCanopies(referenceManhattan);
+ assertEquals("number of canopies", 3, referenceManhattan.size());
+    int[] expectedNumPoints = { 4, 4, 3 };
+    double[][] expectedCentroids = { { 1.5, 1.5 }, { 4.0, 4.0 },
+        { 4.666666666666667, 4.666666666666667 } };
+    for (int canopyIx = 0; canopyIx < referenceManhattan.size(); canopyIx++) {
+      Canopy testCanopy = referenceManhattan.get(canopyIx);
+      assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx],
+          testCanopy.getNumObservations());
+      double[] refCentroid = expectedCentroids[canopyIx];
+ Vector testCentroid = testCanopy.computeCentroid();
+ for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
+ assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
+ refCentroid[pointIx], testCentroid.get(pointIx), EPSILON);
+ }
+ }
+ }
+
+ /**
+ * Story: User can cluster points using a EuclideanDistanceMeasure and a
+ * reference implementation
+ */
+ @Test
+ public void testReferenceEuclidean() throws Exception {
+ // see setUp for cluster creation
+ printCanopies(referenceEuclidean);
+ assertEquals("number of canopies", 3, referenceEuclidean.size());
+ int[] expectedNumPoints = { 5, 5, 3 };
+ double[][] expectedCentroids = { { 1.8, 1.8 }, { 4.2, 4.2 },
+ { 4.666666666666667, 4.666666666666667 } };
+ for (int canopyIx = 0; canopyIx < referenceEuclidean.size(); canopyIx++) {
+ Canopy testCanopy = referenceEuclidean.get(canopyIx);
+ assertEquals("canopy points " + canopyIx, testCanopy.getNumObservations(),
+ expectedNumPoints[canopyIx]);
+ double[] refCentroid = expectedCentroids[canopyIx];
+ Vector testCentroid = testCanopy.computeCentroid();
+ for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
+ assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
+ refCentroid[pointIx], testCentroid.get(pointIx), EPSILON);
+ }
+ }
+ }
+
+ /**
+ * Story: User can produce initial canopy centers using a
+ * ManhattanDistanceMeasure and a CanopyMapper which clusters input points to
+ * produce an output set of canopy centroid points.
+ */
+ @Test
+ public void testCanopyMapperManhattan() throws Exception {
+ CanopyMapper mapper = new CanopyMapper();
+ Configuration conf = getConfiguration();
+ conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, manhattanDistanceMeasure
+ .getClass().getName());
+ conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+ conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+ conf.set(CanopyConfigKeys.CF_KEY, "0");
+ DummyRecordWriter<Text, VectorWritable> writer = new DummyRecordWriter<Text, VectorWritable>();
+ Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context = DummyRecordWriter
+ .build(mapper, conf, writer);
+ mapper.setup(context);
+
+ List<VectorWritable> points = getPointsWritable();
+ // map the data
+ for (VectorWritable point : points) {
+ mapper.map(new Text(), point, context);
+ }
+ mapper.cleanup(context);
+ assertEquals("Number of map results", 1, writer.getData().size());
+ // now verify the output
+ List<VectorWritable> data = writer.getValue(new Text("centroid"));
+ assertEquals("Number of centroids", 3, data.size());
+ for (int i = 0; i < data.size(); i++) {
+ assertEquals("Centroid error",
+ manhattanCentroids.get(i).asFormatString(), data.get(i).get()
+ .asFormatString());
+ }
+ }
+
+ /**
+ * Story: User can produce initial canopy centers using a
+ * EuclideanDistanceMeasure and a CanopyMapper/Combiner which clusters input
+ * points to produce an output set of canopy centroid points.
+ */
+ @Test
+ public void testCanopyMapperEuclidean() throws Exception {
+ CanopyMapper mapper = new CanopyMapper();
+ Configuration conf = getConfiguration();
+ conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, euclideanDistanceMeasure
+ .getClass().getName());
+ conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+ conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+ conf.set(CanopyConfigKeys.CF_KEY, "0");
+ DummyRecordWriter<Text, VectorWritable> writer = new DummyRecordWriter<Text, VectorWritable>();
+ Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context = DummyRecordWriter
+ .build(mapper, conf, writer);
+ mapper.setup(context);
+
+ List<VectorWritable> points = getPointsWritable();
+ // map the data
+ for (VectorWritable point : points) {
+ mapper.map(new Text(), point, context);
+ }
+ mapper.cleanup(context);
+ assertEquals("Number of map results", 1, writer.getData().size());
+ // now verify the output
+ List<VectorWritable> data = writer.getValue(new Text("centroid"));
+ assertEquals("Number of centroids", 3, data.size());
+ for (int i = 0; i < data.size(); i++) {
+ assertEquals("Centroid error",
+ euclideanCentroids.get(i).asFormatString(), data.get(i).get()
+ .asFormatString());
+ }
+ }
+
+ /**
+ * Story: User can produce final canopy centers using a
+ * ManhattanDistanceMeasure and a CanopyReducer which clusters input centroid
+ * points to produce an output set of final canopy centroid points.
+ */
+ @Test
+ public void testCanopyReducerManhattan() throws Exception {
+ CanopyReducer reducer = new CanopyReducer();
+ Configuration conf = getConfiguration();
+ conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+ "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+ conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+ conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+ conf.set(CanopyConfigKeys.CF_KEY, "0");
+ DummyRecordWriter<Text, ClusterWritable> writer = new DummyRecordWriter<Text, ClusterWritable>();
+ Reducer<Text, VectorWritable, Text, ClusterWritable>.Context context = DummyRecordWriter
+ .build(reducer, conf, writer, Text.class, VectorWritable.class);
+ reducer.setup(context);
+
+ List<VectorWritable> points = getPointsWritable();
+ reducer.reduce(new Text("centroid"), points, context);
+ Iterable<Text> keys = writer.getKeysInInsertionOrder();
+ assertEquals("Number of centroids", 3, Iterables.size(keys));
+ int i = 0;
+ for (Text key : keys) {
+ List<ClusterWritable> data = writer.getValue(key);
+ ClusterWritable clusterWritable = data.get(0);
+ Canopy canopy = (Canopy) clusterWritable.getValue();
+ assertEquals(manhattanCentroids.get(i).asFormatString() + " is not equal to "
+ + canopy.computeCentroid().asFormatString(),
+ manhattanCentroids.get(i), canopy.computeCentroid());
+ i++;
+ }
+ }
+
+ /**
+ * Story: User can produce final canopy centers using a
+ * EuclideanDistanceMeasure and a CanopyReducer which clusters input centroid
+ * points to produce an output set of final canopy centroid points.
+ */
+ @Test
+ public void testCanopyReducerEuclidean() throws Exception {
+ CanopyReducer reducer = new CanopyReducer();
+ Configuration conf = getConfiguration();
+ conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
+ conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+ conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+ conf.set(CanopyConfigKeys.CF_KEY, "0");
+ DummyRecordWriter<Text, ClusterWritable> writer = new DummyRecordWriter<Text, ClusterWritable>();
+ Reducer<Text, VectorWritable, Text, ClusterWritable>.Context context =
+ DummyRecordWriter.build(reducer, conf, writer, Text.class, VectorWritable.class);
+ reducer.setup(context);
+
+ List<VectorWritable> points = getPointsWritable();
+ reducer.reduce(new Text("centroid"), points, context);
+ Iterable<Text> keys = writer.getKeysInInsertionOrder();
+ assertEquals("Number of centroids", 3, Iterables.size(keys));
+ int i = 0;
+ for (Text key : keys) {
+ List<ClusterWritable> data = writer.getValue(key);
+ ClusterWritable clusterWritable = data.get(0);
+ Canopy canopy = (Canopy) clusterWritable.getValue();
+ assertEquals(euclideanCentroids.get(i).asFormatString() + " is not equal to "
+ + canopy.computeCentroid().asFormatString(),
+ euclideanCentroids.get(i), canopy.computeCentroid());
+ i++;
+ }
+ }
+
+ /**
+ * Story: User can produce final canopy centers using a Hadoop map/reduce job
+ * and a ManhattanDistanceMeasure.
+ */
+ @Test
+ public void testCanopyGenManhattanMR() throws Exception {
+ List<VectorWritable> points = getPointsWritable();
+ Configuration config = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file1"), fs, config);
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file2"), fs, config);
+ // now run the Canopy Driver
+ Path output = getTestTempDirPath("output");
+ CanopyDriver.run(config, getTestTempDirPath("testdata"), output,
+ manhattanDistanceMeasure, 3.1, 2.1, false, 0.0, false);
+
+ // verify output from sequence file
+ Path path = new Path(output, "clusters-0-final/part-r-00000");
+ FileSystem fs = FileSystem.get(path.toUri(), config);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, config);
+ try {
+ Writable key = new Text();
+ ClusterWritable clusterWritable = new ClusterWritable();
+ assertTrue("more to come", reader.next(key, clusterWritable));
+ assertEquals("1st key", "C-0", key.toString());
+
+ List<Pair<Double,Double>> refCenters = Lists.newArrayList();
+ refCenters.add(new Pair<Double,Double>(1.5,1.5));
+ refCenters.add(new Pair<Double,Double>(4.333333333333334,4.333333333333334));
+      Pair<Double,Double> c = new Pair<Double,Double>(clusterWritable.getValue().getCenter().get(0),
+          clusterWritable.getValue().getCenter().get(1));
+      assertTrue("center " + c + " not found", findAndRemove(c, refCenters, EPSILON));
+ assertTrue("more to come", reader.next(key, clusterWritable));
+ assertEquals("2nd key", "C-1", key.toString());
+ c = new Pair<Double,Double>(clusterWritable.getValue().getCenter().get(0),
+ clusterWritable.getValue().getCenter().get(1));
+ assertTrue("center " + c + " not found", findAndRemove(c, refCenters, EPSILON));
+ assertFalse("more to come", reader.next(key, clusterWritable));
+ } finally {
+ Closeables.close(reader, true);
+ }
+ }
+
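+  /**
+   * Find the first pair within epsilon of the target and remove it from the list.
+   * Returning immediately after Collection.remove() means the for-each iterator is
+   * never advanced again, so no ConcurrentModificationException can arise.
+   */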
+ static boolean findAndRemove(Pair<Double, Double> target, Collection<Pair<Double, Double>> list, double epsilon) {
+ for (Pair<Double,Double> curr : list) {
+ if ( (Math.abs(target.getFirst() - curr.getFirst()) < epsilon)
+ && (Math.abs(target.getSecond() - curr.getSecond()) < epsilon) ) {
+ list.remove(curr);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Story: User can produce final canopy centers using a Hadoop map/reduce job
+ * and a EuclideanDistanceMeasure.
+ */
+ @Test
+ public void testCanopyGenEuclideanMR() throws Exception {
+ List<VectorWritable> points = getPointsWritable();
+ Configuration config = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file1"), fs, config);
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file2"), fs, config);
+ // now run the Canopy Driver
+ Path output = getTestTempDirPath("output");
+ CanopyDriver.run(config, getTestTempDirPath("testdata"), output,
+ euclideanDistanceMeasure, 3.1, 2.1, false, 0.0, false);
+
+ // verify output from sequence file
+ Path path = new Path(output, "clusters-0-final/part-r-00000");
+ FileSystem fs = FileSystem.get(path.toUri(), config);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, config);
+ try {
+ Writable key = new Text();
+ ClusterWritable clusterWritable = new ClusterWritable();
+ assertTrue("more to come", reader.next(key, clusterWritable));
+ assertEquals("1st key", "C-0", key.toString());
+
+ List<Pair<Double,Double>> refCenters = Lists.newArrayList();
+ refCenters.add(new Pair<Double,Double>(1.8,1.8));
+ refCenters.add(new Pair<Double,Double>(4.433333333333334, 4.433333333333334));
+ Pair<Double,Double> c = new Pair<Double,Double>(clusterWritable.getValue().getCenter().get(0),
+ clusterWritable.getValue().getCenter().get(1));
+ assertTrue("center "+c+" not found", findAndRemove(c, refCenters, EPSILON));
+ assertTrue("more to come", reader.next(key, clusterWritable));
+ assertEquals("2nd key", "C-1", key.toString());
+ c = new Pair<Double,Double>(clusterWritable.getValue().getCenter().get(0),
+ clusterWritable.getValue().getCenter().get(1));
+ assertTrue("center "+c+" not found", findAndRemove(c, refCenters, EPSILON));
+ assertFalse("more to come", reader.next(key, clusterWritable));
+ } finally {
+ Closeables.close(reader, true);
+ }
+ }
+
+  /** Story: User can cluster points using sequential execution and a ManhattanDistanceMeasure */
+ @Test
+ public void testClusteringManhattanSeq() throws Exception {
+ List<VectorWritable> points = getPointsWritable();
+ Configuration config = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file1"), fs, config);
+ // now run the Canopy Driver in sequential mode
+ Path output = getTestTempDirPath("output");
+ CanopyDriver.run(config, getTestTempDirPath("testdata"), output,
+ manhattanDistanceMeasure, 3.1, 2.1, true, 0.0, true);
+
+ // verify output from sequence file
+ Path path = new Path(output, "clusters-0-final/part-r-00000");
+ int ix = 0;
+ for (ClusterWritable clusterWritable : new SequenceFileValueIterable<ClusterWritable>(path, true,
+ config)) {
+ assertEquals("Center [" + ix + ']', manhattanCentroids.get(ix), clusterWritable.getValue()
+ .getCenter());
+ ix++;
+ }
+
+ path = new Path(output, "clusteredPoints/part-m-0");
+ long count = HadoopUtil.countRecords(path, config);
+ assertEquals("number of points", points.size(), count);
+ }
+
+  /** Story: User can cluster points using sequential execution and a EuclideanDistanceMeasure */
+ @Test
+ public void testClusteringEuclideanSeq() throws Exception {
+ List<VectorWritable> points = getPointsWritable();
+ Configuration config = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file1"), fs, config);
+ // now run the Canopy Driver in sequential mode
+ Path output = getTestTempDirPath("output");
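+    // drive the CanopyDriver through its CLI; optKey() turns an option name into its command-line switch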
+ String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+ getTestTempDirPath("testdata").toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+ EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.T1_OPTION), "3.1",
+ optKey(DefaultOptionCreator.T2_OPTION), "2.1",
+ optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+ optKey(DefaultOptionCreator.METHOD_OPTION),
+ DefaultOptionCreator.SEQUENTIAL_METHOD };
+ ToolRunner.run(config, new CanopyDriver(), args);
+
+ // verify output from sequence file
+ Path path = new Path(output, "clusters-0-final/part-r-00000");
+
+ int ix = 0;
+ for (ClusterWritable clusterWritable : new SequenceFileValueIterable<ClusterWritable>(path, true,
+ config)) {
+ assertEquals("Center [" + ix + ']', euclideanCentroids.get(ix), clusterWritable.getValue()
+ .getCenter());
+ ix++;
+ }
+
+ path = new Path(output, "clusteredPoints/part-m-0");
+ long count = HadoopUtil.countRecords(path, config);
+ assertEquals("number of points", points.size(), count);
+ }
+
+ /** Story: User can remove outliers while clustering points using sequential execution */
+ @Test
+ public void testClusteringEuclideanWithOutlierRemovalSeq() throws Exception {
+ List<VectorWritable> points = getPointsWritable();
+ Configuration config = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file1"), fs, config);
+ // now run the Canopy Driver in sequential mode
+ Path output = getTestTempDirPath("output");
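+    // outlier threshold 0.5: points whose cluster-membership pdf falls below it are not written out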
+ String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+ getTestTempDirPath("testdata").toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+ EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.T1_OPTION), "3.1",
+ optKey(DefaultOptionCreator.T2_OPTION), "2.1",
+ optKey(DefaultOptionCreator.OUTLIER_THRESHOLD), "0.5",
+ optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+ optKey(DefaultOptionCreator.METHOD_OPTION),
+ DefaultOptionCreator.SEQUENTIAL_METHOD };
+ ToolRunner.run(config, new CanopyDriver(), args);
+
+ // verify output from sequence file
+ Path path = new Path(output, "clusters-0-final/part-r-00000");
+
+ int ix = 0;
+ for (ClusterWritable clusterWritable : new SequenceFileValueIterable<ClusterWritable>(path, true,
+ config)) {
+ assertEquals("Center [" + ix + ']', euclideanCentroids.get(ix), clusterWritable.getValue()
+ .getCenter());
+ ix++;
+ }
+
+ path = new Path(output, "clusteredPoints/part-m-0");
+ long count = HadoopUtil.countRecords(path, config);
+ int expectedPointsHavingPDFGreaterThanThreshold = 6;
+ assertEquals("number of points", expectedPointsHavingPDFGreaterThanThreshold, count);
+ }
+
+
+ /**
+ * Story: User can produce final point clustering using a Hadoop map/reduce
+ * job and a ManhattanDistanceMeasure.
+ */
+ @Test
+ public void testClusteringManhattanMR() throws Exception {
+ List<VectorWritable> points = getPointsWritable();
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, true,
+ getTestTempFilePath("testdata/file1"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(points, true,
+ getTestTempFilePath("testdata/file2"), fs, conf);
+ // now run the Job
+ Path output = getTestTempDirPath("output");
+ CanopyDriver.run(conf, getTestTempDirPath("testdata"), output,
+ manhattanDistanceMeasure, 3.1, 2.1, true, 0.0, false);
+ Path path = new Path(output, "clusteredPoints/part-m-00000");
+ long count = HadoopUtil.countRecords(path, conf);
+ assertEquals("number of points", points.size(), count);
+ }
+
+ /**
+ * Story: User can produce final point clustering using a Hadoop map/reduce
+ * job and a EuclideanDistanceMeasure.
+ */
+ @Test
+ public void testClusteringEuclideanMR() throws Exception {
+ List<VectorWritable> points = getPointsWritable();
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, true,
+ getTestTempFilePath("testdata/file1"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(points, true,
+ getTestTempFilePath("testdata/file2"), fs, conf);
+    // now run the Job through its command-line options via run(); callers can also use runJob() directly
+ Path output = getTestTempDirPath("output");
+ String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+ getTestTempDirPath("testdata").toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+ EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.T1_OPTION), "3.1",
+ optKey(DefaultOptionCreator.T2_OPTION), "2.1",
+ optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION) };
+ ToolRunner.run(getConfiguration(), new CanopyDriver(), args);
+ Path path = new Path(output, "clusteredPoints/part-m-00000");
+ long count = HadoopUtil.countRecords(path, conf);
+ assertEquals("number of points", points.size(), count);
+ }
+
+ /**
+ * Story: User can produce final point clustering using a Hadoop map/reduce
+   * job and a EuclideanDistanceMeasure with an outlier removal threshold.
+ */
+ @Test
+ public void testClusteringEuclideanWithOutlierRemovalMR() throws Exception {
+ List<VectorWritable> points = getPointsWritable();
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, true,
+ getTestTempFilePath("testdata/file1"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(points, true,
+ getTestTempFilePath("testdata/file2"), fs, conf);
+    // now run the Job through its command-line options via run(); callers can also use runJob() directly
+ Path output = getTestTempDirPath("output");
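+    // outlier threshold 0.7: only the 8 points whose cluster-membership pdf clears it survive classification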
+ String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+ getTestTempDirPath("testdata").toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+ EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.T1_OPTION), "3.1",
+ optKey(DefaultOptionCreator.T2_OPTION), "2.1",
+ optKey(DefaultOptionCreator.OUTLIER_THRESHOLD), "0.7",
+ optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION) };
+ ToolRunner.run(getConfiguration(), new CanopyDriver(), args);
+ Path path = new Path(output, "clusteredPoints/part-m-00000");
+ long count = HadoopUtil.countRecords(path, conf);
+ int expectedPointsAfterOutlierRemoval = 8;
+ assertEquals("number of points", expectedPointsAfterOutlierRemoval, count);
+ }
+
+
+ /**
+ * Story: User can set T3 and T4 values to be used by the reducer for its T1
+ * and T2 thresholds
+ */
+ @Test
+ public void testCanopyReducerT3T4Configuration() throws Exception {
+ CanopyReducer reducer = new CanopyReducer();
+ Configuration conf = getConfiguration();
+ conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+ "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+ conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+ conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+ conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(1.1));
+ conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(0.1));
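+    // when T3/T4 are set they replace T1/T2 for the reducer phase, as asserted below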
+ conf.set(CanopyConfigKeys.CF_KEY, "0");
+ DummyRecordWriter<Text, ClusterWritable> writer = new DummyRecordWriter<Text, ClusterWritable>();
+ Reducer<Text, VectorWritable, Text, ClusterWritable>.Context context = DummyRecordWriter
+ .build(reducer, conf, writer, Text.class, VectorWritable.class);
+ reducer.setup(context);
+ assertEquals(1.1, reducer.getCanopyClusterer().getT1(), EPSILON);
+ assertEquals(0.1, reducer.getCanopyClusterer().getT2(), EPSILON);
+ }
+
+ /**
+ * Story: User can specify a clustering limit that prevents output of small
+ * clusters
+ */
+ @Test
+ public void testCanopyMapperClusterFilter() throws Exception {
+ CanopyMapper mapper = new CanopyMapper();
+ Configuration conf = getConfiguration();
+ conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, manhattanDistanceMeasure
+ .getClass().getName());
+ conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+ conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+ conf.set(CanopyConfigKeys.CF_KEY, "3");
+ DummyRecordWriter<Text, VectorWritable> writer = new DummyRecordWriter<Text, VectorWritable>();
+ Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context = DummyRecordWriter
+ .build(mapper, conf, writer);
+ mapper.setup(context);
+
+ List<VectorWritable> points = getPointsWritable();
+ // map the data
+ for (VectorWritable point : points) {
+ mapper.map(new Text(), point, context);
+ }
+ mapper.cleanup(context);
+ assertEquals("Number of map results", 1, writer.getData().size());
+ // now verify the output
+ List<VectorWritable> data = writer.getValue(new Text("centroid"));
+ assertEquals("Number of centroids", 2, data.size());
+ }
+
+ /**
+ * Story: User can specify a cluster filter that limits the minimum size of
+ * canopies produced by the reducer
+ */
+ @Test
+ public void testCanopyReducerClusterFilter() throws Exception {
+ CanopyReducer reducer = new CanopyReducer();
+ Configuration conf = getConfiguration();
+ conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
+ "org.apache.mahout.common.distance.ManhattanDistanceMeasure");
+ conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(3.1));
+ conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(2.1));
+ conf.set(CanopyConfigKeys.CF_KEY, "3");
+ DummyRecordWriter<Text, ClusterWritable> writer = new DummyRecordWriter<Text, ClusterWritable>();
+ Reducer<Text, VectorWritable, Text, ClusterWritable>.Context context = DummyRecordWriter
+ .build(reducer, conf, writer, Text.class, VectorWritable.class);
+ reducer.setup(context);
+
+ List<VectorWritable> points = getPointsWritable();
+ reducer.reduce(new Text("centroid"), points, context);
+ Set<Text> keys = writer.getKeys();
+ assertEquals("Number of centroids", 2, keys.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java b/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java
new file mode 100644
index 0000000..cbf0e55
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.classify;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.iterator.CanopyClusteringPolicy;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
+
+public class ClusterClassificationDriverTest extends MahoutTestCase {
+
+ private static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {4, 4},
+ {5, 4}, {4, 5}, {5, 5}, {9, 9}, {8, 8}};
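+  // three groups: around (1.3, 1.3), (4.5, 4.5), and (8.5, 8.5)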
+
+ private FileSystem fs;
+ private Path clusteringOutputPath;
+ private Configuration conf;
+ private Path pointsPath;
+ private Path classifiedOutputPath;
+ private List<Vector> firstCluster;
+ private List<Vector> secondCluster;
+ private List<Vector> thirdCluster;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+    conf = getConfiguration();
+    fs = FileSystem.get(conf);
+ firstCluster = Lists.newArrayList();
+ secondCluster = Lists.newArrayList();
+ thirdCluster = Lists.newArrayList();
+  }
+
+ private static List<VectorWritable> getPointsWritable(double[][] raw) {
+ List<VectorWritable> points = Lists.newArrayList();
+ for (double[] fr : raw) {
+ Vector vec = new RandomAccessSparseVector(fr.length);
+ vec.assign(fr);
+ points.add(new VectorWritable(vec));
+ }
+ return points;
+ }
+
+ @Test
+ public void testVectorClassificationWithOutlierRemovalMR() throws Exception {
+ List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+ pointsPath = getTestTempDirPath("points");
+ clusteringOutputPath = getTestTempDirPath("output");
+ classifiedOutputPath = getTestTempDirPath("classifiedClusters");
+ HadoopUtil.delete(conf, classifiedOutputPath);
+
+ conf = getConfiguration();
+
+ ClusteringTestUtils.writePointsToFile(points, true,
+ new Path(pointsPath, "file1"), fs, conf);
+ runClustering(pointsPath, conf, false);
+ runClassificationWithOutlierRemoval(false);
+ collectVectorsForAssertion();
+ assertVectorsWithOutlierRemoval();
+ }
+
+ @Test
+ public void testVectorClassificationWithoutOutlierRemoval() throws Exception {
+ List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+ pointsPath = getTestTempDirPath("points");
+ clusteringOutputPath = getTestTempDirPath("output");
+ classifiedOutputPath = getTestTempDirPath("classify");
+
+ conf = getConfiguration();
+
+ ClusteringTestUtils.writePointsToFile(points,
+ new Path(pointsPath, "file1"), fs, conf);
+ runClustering(pointsPath, conf, true);
+ runClassificationWithoutOutlierRemoval();
+ collectVectorsForAssertion();
+ assertVectorsWithoutOutlierRemoval();
+ }
+
+ @Test
+ public void testVectorClassificationWithOutlierRemoval() throws Exception {
+ List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+ pointsPath = getTestTempDirPath("points");
+ clusteringOutputPath = getTestTempDirPath("output");
+ classifiedOutputPath = getTestTempDirPath("classify");
+
+ conf = getConfiguration();
+
+ ClusteringTestUtils.writePointsToFile(points,
+ new Path(pointsPath, "file1"), fs, conf);
+ runClustering(pointsPath, conf, true);
+ runClassificationWithOutlierRemoval(true);
+ collectVectorsForAssertion();
+ assertVectorsWithOutlierRemoval();
+ }
+
+ private void runClustering(Path pointsPath, Configuration conf,
+ Boolean runSequential) throws IOException, InterruptedException,
+ ClassNotFoundException {
+ CanopyDriver.run(conf, pointsPath, clusteringOutputPath,
+ new ManhattanDistanceMeasure(), 3.1, 2.1, false, 0.0, runSequential);
+ Path finalClustersPath = new Path(clusteringOutputPath, "clusters-0-final");
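+    // the classification driver needs a ClusteringPolicy stored alongside the final clusters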
+ ClusterClassifier.writePolicy(new CanopyClusteringPolicy(),
+ finalClustersPath);
+ }
+
+ private void runClassificationWithoutOutlierRemoval()
+ throws IOException, InterruptedException, ClassNotFoundException {
+    ClusterClassificationDriver.run(getConfiguration(), pointsPath, clusteringOutputPath,
+        classifiedOutputPath, 0.0, true, true);
+ }
+
+ private void runClassificationWithOutlierRemoval(boolean runSequential)
+ throws IOException, InterruptedException, ClassNotFoundException {
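+    // 0.73 outlier threshold: only points whose cluster-membership pdf clears it are emitted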
+    ClusterClassificationDriver.run(getConfiguration(), pointsPath, clusteringOutputPath,
+        classifiedOutputPath, 0.73, true, runSequential);
+ }
+
+ private void collectVectorsForAssertion() throws IOException {
+ Path[] partFilePaths = FileUtil.stat2Paths(fs
+ .globStatus(classifiedOutputPath));
+ FileStatus[] listStatus = fs.listStatus(partFilePaths,
+ PathFilters.partFilter());
+    for (FileStatus partFile : listStatus) {
+      SequenceFile.Reader classifiedVectors = new SequenceFile.Reader(fs,
+          partFile.getPath(), conf);
+      try {
+        Writable clusterIdAsKey = new IntWritable();
+        WeightedPropertyVectorWritable point = new WeightedPropertyVectorWritable();
+        while (classifiedVectors.next(clusterIdAsKey, point)) {
+          collectVector(clusterIdAsKey.toString(), point.getVector());
+        }
+      } finally {
+        Closeables.close(classifiedVectors, true);
+      }
+    }
+ }
+
+ private void collectVector(String clusterId, Vector vector) {
+ if ("0".equals(clusterId)) {
+ firstCluster.add(vector);
+ } else if ("1".equals(clusterId)) {
+ secondCluster.add(vector);
+ } else if ("2".equals(clusterId)) {
+ thirdCluster.add(vector);
+ }
+ }
+
+ private void assertVectorsWithOutlierRemoval() {
+ checkClustersWithOutlierRemoval();
+ }
+
+ private void assertVectorsWithoutOutlierRemoval() {
+ assertFirstClusterWithoutOutlierRemoval();
+ assertSecondClusterWithoutOutlierRemoval();
+ assertThirdClusterWithoutOutlierRemoval();
+ }
+
+ private void assertThirdClusterWithoutOutlierRemoval() {
+ Assert.assertEquals(2, thirdCluster.size());
+ for (Vector vector : thirdCluster) {
+ Assert.assertTrue(ArrayUtils.contains(new String[] {"{0:9.0,1:9.0}",
+ "{0:8.0,1:8.0}"}, vector.asFormatString()));
+ }
+ }
+
+ private void assertSecondClusterWithoutOutlierRemoval() {
+ Assert.assertEquals(4, secondCluster.size());
+ for (Vector vector : secondCluster) {
+ Assert.assertTrue(ArrayUtils.contains(new String[] {"{0:4.0,1:4.0}",
+ "{0:5.0,1:4.0}", "{0:4.0,1:5.0}", "{0:5.0,1:5.0}"},
+ vector.asFormatString()));
+ }
+ }
+
+ private void assertFirstClusterWithoutOutlierRemoval() {
+ Assert.assertEquals(3, firstCluster.size());
+ for (Vector vector : firstCluster) {
+ Assert.assertTrue(ArrayUtils.contains(new String[] {"{0:1.0,1:1.0}",
+ "{0:2.0,1:1.0}", "{0:1.0,1:2.0}"}, vector.asFormatString()));
+ }
+ }
+
+ private void checkClustersWithOutlierRemoval() {
+ Set<String> reference = Sets.newHashSet("{0:9.0,1:9.0}", "{0:1.0,1:1.0}");
+
+ List<List<Vector>> clusters = Lists.newArrayList();
+ clusters.add(firstCluster);
+ clusters.add(secondCluster);
+ clusters.add(thirdCluster);
+
+ int singletonCnt = 0;
+ int emptyCnt = 0;
+ for (List<Vector> vList : clusters) {
+ if (vList.isEmpty()) {
+ emptyCnt++;
+ } else {
+ singletonCnt++;
+ assertEquals("expecting only singleton clusters; got size=" + vList.size(), 1, vList.size());
+        Vector first = vList.get(0);
+        if (first instanceof NamedVector) {
+          first = ((NamedVector) first).getDelegate();
+        }
+        String fmt = first.asFormatString();
+        Assert.assertTrue("not expecting cluster:" + fmt, reference.contains(fmt));
+        reference.remove(fmt);
+ }
+ }
+ Assert.assertEquals("Different number of empty clusters than expected!", 1, emptyCnt);
+ Assert.assertEquals("Different number of singletons than expected!", 2, singletonCnt);
+ Assert.assertEquals("Didn't match all reference clusters!", 0, reference.size());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java b/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
new file mode 100644
index 0000000..fc71ecf
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Closeables;
+
+public final class TestFuzzyKmeansClustering extends MahoutTestCase {
+
+ private FileSystem fs;
+ private final DistanceMeasure measure = new EuclideanDistanceMeasure();
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = getConfiguration();
+ fs = FileSystem.get(conf);
+ }
+
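+  /** Perturb a point slightly so initial cluster seeds don't coincide exactly with input points. */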
+ private static Vector tweakValue(Vector point) {
+ return point.plus(0.1);
+ }
+
+ @Test
+ public void testFuzzyKMeansSeqJob() throws Exception {
+ List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+
+ Path pointsPath = getTestTempDirPath("points");
+ Path clustersPath = getTestTempDirPath("clusters");
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+
+ for (int k = 0; k < points.size(); k++) {
+ System.out.println("testKFuzzyKMeansMRJob k= " + k);
+ // pick k initial cluster centers at random
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+ conf,
+ new Path(clustersPath, "part-00000"),
+ Text.class,
+ SoftCluster.class);
+ try {
+ for (int i = 0; i < k + 1; i++) {
+ Vector vec = tweakValue(points.get(i).get());
+ SoftCluster cluster = new SoftCluster(vec, i, measure);
+ /* add the center so the centroid will be correct upon output */
+ cluster.observe(cluster.getCenter(), 1);
+ // writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n');
+ writer.append(new Text(cluster.getIdentifier()), cluster);
+ }
+ } finally {
+ Closeables.close(writer, false);
+ }
+
+      // now run the Job using command-line options; the commented-out runJob() call shows the direct API equivalent
+ Path output = getTestTempDirPath("output" + k);
+ /* FuzzyKMeansDriver.runJob(pointsPath,
+ clustersPath,
+ output,
+ EuclideanDistanceMeasure.class.getName(),
+ 0.001,
+ 2,
+ k + 1,
+ 2,
+ false,
+ true,
+ 0);
+ */
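+      // m = 2.0 below is the fuzziness exponent (must be > 1); larger m gives softer memberships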
+ String[] args = {
+ optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+ optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION),
+ clustersPath.toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION),
+ output.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+ EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION),
+ "0.001",
+ optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
+ "2",
+ optKey(FuzzyKMeansDriver.M_OPTION),
+ "2.0",
+ optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+ optKey(DefaultOptionCreator.METHOD_OPTION),
+ DefaultOptionCreator.SEQUENTIAL_METHOD
+ };
+ FuzzyKMeansDriver.main(args);
+ long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-0"), conf);
+ assertTrue(count > 0);
+ }
+
+ }
+
+ @Test
+ public void testFuzzyKMeansMRJob() throws Exception {
+ List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+
+ Path pointsPath = getTestTempDirPath("points");
+ Path clustersPath = getTestTempDirPath("clusters");
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+
+ for (int k = 0; k < points.size(); k++) {
+ System.out.println("testKFuzzyKMeansMRJob k= " + k);
+ // pick k initial cluster centers at random
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+ conf,
+ new Path(clustersPath, "part-00000"),
+ Text.class,
+ SoftCluster.class);
+ try {
+ for (int i = 0; i < k + 1; i++) {
+ Vector vec = tweakValue(points.get(i).get());
+
+ SoftCluster cluster = new SoftCluster(vec, i, measure);
+ /* add the center so the centroid will be correct upon output */
+ cluster.observe(cluster.getCenter(), 1);
+ // writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n');
+ writer.append(new Text(cluster.getIdentifier()), cluster);
+
+ }
+ } finally {
+ Closeables.close(writer, false);
+ }
+
+      // now run the Job using command-line options; the commented-out runJob() call shows the direct API equivalent
+ Path output = getTestTempDirPath("output" + k);
+ /* FuzzyKMeansDriver.runJob(pointsPath,
+ clustersPath,
+ output,
+ EuclideanDistanceMeasure.class.getName(),
+ 0.001,
+ 2,
+ k + 1,
+ 2,
+ false,
+ true,
+ 0);
+ */
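+      // as in the sequential test, m = 2.0 is the fuzziness exponent (m > 1)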
+ String[] args = {
+ optKey(DefaultOptionCreator.INPUT_OPTION),
+ pointsPath.toString(),
+ optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION),
+ clustersPath.toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION),
+ output.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+ EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION),
+ "0.001",
+ optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION),
+ "2",
+ optKey(FuzzyKMeansDriver.M_OPTION),
+ "2.0",
+ optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION)
+ };
+ ToolRunner.run(getConfiguration(), new FuzzyKMeansDriver(), args);
+ long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-00000"), conf);
+ assertTrue(count > 0);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java b/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
new file mode 100644
index 0000000..fdcfd64
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.iterator;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.fuzzykmeans.SoftCluster;
+import org.apache.mahout.clustering.kmeans.Kluster;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.CosineDistanceMeasure;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestClusterClassifier extends MahoutTestCase {
+
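+  // Each factory below builds a classifier over three models centered at (1,1), (0,0) and (-1,-1).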
+ private static ClusterClassifier newDMClassifier() {
+ List<Cluster> models = Lists.newArrayList();
+ DistanceMeasure measure = new ManhattanDistanceMeasure();
+ models.add(new DistanceMeasureCluster(new DenseVector(2).assign(1), 0, measure));
+ models.add(new DistanceMeasureCluster(new DenseVector(2), 1, measure));
+ models.add(new DistanceMeasureCluster(new DenseVector(2).assign(-1), 2, measure));
+ return new ClusterClassifier(models, new KMeansClusteringPolicy());
+ }
+
+ private static ClusterClassifier newKlusterClassifier() {
+ List<Cluster> models = Lists.newArrayList();
+ DistanceMeasure measure = new ManhattanDistanceMeasure();
+    models.add(new Kluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new Kluster(new DenseVector(2), 1, measure));
+    models.add(new Kluster(new DenseVector(2).assign(-1), 2, measure));
+ return new ClusterClassifier(models, new KMeansClusteringPolicy());
+ }
+
+ private static ClusterClassifier newCosineKlusterClassifier() {
+ List<Cluster> models = Lists.newArrayList();
+ DistanceMeasure measure = new CosineDistanceMeasure();
+    models.add(new Kluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new Kluster(new DenseVector(2), 1, measure));
+    models.add(new Kluster(new DenseVector(2).assign(-1), 2, measure));
+ return new ClusterClassifier(models, new KMeansClusteringPolicy());
+ }
+
+ private static ClusterClassifier newSoftClusterClassifier() {
+ List<Cluster> models = Lists.newArrayList();
+ DistanceMeasure measure = new ManhattanDistanceMeasure();
+ models.add(new SoftCluster(new DenseVector(2).assign(1), 0, measure));
+ models.add(new SoftCluster(new DenseVector(2), 1, measure));
+ models.add(new SoftCluster(new DenseVector(2).assign(-1), 2, measure));
+ return new ClusterClassifier(models, new FuzzyKMeansClusteringPolicy());
+ }
+
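+  /** Round-trip a classifier through its sequence-file form to exercise serialization. */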
+ private ClusterClassifier writeAndRead(ClusterClassifier classifier) throws IOException {
+ Path path = new Path(getTestTempDirPath(), "output");
+ classifier.writeToSeqFiles(path);
+ ClusterClassifier newClassifier = new ClusterClassifier();
+ newClassifier.readFromSeqFiles(getConfiguration(), path);
+ return newClassifier;
+ }
+
+ @Test
+ public void testDMClusterClassification() {
+ ClusterClassifier classifier = newDMClassifier();
+ Vector pdf = classifier.classify(new DenseVector(2));
+ assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null));
+ pdf = classifier.classify(new DenseVector(2).assign(2));
+ assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null));
+ }
+
+ @Test
+ public void testClusterClassification() {
+ ClusterClassifier classifier = newKlusterClassifier();
+ Vector pdf = classifier.classify(new DenseVector(2));
+ assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null));
+ pdf = classifier.classify(new DenseVector(2).assign(2));
+ assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null));
+ }
+
+ @Test
+ public void testSoftClusterClassification() {
+ ClusterClassifier classifier = newSoftClusterClassifier();
+ Vector pdf = classifier.classify(new DenseVector(2));
+ assertEquals("[0,0]", "[0.0,1.0,0.0]", AbstractCluster.formatVector(pdf, null));
+ pdf = classifier.classify(new DenseVector(2).assign(2));
+ assertEquals("[2,2]", "[0.735,0.184,0.082]", AbstractCluster.formatVector(pdf, null));
+ }
+
+ @Test
+ public void testDMClassifierSerialization() throws Exception {
+ ClusterClassifier classifier = newDMClassifier();
+ ClusterClassifier classifierOut = writeAndRead(classifier);
+ assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+ assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+ .getName());
+ }
+
+ @Test
+ public void testClusterClassifierSerialization() throws Exception {
+ ClusterClassifier classifier = newKlusterClassifier();
+ ClusterClassifier classifierOut = writeAndRead(classifier);
+ assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+ assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+ .getName());
+ }
+
+ @Test
+ public void testSoftClusterClassifierSerialization() throws Exception {
+ ClusterClassifier classifier = newSoftClusterClassifier();
+ ClusterClassifier classifierOut = writeAndRead(classifier);
+ assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+ assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass()
+ .getName());
+ }
+
+ @Test
+ public void testClusterIteratorKMeans() {
+ List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
+ ClusterClassifier prior = newKlusterClassifier();
+ ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5);
+ assertEquals(3, posterior.getModels().size());
+ for (Cluster cluster : posterior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+ }
+
+ @Test
+ public void testClusterIteratorDirichlet() {
+ List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
+ ClusterClassifier prior = newKlusterClassifier();
+ ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5);
+ assertEquals(3, posterior.getModels().size());
+ for (Cluster cluster : posterior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+ }
+
+ @Test
+ public void testSeqFileClusterIteratorKMeans() throws IOException {
+ Path pointsPath = getTestTempDirPath("points");
+ Path priorPath = getTestTempDirPath("prior");
+ Path outPath = getTestTempDirPath("output");
+ Configuration conf = getConfiguration();
+ FileSystem fs = FileSystem.get(pointsPath.toUri(), conf);
+ List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+ ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+ Path path = new Path(priorPath, "priorClassifier");
+ ClusterClassifier prior = newKlusterClassifier();
+ prior.writeToSeqFiles(path);
+ assertEquals(3, prior.getModels().size());
+ System.out.println("Prior");
+ for (Cluster cluster : prior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+ ClusterIterator.iterateSeq(conf, pointsPath, path, outPath, 5);
+
+ for (int i = 1; i <= 4; i++) {
+ System.out.println("Classifier-" + i);
+ ClusterClassifier posterior = new ClusterClassifier();
+ String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+ posterior.readFromSeqFiles(conf, new Path(outPath, name));
+ assertEquals(3, posterior.getModels().size());
+ for (Cluster cluster : posterior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+    }
+ }
+
+ @Test
+ public void testMRFileClusterIteratorKMeans() throws Exception {
+ Path pointsPath = getTestTempDirPath("points");
+ Path priorPath = getTestTempDirPath("prior");
+ Path outPath = getTestTempDirPath("output");
+ Configuration conf = getConfiguration();
+ FileSystem fs = FileSystem.get(pointsPath.toUri(), conf);
+ List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+ ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+ Path path = new Path(priorPath, "priorClassifier");
+ ClusterClassifier prior = newKlusterClassifier();
+ prior.writeToSeqFiles(path);
+ ClusteringPolicy policy = new KMeansClusteringPolicy();
+ ClusterClassifier.writePolicy(policy, path);
+ assertEquals(3, prior.getModels().size());
+ System.out.println("Prior");
+ for (Cluster cluster : prior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+ ClusterIterator.iterateMR(conf, pointsPath, path, outPath, 5);
+
+ for (int i = 1; i <= 4; i++) {
+ System.out.println("Classifier-" + i);
+ ClusterClassifier posterior = new ClusterClassifier();
+ String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+ posterior.readFromSeqFiles(conf, new Path(outPath, name));
+ assertEquals(3, posterior.getModels().size());
+ for (Cluster cluster : posterior.getModels()) {
+ System.out.println(cluster.asFormatString(null));
+ }
+ }
+ }
+
+ @Test
+ public void testCosineKlusterClassification() {
+ ClusterClassifier classifier = newCosineKlusterClassifier();
+ Vector pdf = classifier.classify(new DenseVector(2));
+ assertEquals("[0,0]", "[0.333,0.333,0.333]", AbstractCluster.formatVector(pdf, null));
+ pdf = classifier.classify(new DenseVector(2).assign(2));
+ assertEquals("[2,2]", "[0.429,0.429,0.143]", AbstractCluster.formatVector(pdf, null));
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
new file mode 100644
index 0000000..5666765
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+public final class TestKmeansClustering extends MahoutTestCase {
+
+ public static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+
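+ // EXPECTED_NUM_POINTS[k] holds the expected per-cluster point counts when k + 1 centers are seeded from REFERENCE (see the k loops in the tests below).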
+ private static final int[][] EXPECTED_NUM_POINTS = { {9}, {4, 5}, {4, 4, 1}, {1, 2, 1, 5}, {1, 1, 1, 2, 4},
+ {1, 1, 1, 1, 1, 4}, {1, 1, 1, 1, 1, 2, 2}, {1, 1, 1, 1, 1, 1, 2, 1}, {1, 1, 1, 1, 1, 1, 1, 1, 1}};
+
+ private FileSystem fs;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = getConfiguration();
+ fs = FileSystem.get(conf);
+ }
+
+ public static List<VectorWritable> getPointsWritable(double[][] raw) {
+ List<VectorWritable> points = Lists.newArrayList();
+ for (double[] fr : raw) {
+ Vector vec = new RandomAccessSparseVector(fr.length);
+ vec.assign(fr);
+ points.add(new VectorWritable(vec));
+ }
+ return points;
+ }
+
+ public static List<VectorWritable> getPointsWritableDenseVector(double[][] raw) {
+ List<VectorWritable> points = Lists.newArrayList();
+ for (double[] fr : raw) {
+ Vector vec = new DenseVector(fr.length);
+ vec.assign(fr);
+ points.add(new VectorWritable(vec));
+ }
+ return points;
+ }
+
+ public static List<Vector> getPoints(double[][] raw) {
+ List<Vector> points = Lists.newArrayList();
+ for (double[] fr : raw) {
+ Vector vec = new SequentialAccessSparseVector(fr.length);
+ vec.assign(fr);
+ points.add(vec);
+ }
+ return points;
+ }
+
+ /**
+ * Tests
+ * {@link KMeansClusterer#runKMeansIteration(Iterable, Iterable, DistanceMeasure, double)}
+ * single run convergence with a given distance threshold.
+ */
+ /*@Test
+ public void testRunKMeansIterationConvergesInOneRunWithGivenDistanceThreshold() {
+ double[][] rawPoints = { {0, 0}, {0, 0.25}, {0, 0.75}, {0, 1}};
+ List<Vector> points = getPoints(rawPoints);
+
+ ManhattanDistanceMeasure distanceMeasure = new ManhattanDistanceMeasure();
+ List<Kluster> clusters = Arrays.asList(new Kluster(points.get(0), 0, distanceMeasure), new Kluster(points.get(3),
+ 3, distanceMeasure));
+
+ // To converge in a single run, the given distance threshold should be
+ // greater than or equal to 0.125,
+ // since 0.125 will be the distance between center and centroid for the
+ // initial two clusters after one run.
+ double distanceThreshold = 0.25;
+
+ boolean converged = KMeansClusterer.runKMeansIteration(points, clusters, distanceMeasure, distanceThreshold);
+
+ Vector cluster1Center = clusters.get(0).getCenter();
+ assertEquals(0, cluster1Center.get(0), EPSILON);
+ assertEquals(0.125, cluster1Center.get(1), EPSILON);
+
+ Vector cluster2Center = clusters.get(1).getCenter();
+ assertEquals(0, cluster2Center.get(0), EPSILON);
+ assertEquals(0.875, cluster2Center.get(1), EPSILON);
+
+ assertTrue("KMeans iteration should be converged after a single run", converged);
+ }*/
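+ /* Worked arithmetic behind the 0.125 figure in the disabled test above: seeding clusters at
+ * (0, 0) and (0, 1) assigns {(0, 0), (0, 0.25)} to the first cluster and {(0, 0.75), (0, 1)}
+ * to the second, so the new centroids are (0, 0.125) and (0, 0.875). Each centroid lies a
+ * Manhattan distance of 0.125 from its seed center, hence any threshold >= 0.125 converges
+ * in a single iteration. */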
+
+ /** Story: User wishes to run kmeans job on reference data */
+ @Test
+ public void testKMeansSeqJob() throws Exception {
+ DistanceMeasure measure = new EuclideanDistanceMeasure();
+ List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+ Path pointsPath = getTestTempDirPath("points");
+ Path clustersPath = getTestTempDirPath("clusters");
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+ for (int k = 1; k < points.size(); k++) {
+ System.out.println("testKMeansMRJob k= " + k);
+ // pick k initial cluster centers at random
+ Path path = new Path(clustersPath, "part-00000");
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
+ try {
+ for (int i = 0; i < k + 1; i++) {
+ Vector vec = points.get(i).get();
+
+ Kluster cluster = new Kluster(vec, i, measure);
+ // add the center so the centroid will be correct upon output
+ cluster.observe(cluster.getCenter(), 1);
+ writer.append(new Text(cluster.getIdentifier()), cluster);
+ }
+ } finally {
+ Closeables.close(writer, false);
+ }
+ // now run the Job
+ Path outputPath = getTestTempDirPath("output" + k);
+ String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+ optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+ optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION),
+ DefaultOptionCreator.SEQUENTIAL_METHOD};
+ ToolRunner.run(conf, new KMeansDriver(), args);
+
+ // now compare the expected clusters with actual
+ Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+ int[] expect = EXPECTED_NUM_POINTS[k];
+ DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>();
+ // The key is the clusterId, the value is the weighted vector
+ for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+ new Path(clusteredPointsPath, "part-m-0"), conf)) {
+ collector.collect(record.getFirst(), record.getSecond());
+ }
+ assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+ }
+ }
+
+ /** Story: User wishes to run kmeans job on reference data (DenseVector test) */
+ @Test
+ public void testKMeansSeqJobDenseVector() throws Exception {
+ DistanceMeasure measure = new EuclideanDistanceMeasure();
+ List<VectorWritable> points = getPointsWritableDenseVector(REFERENCE);
+
+ Path pointsPath = getTestTempDirPath("points");
+ Path clustersPath = getTestTempDirPath("clusters");
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+ for (int k = 1; k < points.size(); k++) {
+ System.out.println("testKMeansMRJob k= " + k);
+ // pick k initial cluster centers at random
+ Path path = new Path(clustersPath, "part-00000");
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
+ try {
+ for (int i = 0; i < k + 1; i++) {
+ Vector vec = points.get(i).get();
+
+ Kluster cluster = new Kluster(vec, i, measure);
+ // add the center so the centroid will be correct upon output
+ cluster.observe(cluster.getCenter(), 1);
+ writer.append(new Text(cluster.getIdentifier()), cluster);
+ }
+ } finally {
+ Closeables.close(writer, false);
+ }
+ // now run the Job
+ Path outputPath = getTestTempDirPath("output" + k);
+ String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+ optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+ optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION),
+ DefaultOptionCreator.SEQUENTIAL_METHOD};
+ ToolRunner.run(conf, new KMeansDriver(), args);
+
+ // now compare the expected clusters with actual
+ Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+ int[] expect = EXPECTED_NUM_POINTS[k];
+ DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>();
+ // The key is the clusterId, the value is the weighted vector
+ for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+ new Path(clusteredPointsPath, "part-m-0"), conf)) {
+ collector.collect(record.getFirst(), record.getSecond());
+ }
+ assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+ }
+ }
+
+ /** Story: User wishes to run kmeans job on reference data */
+ @Test
+ public void testKMeansMRJob() throws Exception {
+ DistanceMeasure measure = new EuclideanDistanceMeasure();
+ List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+ Path pointsPath = getTestTempDirPath("points");
+ Path clustersPath = getTestTempDirPath("clusters");
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+ for (int k = 1; k < points.size(); k += 3) {
+ System.out.println("testKMeansMRJob k= " + k);
+ // seed k + 1 initial cluster centers from the first input points
+ Path path = new Path(clustersPath, "part-00000");
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
+
+ try {
+ for (int i = 0; i < k + 1; i++) {
+ Vector vec = points.get(i).get();
+
+ Kluster cluster = new Kluster(vec, i, measure);
+ // add the center so the centroid will be correct upon output
+ cluster.observe(cluster.getCenter(), 1);
+ writer.append(new Text(cluster.getIdentifier()), cluster);
+ }
+ } finally {
+ Closeables.close(writer, false);
+ }
+ // now run the Job
+ Path outputPath = getTestTempDirPath("output" + k);
+ String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+ optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+ optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION)};
+ ToolRunner.run(getConfiguration(), new KMeansDriver(), args);
+
+ // now compare the expected clusters with actual
+ Path clusteredPointsPath = new Path(outputPath, "clusteredPoints");
+ // assertEquals("output dir files?", 4, outFiles.length);
+ int[] expect = EXPECTED_NUM_POINTS[k];
+ DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>();
+ // The key is the clusterId, the value is the weighted vector
+ for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+ new Path(clusteredPointsPath, "part-m-00000"), conf)) {
+ collector.collect(record.getFirst(), record.getSecond());
+ }
+ assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size());
+ }
+ }
+
+ /**
+ * Story: User wants to use canopy clustering to input the initial clusters
+ * for kmeans job.
+ */
+ @Test
+ public void testKMeansWithCanopyClusterInput() throws Exception {
+ List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+ Path pointsPath = getTestTempDirPath("points");
+ Configuration conf = getConfiguration();
+ ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf);
+
+ Path outputPath = getTestTempDirPath("output");
+ // now run the Canopy job
+ CanopyDriver.run(conf, pointsPath, outputPath, new ManhattanDistanceMeasure(), 3.1, 2.1, false, 0.0, false);
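+ // t1 = 3.1 and t2 = 2.1 are the canopy distance thresholds; under the Manhattan measure they yield the two canopies verified below.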
+
+ DummyOutputCollector<Text, ClusterWritable> collector1 =
+ new DummyOutputCollector<Text, ClusterWritable>();
+
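+ // the glob matches the canopy part files (e.g. part-m-00000 or part-r-00000) under clusters-0-final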
+ FileStatus[] outParts = FileSystem.get(conf).globStatus(
+ new Path(outputPath, "clusters-0-final/*-0*"));
+ for (FileStatus outPartStat : outParts) {
+ for (Pair<Text,ClusterWritable> record :
+ new SequenceFileIterable<Text,ClusterWritable>(
+ outPartStat.getPath(), conf)) {
+ collector1.collect(record.getFirst(), record.getSecond());
+ }
+ }
+
+ boolean got15 = false;
+ boolean got43 = false;
+ int count = 0;
+ for (Text k : collector1.getKeys()) {
+ count++;
+ List<ClusterWritable> vl = collector1.getValue(k);
+ assertEquals("non-singleton centroid!", 1, vl.size());
+ ClusterWritable clusterWritable = vl.get(0);
+ Vector v = clusterWritable.getValue().getCenter();
+ assertEquals("cetriod vector is wrong length", 2, v.size());
+ if ( (Math.abs(v.get(0) - 1.5) < EPSILON)
+ && (Math.abs(v.get(1) - 1.5) < EPSILON)
+ && !got15) {
+ got15 = true;
+ } else if ( (Math.abs(v.get(0) - 4.333333333333334) < EPSILON)
+ && (Math.abs(v.get(1) - 4.333333333333334) < EPSILON)
+ && !got43) {
+ got43 = true;
+ } else {
+ fail("got unexpected center: " + v + " [" + v.getClass().toString() + ']');
+ }
+ }
+ assertEquals("got unexpected number of centers", 2, count);
+
+ // now run the KMeans job
+ Path kmeansOutput = new Path(outputPath, "kmeans");
+ KMeansDriver.run(getConfiguration(), pointsPath, new Path(outputPath, "clusters-0-final"), kmeansOutput,
+ 0.001, 10, true, 0.0, false);
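+ // Per KMeansDriver.run's signature: convergence delta 0.001, at most 10 iterations, run the
+ // clustering step, classification threshold 0.0, and runSequential false (i.e. MapReduce).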
+
+ // now compare the expected clusters with actual
+ Path clusteredPointsPath = new Path(kmeansOutput, "clusteredPoints");
+ DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>();
+
+ // The key is the clusterId, the value is the weighted vector
+ for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>(
+ new Path(clusteredPointsPath, "part-m-00000"), conf)) {
+ collector.collect(record.getFirst(), record.getSecond());
+ }
+
+ for (IntWritable k : collector.getKeys()) {
+ List<WeightedPropertyVectorWritable> wpvList = collector.getValue(k);
+ assertTrue("empty cluster!", !wpvList.isEmpty());
+ if (wpvList.get(0).getVector().get(0) <= 2.0) {
+ for (WeightedPropertyVectorWritable wv : wpvList) {
+ Vector v = wv.getVector();
+ int idx = v.maxValueIndex();
+ assertTrue("bad cluster!", v.get(idx) <= 2.0);
+ }
+ assertEquals("Wrong size cluster", 4, wpvList.size());
+ } else {
+ for (WeightedPropertyVectorWritable wv : wpvList) {
+ Vector v = wv.getVector();
+ int idx = v.minValueIndex();
+ assertTrue("bad cluster!", v.get(idx) > 2.0);
+ }
+ assertEquals("Wrong size cluster", 5, wpvList.size());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
new file mode 100644
index 0000000..5cb012a
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestRandomSeedGenerator extends MahoutTestCase {
+
+ private static final double[][] RAW = {{1, 1}, {2, 1}, {1, 2}, {2, 2},
+ {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+
+ private FileSystem fs;
+
+ private static List<VectorWritable> getPoints() {
+ List<VectorWritable> points = Lists.newArrayList();
+ for (double[] fr : RAW) {
+ Vector vec = new RandomAccessSparseVector(fr.length);
+ vec.assign(fr);
+ points.add(new VectorWritable(vec));
+ }
+ return points;
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = getConfiguration();
+ fs = FileSystem.get(conf);
+ }
+
+ /** Story: random seed generation creates 4 clusters with proper ids and data */
+ @Test
+ public void testRandomSeedGenerator() throws Exception {
+ List<VectorWritable> points = getPoints();
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+ job.setMapOutputValueClass(VectorWritable.class);
+ Path input = getTestTempFilePath("random-input");
+ Path output = getTestTempDirPath("random-output");
+ ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+
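+ // randomly sample 4 of the 9 RAW points as the initial cluster centers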
+ RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure());
+
+ int clusterCount = 0;
+ Collection<Integer> set = Sets.newHashSet();
+ for (ClusterWritable clusterWritable :
+ new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+ clusterCount++;
+ Cluster cluster = clusterWritable.getValue();
+ int id = cluster.getId();
+ assertTrue(set.add(id)); // Validate unique ids
+
+ Vector v = cluster.getCenter();
+ assertVectorEquals(RAW[id], v); // Validate values match
+ }
+
+ assertEquals(4, clusterCount); // Validate sample count
+ }
+
+ /** Ensure that the seeded buildRandom overload works the same way as the unseeded RandomSeedGenerator.buildRandom */
+ @Test
+ public void testRandomSeedGeneratorSeeded() throws Exception {
+ List<VectorWritable> points = getPoints();
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+ job.setMapOutputValueClass(VectorWritable.class);
+ Path input = getTestTempFilePath("random-input");
+ Path output = getTestTempDirPath("random-output");
+ ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+
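+ // same as buildRandom above, but the fixed seed (1L) makes the sampled centers reproducible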
+ RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), 1L);
+
+ int clusterCount = 0;
+ Collection<Integer> set = Sets.newHashSet();
+ for (ClusterWritable clusterWritable :
+ new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+ clusterCount++;
+ Cluster cluster = clusterWritable.getValue();
+ int id = cluster.getId();
+ assertTrue(set.add(id)); // validate unique ids
+
+ Vector v = cluster.getCenter();
+ assertVectorEquals(RAW[id], v); // validate values match
+ }
+
+ assertEquals(4, clusterCount); // validate sample count
+ }
+
+ /** Test that initial clusters built with same random seed are reproduced */
+ @Test
+ public void testBuildRandomSeededSameInitialClusters() throws Exception {
+ List<VectorWritable> points = getPoints();
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+ job.setMapOutputValueClass(VectorWritable.class);
+ Path input = getTestTempFilePath("random-input");
+ Path output = getTestTempDirPath("random-output");
+ ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+ long randSeed = 1;
+
+ RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), randSeed);
+
+ int[] clusterIDSeq = new int[4];
+
+ // run through all clusters once and record the sequence of IDs
+ int clusterCount = 0;
+ for (ClusterWritable clusterWritable :
+ new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+ Cluster cluster = clusterWritable.getValue();
+ clusterIDSeq[clusterCount] = cluster.getId();
+ clusterCount++;
+ }
+
+ /* Rebuild the clusters and iterate through them again, making sure all IDs appear in the
+ * same random sequence. Needs a better test: in this case it passes when seeded with 1 and 2,
+ * fails with 1, 3, and passes when set to two. */
+ RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), randSeed);
+ clusterCount = 0;
+ for (ClusterWritable clusterWritable :
+ new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) {
+ Cluster cluster = clusterWritable.getValue();
+ // Make sure cluster ids are in same random sequence
+ assertEquals(clusterIDSeq[clusterCount], cluster.getId());
+ clusterCount++;
+ }
+ }
+
+ private static void assertVectorEquals(double[] raw, Vector v) {
+ assertEquals(raw.length, v.size());
+ for (int i = 0; i < raw.length; i++) {
+ assertEquals(raw[i], v.getQuick(i), EPSILON);
+ }
+ }
+}