You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by gs...@apache.org on 2008/08/22 19:05:40 UTC
svn commit: r688122 - in /lucene/mahout/trunk/core/src:
main/java/org/apache/mahout/clustering/fuzzykmeans/
test/java/org/apache/mahout/clustering/fuzzykmeans/
test/java/org/apache/mahout/clustering/kmeans/
Author: gsingers
Date: Fri Aug 22 10:05:39 2008
New Revision: 688122
URL: http://svn.apache.org/viewvc?rev=688122&view=rev
Log:
MAHOUT-74: Fuzzy K Means
Added:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java (with props)
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java (with props)
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (with props)
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java (with props)
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java (with props)
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java (with props)
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java (with props)
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java (with props)
Modified:
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,36 @@
+package org.apache.mahout.clustering.fuzzykmeans;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.Vector;
+
+/** Mapper of the final pass; hands each decoded point to SoftCluster.outputPointWithClusterProbabilities. */
+public class FuzzyKMeansClusterMapper extends FuzzyKMeansMapper {
+  @Override
+  public void map(WritableComparable key, Text values,
+      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    // the value text is the encoded vector; it is also passed on unchanged
+    Vector point = AbstractVector.decodeVector(values.toString());
+    SoftCluster.outputPointWithClusterProbabilities(point, clusters, values, output);
+  }
+}
Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,55 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.AbstractVector;
+
+/** Combiner: folds the "prob:vector" values of one cluster key into one "probSum, weightedTotal" value. */
+public class FuzzyKMeansCombiner extends MapReduceBase implements
+    Reducer<Text, Text, Text, Text> {
+  public void reduce(Text key, Iterator<Text> values,
+      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    SoftCluster cluster = SoftCluster.decodeCluster(key.toString());
+    while (values.hasNext()) {
+      // each value is "<probability>:<encoded vector>"
+      String pointInfo = values.next().toString();
+      int sep = pointInfo.indexOf(":");
+      double pointProb = Double.parseDouble(pointInfo.substring(0, sep));
+      // NOTE(review): the weight is prob * m, not prob^m -- confirm against SoftCluster.addPoint
+      cluster.addPoint(AbstractVector.decodeVector(pointInfo.substring(sep + 1)),
+          pointProb * SoftCluster.getM());
+    }
+    String partial = cluster.getPointProbSum() + ", "
+        + cluster.getWeightedPointTotal().asFormatString();
+    output.collect(key, new Text(partial));
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    SoftCluster.configure(job);
+  }
+}
Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,233 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+public class FuzzyKMeansDriver {
+
+ private static final Log log = LogFactory.getLog(FuzzyKMeansDriver.class);
+
+ private FuzzyKMeansDriver() { // not instantiable: the class only offers static entry points
+ }
+
+  /** CLI entry point; args: input clusters output measureClass convergenceDelta maxIterations m. */
+  public static void main(String[] args) {
+    if (args.length < 7) {
+      // fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException
+      throw new IllegalArgumentException("Expected 7 arguments: input clusters output"
+          + " measureClass convergenceDelta maxIterations m; received: " + args.length);
+    }
+    // the number of map tasks is fixed at 10 for this entry point
+    runJob(args[0], args[1], args[2], args[3], Double.parseDouble(args[4]),
+        Integer.parseInt(args[5]), 10, Integer.parseInt(args[6]));
+  }
+
+  /**
+   * Iterates fuzzy k-means until convergence or maxIterations, then runs the point-clustering pass.
+   *
+   * @param input the directory pathname for input points
+   * @param clustersIn the directory pathname for the initial clusters
+   * @param output the directory under which "clusters-N" and "points" are written
+   * @param measureClass the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param maxIterations the maximum number of iterations
+   * @param numMapTasks the number of mapper tasks
+   * @param m the fuzzification parameter, passed to each job via SoftCluster.M_KEY
+   * @throws RuntimeException wrapping any exception raised while running the jobs
+   */
+  public static void runJob(String input, String clustersIn, String output,
+      String measureClass, double convergenceDelta, int maxIterations,
+      int numMapTasks, int m) {
+    try {
+      String delta = Double.toString(convergenceDelta);
+      String currentClusters = clustersIn;
+      boolean converged = false;
+
+      // iterate until the clusters converge or the iteration budget is spent
+      for (int iteration = 0; !converged && iteration < maxIterations; iteration++) {
+        log.info("Iteration {" + iteration + "}");
+
+        // each iteration writes to its own directory ...
+        String clustersOut = output + File.separator + "clusters-" + iteration;
+        converged = runIteration(input, currentClusters, clustersOut, measureClass,
+            delta, numMapTasks, iteration, m);
+
+        // ... which is the cluster input of the next one
+        currentClusters = clustersOut;
+      }
+
+      // now actually cluster the points against the most recent clusters
+      log.info("Clustering ");
+
+      runClustering(input, currentClusters, output + File.separator + "points",
+          measureClass, delta, numMapTasks, m);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Runs one fuzzy k-means iteration as a map/combine/reduce job and reports convergence.
+   *
+   * @param input the directory pathname for input points
+   * @param clustersIn the directory pathname for input clusters
+   * @param clustersOut the directory pathname for output clusters
+   * @param measureClass the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param numMapTasks the number of map tasks; also used as the number of reduce tasks
+   * @param iterationNumber the iteration number that is going to run
+   * @param m the fuzzification parameter, stored under SoftCluster.M_KEY
+   * @return the result of isConverged on clustersOut, or true if the job or the check failed
+   */
+  private static boolean runIteration(String input, String clustersIn,
+      String clustersOut, String measureClass, String convergenceDelta,
+      int numMapTasks, int iterationNumber, int m) {
+    JobConf conf = new JobConf(FuzzyKMeansJob.class);
+    conf.setJobName("Fuzzy K Means{" + iterationNumber + "}");
+
+    // map -> combine -> reduce pipeline
+    conf.setMapperClass(FuzzyKMeansMapper.class);
+    conf.setCombinerClass(FuzzyKMeansCombiner.class);
+    conf.setReducerClass(FuzzyKMeansReducer.class);
+    conf.setNumMapTasks(numMapTasks);
+    conf.setNumReduceTasks(numMapTasks);
+
+    // input points in, Text/Text sequence files out
+    FileInputFormat.setInputPaths(conf, new Path(input));
+    FileOutputFormat.setOutputPath(conf, new Path(clustersOut));
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    // settings handed to the tasks through the JobConf
+    conf.set(SoftCluster.CLUSTER_PATH_KEY, clustersIn);
+    conf.set(SoftCluster.DISTANCE_MEASURE_KEY, measureClass);
+    conf.set(SoftCluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
+    conf.set(SoftCluster.M_KEY, String.valueOf(m));
+
+    try {
+      JobClient.runJob(conf);
+      return isConverged(clustersOut, conf, FileSystem.get(conf));
+    } catch (Exception e) {
+      // a failed iteration is logged and reported as converged, which ends the caller's loop
+      log.warn(e.toString(), e);
+      return true;
+    }
+  }
+
+  /**
+   * Runs the map-only job (FuzzyKMeansClusterMapper) over the input points using the given clusters.
+   *
+   * @param input the directory pathname for input points
+   * @param clustersIn the directory pathname for input clusters
+   * @param output the directory pathname for output points
+   * @param measureClass the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param numMapTasks the number of map tasks
+   * @param m the fuzzification parameter, stored under SoftCluster.M_KEY
+   */
+  private static void runClustering(String input, String clustersIn,
+      String output, String measureClass, String convergenceDelta,
+      int numMapTasks, float m) {
+    JobConf conf = new JobConf(FuzzyKMeansDriver.class);
+    conf.setJobName("Fuzzy K Means Clustering");
+    // uncomment it to run locally
+    // conf.set("mapred.job.tracker", "local");
+    // map-only: zero reduce tasks
+    conf.setMapperClass(FuzzyKMeansClusterMapper.class);
+    conf.setNumMapTasks(numMapTasks);
+    conf.setNumReduceTasks(0);
+
+    FileInputFormat.setInputPaths(conf, new Path(input));
+    FileOutputFormat.setOutputPath(conf, new Path(output));
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    // NOTE(review): m is a float here but an int in runIteration, so M_KEY holds "2.0" rather than "2"
+    conf.set(SoftCluster.CLUSTER_PATH_KEY, clustersIn);
+    conf.set(SoftCluster.DISTANCE_MEASURE_KEY, measureClass);
+    conf.set(SoftCluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
+    conf.set(SoftCluster.M_KEY, String.valueOf(m));
+    try {
+      JobClient.runJob(conf);
+    } catch (Exception e) {
+      // a failed job is only logged; the caller is not notified
+      log.warn(e.toString(), e);
+    }
+  }
+
+  /**
+   * Return if all of the Clusters in the filePath have converged or not
+   *
+   * @param filePath the cluster output path; only files whose name starts with "part" are read
+   * @param conf the JobConf
+   * @param fs the FileSystem
+   * @return true if every value read starts with "V" (also true when nothing is read)
+   * @throws IOException if there was an IO error
+   */
+  private static boolean isConverged(String filePath, JobConf conf,
+      FileSystem fs) throws IOException {
+
+    PathFilter clusterFileFilter = new PathFilter() {
+      public boolean accept(Path path) {
+        return path.getName().startsWith("part");
+      }
+    };
+
+    FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(
+        new Path(filePath), clusterFileFilter)), clusterFileFilter);
+
+    boolean converged = true;
+    Text key = new Text();
+    Text value = new Text();
+    for (FileStatus match : matches) {
+      if (!converged) {
+        break; // one unconverged cluster already decides the answer
+      }
+      SequenceFile.Reader reader =
+          new SequenceFile.Reader(fs, fs.makeQualified(match.getPath()), conf);
+      try {
+        while (converged && reader.next(key, value)) {
+          converged = value.toString().startsWith("V");
+        }
+      } finally {
+        reader.close(); // always release the reader, even if next() throws
+      }
+    }
+
+    return converged;
+  }
+}
Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,88 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.utils.ManhattanDistanceMeasure;
+
+/** Command-line front end: optionally seeds the clusters with canopy, then runs FuzzyKMeansDriver. */
+public class FuzzyKMeansJob {
+
+  /** Parses the nine command-line arguments and runs the job; any other count prints usage and exits. */
+  public static void main(String[] args) throws IOException {
+    if (args.length != 9) {
+      System.out.println("Expected num Arguments: 9 received:" + args.length);
+      printMessage();
+    }
+    int index = 0;
+    String input = args[index++];
+    String clusters = args[index++];
+    String output = args[index++];
+    String measureClass = args[index++];
+    double convergenceDelta = Double.parseDouble(args[index++]);
+    int maxIterations = Integer.parseInt(args[index++]);
+    int numMapTasks = Integer.parseInt(args[index++]);
+    boolean doCanopy = Boolean.parseBoolean(args[index++]);
+    int m = Integer.parseInt(args[index++]);
+
+    runJob(input, clusters, output, measureClass, convergenceDelta,
+        maxIterations, numMapTasks, doCanopy, m);
+  }
+
+  /**
+   * Prints the usage line (all nine arguments, in the order main reads them) and exits with status 1.
+   */
+  private static void printMessage() {
+    System.out.println("Usage: inputDir clusterDir OutputDir measureClass"
+        + " ConvergenceDelta maxIterations numMapTasks doCanopy m");
+    System.exit(1);
+  }
+
+  /**
+   * Run the job using supplied arguments
+   *
+   * @param input the directory pathname for input points
+   * @param clustersIn the directory pathname for initial clusters
+   * @param output the directory pathname for output points
+   * @param measureClass the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param maxIterations the maximum number of iterations
+   * @param numMapTasks the number of maptasks
+   * @param doCanopy whether to run canopy first to compute the initial clusters
+   * @param m param needed to fuzzify the cluster membership values
+   */
+  public static void runJob(String input, String clustersIn, String output,
+      String measureClass, double convergenceDelta, int maxIterations,
+      int numMapTasks, boolean doCanopy, int m) {
+    try {
+      // run canopy to find initial clusters
+      // NOTE(review): canopy always uses ManhattanDistanceMeasure and fixed 100.1/50.1 values, not measureClass
+      if (doCanopy) {
+        CanopyDriver.runJob(input, clustersIn, ManhattanDistanceMeasure.class
+            .getName(), 100.1, 50.1);
+      }
+
+      // run fuzzy k -means
+      FuzzyKMeansDriver.runJob(input, clustersIn, output, measureClass,
+          convergenceDelta, maxIterations, numMapTasks, m);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,137 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.Vector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FuzzyKMeansMapper extends MapReduceBase implements
+ Mapper<WritableComparable, Text, Text, Text> {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(FuzzyKMeansMapper.class);
+
+ protected List<SoftCluster> clusters;
+
+  /** Decodes the point held in the value text and passes it, with the clusters, to SoftCluster. */
+  public void map(WritableComparable key, Text values,
+      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    SoftCluster.emitPointProbToCluster(AbstractVector.decodeVector(values.toString()), clusters, values, output);
+  }
+
+ /**
+ * Configure the mapper by providing its clusters. Used by unit tests.
+ *
+ * @param clusters the List<SoftCluster> that replaces the mapper's current clusters
+ */
+ void config(List<SoftCluster> clusters) {
+ this.clusters = clusters;
+ }
+
+  /**
+   * Configures SoftCluster from the job and loads the clusters found under SoftCluster.CLUSTER_PATH_KEY.
+   *
+   * @throws IllegalStateException if no cluster could be loaded
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    SoftCluster.configure(job);
+
+    log.info("In Mapper Configure:");
+    clusters = new ArrayList<SoftCluster>();
+    configureWithClusterInfo(job);
+    if (clusters.isEmpty()) {
+      throw new IllegalStateException("Cluster is empty!!! path="
+          + job.get(SoftCluster.CLUSTER_PATH_KEY));
+    }
+  }
+
+  /**
+   * Reads the clusters from the sequence files selected by the "part" name filter for the
+   * path configured as SoftCluster.CLUSTER_PATH_KEY and appends them to {@link #clusters}.
+   *
+   * @param job the job configuration holding the cluster path
+   * @throws RuntimeException wrapping any IOException raised while reading
+   */
+  protected void configureWithClusterInfo(JobConf job) {
+    // Get the path location where the cluster Info is stored
+    // NOTE(review): presumably job.get(..) is null when the key is unset, making new Path(..) fail
+    Path clusterPath = new Path(job.get(SoftCluster.CLUSTER_PATH_KEY));
+
+    // only files whose name starts with "part" are read
+    PathFilter clusterFileFilter = new PathFilter() {
+      public boolean accept(Path path) {
+        return path.getName().startsWith("part");
+      }
+    };
+
+    try {
+      FileSystem fs = clusterPath.getFileSystem(job);
+      FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(
+          clusterPath, clusterFileFilter)), clusterFileFilter);
+
+      // resolve all file names first, then read them one by one
+      List<Path> clusterFiles = new ArrayList<Path>(matches.length);
+      for (FileStatus match : matches) {
+        clusterFiles.add(fs.makeQualified(match.getPath()));
+      }
+
+      Text key = new Text();
+      Text value = new Text();
+
+      for (Path path : clusterFiles) {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+        try {
+          while (reader.next(key, value)) {
+            // get the cluster info
+            SoftCluster cluster = SoftCluster.decodeCluster(value.toString());
+            // add the center so the centroid will be correct on output
+            // formatting
+            cluster.addPoint(cluster.getCenter(), 1);
+            clusters.add(cluster);
+          }
+        } finally {
+          reader.close();
+        }
+      }
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,70 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.Vector;
+
+/** Reducer: merges combiner sums ("probSum, vector") and raw mapper values ("prob:vector") per cluster. */
+public class FuzzyKMeansReducer extends MapReduceBase implements
+    Reducer<Text, Text, Text, Text> {
+
+  public void reduce(Text key, Iterator<Text> values,
+      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    SoftCluster cluster = SoftCluster.decodeCluster(key.toString());
+    while (values.hasNext()) {
+      String value = values.next().toString();
+      int comma = value.indexOf(',');
+      int colon = value.indexOf(':');
+      // A number never contains ',' or ':', so whichever separator comes first
+      // identifies the record format without relying on a parse failure.
+      if (comma >= 0 && (colon < 0 || comma < colon)) {
+        double partialSumPtProb = Double.parseDouble(value.substring(0, comma));
+        // skip the ", " written by the combiner
+        Vector total = AbstractVector.decodeVector(value.substring(comma + 2));
+        cluster.addPoints(partialSumPtProb, total);
+      } else {
+        // Escaped from Combiner. So, let's do that processing too:
+        System.out.println("Escaped from combiner: Key:" + key.toString()
+            + " Value:" + value);
+        double pointProb = Double.parseDouble(value.substring(0, colon));
+        cluster.addPoint(AbstractVector.decodeVector(value.substring(colon + 1)),
+            pointProb * SoftCluster.getM());
+      }
+    }
+
+    // force convergence calculation
+    cluster.computeConvergence();
+    output.collect(new Text(cluster.getIdentifier()), new Text(SoftCluster
+        .formatCluster(cluster)));
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    SoftCluster.configure(job);
+  }
+}
Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,354 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.SparseVector;
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.utils.DistanceMeasure;
+
+public class SoftCluster {
+
+ // job property naming the DistanceMeasure class; read by configure(JobConf)
+ public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.kmeans.measure";
+
+ // NOTE(review): not read anywhere in this class - presumably the job
+ // property holding the cluster path; confirm against the driver
+ public static final String CLUSTER_PATH_KEY = "org.apache.mahout.clustering.kmeans.path";
+
+ // job property holding the convergence delta; read by configure(JobConf)
+ public static final String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.kmeans.convergence";
+
+ // job property holding the value assigned to m; read by configure(JobConf)
+ public static final String M_KEY = "org.apache.mahout.clustering.fuzzykmeans.m";
+
+ // feeds the exponent 2 / (m - 1) used by computeProbWeight
+ private static float m = 2; //default value
+ // substituted for any zero-valued distance in computeProbWeight to avoid a
+ // divide-by-zero
+ public static final double minimalValue = 0.0000000001;
+
+ // id handed to the next cluster built without an explicit id; reset to 0
+ // by configure(JobConf) and config(DistanceMeasure, double)
+ private static int nextClusterId = 0;
+
+ // this cluster's clusterId
+ private int clusterId;
+
+ // the current center
+ private Vector center = new SparseVector(0);
+
+ // the current centroid is lazy evaluated and may be null
+ private Vector centroid = null;
+
+ // running sum of the membership weights added to this cluster
+ private double pointProbSum = 0.0;
+
+ // running sum of the weighted points added to this cluster
+ private Vector weightedPointTotal = null;
+
+ // has the centroid converged with the center?
+ private boolean converged = false;
+
+ // distance measure shared by all clusters; set by configure / config
+ private static DistanceMeasure measure;
+ // largest centroid-to-center distance still treated as converged
+ private static double convergenceDelta = 0;
+
+ /**
+ * Format the SoftCluster for output
+ *
+ * @param cluster the Cluster
+ * @return
+ */
+ public static String formatCluster(SoftCluster cluster) {
+ return cluster.getIdentifier() + ": "
+ + cluster.computeCentroid().asFormatString();
+ }
+
+ /**
+  * Rebuilds a SoftCluster from text of the form produced by formatCluster.
+  * Everything before the first '[' is the identifier part, everything from
+  * '[' onwards is the encoded center. The numeric id is read from index 1
+  * up to two characters before the '[', which skips the ": " separator.
+  *
+  * @param formattedString a String produced by formatCluster
+  * @return the decoded cluster, marked converged when the identifier starts
+  *         with "V"; null when it starts with neither "C" nor "V"
+  */
+ public static SoftCluster decodeCluster(String formattedString) {
+   int bracketAt = formattedString.indexOf('[');
+   String prefix = formattedString.substring(0, bracketAt);
+   String encodedCenter = formattedString.substring(bracketAt);
+
+   boolean convergedMarker = prefix.startsWith("V");
+   if (!prefix.startsWith("C") && !convergedMarker) {
+     return null;
+   }
+
+   int decodedId = Integer.parseInt(formattedString.substring(1, bracketAt - 2));
+   Vector decodedCenter = AbstractVector.decodeVector(encodedCenter);
+   SoftCluster decoded = new SoftCluster(decodedCenter, decodedId);
+   decoded.converged = convergedMarker;
+   return decoded;
+ }
+
+ /**
+ * Configure the distance measure from the job
+ *
+ * @param job the JobConf for the job
+ */
+ public static void configure(JobConf job) {
+ try {
+ final ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+ Class<?> cl = ccl.loadClass(job.get(DISTANCE_MEASURE_KEY));
+ measure = (DistanceMeasure) cl.newInstance();
+ measure.configure(job);
+ convergenceDelta = new Double(job.get(CLUSTER_CONVERGENCE_KEY));
+ nextClusterId = 0;
+ m = Float.parseFloat(job.get(M_KEY));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+  * Sets the distance measure and convergence delta directly, bypassing the
+  * job configuration, and restarts cluster id numbering at 0. Used by unit
+  * tests.
+  *
+  * @param aMeasure the DistanceMeasure
+  * @param aConvergenceDelta the delta value used to define convergence
+  */
+ public static void config(DistanceMeasure aMeasure, double aConvergenceDelta) {
+   nextClusterId = 0;
+   convergenceDelta = aConvergenceDelta;
+   measure = aMeasure;
+ }
+
+ /**
+  * Emits one record per cluster for the given point. The key is the
+  * formatted cluster and the value is the point's membership weight for
+  * that cluster, a ':' and the original payload text.
+  *
+  * @param point a point
+  * @param clusters a List<SoftCluster>
+  * @param values a Writable containing the input point and possible other
+  *        values of interest (payload)
+  * @param output the OutputCollector to emit into
+  * @throws IOException if the collector fails
+  */
+ public static void emitPointProbToCluster(Vector point,
+     List<SoftCluster> clusters, Text values,
+     OutputCollector<Text, Text> output) throws IOException {
+   // distance from the point to every cluster center, in cluster order
+   List<Double> distances = new ArrayList<Double>();
+   for (SoftCluster candidate : clusters) {
+     distances.add(measure.distance(point, candidate.getCenter()));
+   }
+
+   int position = 0;
+   for (SoftCluster target : clusters) {
+     double membership = computeProbWeight(distances.get(position), distances);
+     Text outKey = new Text(formatCluster(target));
+     Text outValue = new Text(membership + ":" + values.toString());
+     output.collect(outKey, outValue);
+     position++;
+   }
+ }
+
+ /**
+  * Emits a single record for the point: the key is the trimmed payload text
+  * and the value is a bracketed, space-separated list of
+  * "clusterId:membershipWeight" entries, one per cluster.
+  *
+  * @param point a point
+  * @param clusters a List<SoftCluster> to test
+  * @param values a Writable containing the input point and possible other
+  *        values of interest (payload)
+  * @param output the OutputCollector to emit into
+  * @throws IOException if the collector fails
+  */
+ public static void outputPointWithClusterProbabilities(Vector point,
+     List<SoftCluster> clusters, Text values,
+     OutputCollector<Text, Text> output) throws IOException {
+   String payload = values.toString();
+
+   // distance from the point to every cluster center, in cluster order
+   List<Double> distances = new ArrayList<Double>();
+   for (SoftCluster candidate : clusters) {
+     distances.add(measure.distance(point, candidate.getCenter()));
+   }
+
+   StringBuilder memberships = new StringBuilder("[");
+   int position = 0;
+   for (SoftCluster target : clusters) {
+     double membership = computeProbWeight(distances.get(position), distances);
+     memberships.append(target.clusterId).append(":").append(membership)
+         .append(" ");
+     position++;
+   }
+
+   // drop the trailing separator before closing the bracket
+   String encoded = memberships.toString().trim() + "]";
+   output.collect(new Text(payload.trim()), new Text(encoded));
+ }
+
+ /**
+  * Computes the membership weight of a point for one cluster as
+  * 1 / sum over all clusters j of (clusterDistance / distance_j) ^ (2 / (m - 1)).
+  * Any distance equal to zero is replaced by minimalValue first.
+  *
+  * @param clusterDistance distance from the point to the cluster of interest
+  * @param clusterDistanceList distances from the point to every cluster
+  * @return the membership weight
+  */
+ public static double computeProbWeight(double clusterDistance,
+     List<Double> clusterDistanceList) {
+   double numerator = (clusterDistance == 0) ? minimalValue : clusterDistance;
+   double exponent = (double) 2 / (m - 1);
+
+   double sum = 0.0;
+   for (double otherDistance : clusterDistanceList) {
+     double divisor = (otherDistance == 0) ? minimalValue : otherDistance;
+     sum += Math.pow(numerator / divisor, exponent);
+   }
+   return 1.0 / sum;
+ }
+
+ /**
+  * Returns the centroid of the points added so far. While the weight sum is
+  * zero the raw weighted total is returned; otherwise the centroid is
+  * computed on first use and cached until more points are added.
+  *
+  * @return the new centroid
+  */
+ private Vector computeCentroid() {
+   if (pointProbSum == 0) {
+     return weightedPointTotal;
+   }
+   if (centroid == null) {
+     // lazy compute new centroid
+     centroid = weightedPointTotal.divide(pointProbSum);
+   }
+   return centroid;
+ }
+
+ /**
+  * Constructs a new SoftCluster centered on the given point, taking the
+  * next id from the shared id counter.
+  *
+  * @param center the center point
+  */
+ public SoftCluster(Vector center) {
+   this(center, nextClusterId++);
+ }
+
+ /**
+  * Constructs a new SoftCluster centered on the given point with an
+  * explicit id. The weight sum starts at zero and the weighted total starts
+  * as center.like().
+  *
+  * @param center the center point
+  * @param clusterId the id to give this cluster
+  */
+ public SoftCluster(Vector center, int clusterId) {
+   this.center = center;
+   this.clusterId = clusterId;
+   this.weightedPointTotal = center.like();
+   this.pointProbSum = 0;
+ }
+
+ @Override
+ public String toString() {
+ return getIdentifier() + " - " + center.asFormatString();
+ }
+
+ public String getIdentifier() {
+ if (converged)
+ return "V" + clusterId;
+ else
+ return "C" + clusterId;
+ }
+
+ /**
+ * Add the point to the SoftCluster
+ *
+ * @param point a point to add
+ * @param ptProb
+ */
+ public void addPoint(Vector point, double ptProb) {
+ centroid = null;
+ pointProbSum += ptProb;
+ if (weightedPointTotal == null)
+ weightedPointTotal = point.copy().times(ptProb);
+ else
+ weightedPointTotal = point.times(ptProb).plus(weightedPointTotal);
+ }
+
+ /**
+ * Add the point to the SoftCluster
+ *
+ * @param partialSumPtProb
+ * @param delta a point to add
+ */
+ public void addPoints(double partialSumPtProb, Vector delta) {
+ centroid = null;
+ pointProbSum += partialSumPtProb;
+ if (weightedPointTotal == null)
+ weightedPointTotal = delta.copy();
+ else
+ weightedPointTotal = delta.plus(weightedPointTotal);
+ }
+
+ /** @return the current center of this cluster */
+ public Vector getCenter() {
+ return center;
+ }
+
+ /** @return the sum of the weights added to this cluster so far */
+ public double getPointProbSum() {
+ return pointProbSum;
+ }
+
+ /**
+ * Compute the centroid and set the center to it.
+ */
+ public void recomputeCenter() {
+ center = computeCentroid();
+ pointProbSum = 0;
+ weightedPointTotal = center.like();
+ }
+
+ /**
+ * Return if the cluster is converged by comparing its center and centroid.
+ *
+ * @return if the cluster is converged
+ */
+ public boolean computeConvergence() {
+ Vector centroid = computeCentroid();
+ converged = measure.distance(centroid, center) <= convergenceDelta;
+ return converged;
+ }
+
+ /** @return the running weighted total of the points added to this cluster */
+ public Vector getWeightedPointTotal() {
+ return weightedPointTotal;
+ }
+
+ /** @return the converged flag as last set by computeConvergence or decodeCluster */
+ public boolean isConverged() {
+ return converged;
+ }
+
+ /** @return this cluster's numeric id */
+ public int getClusterId() {
+ return clusterId;
+ }
+
+ /** @return the current value of the shared m setting */
+ public static float getM() {
+ return m;
+ }
+
+
+
+}
Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java (added)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,566 @@
+package org.apache.mahout.clustering.fuzzykmeans;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.utils.DistanceMeasure;
+import org.apache.mahout.utils.DummyOutputCollector;
+import org.apache.mahout.utils.EuclideanDistanceMeasure;
+
+public class TestFuzzyKmeansClustering extends TestCase {
+
+ /**
+  * Recursively deletes the file or directory at the given path. Does
+  * nothing when the path does not exist.
+  *
+  * @param path the file or directory to remove
+  * @throws Exception declared for the callers' convenience
+  */
+ private void rmr(String path) throws Exception {
+   File f = new File(path);
+   if (!f.exists()) {
+     return;
+   }
+   if (f.isDirectory()) {
+     // File.list() returns null when the directory cannot be read
+     String[] contents = f.list();
+     if (contents != null) {
+       for (String child : contents) {
+         rmr(f.toString() + File.separator + child);
+       }
+     }
+   }
+   f.delete();
+ }
+
+ /** Clears the "output" and "testdata" directories before every test. */
+ @Override
+ protected void setUp() throws Exception {
+   super.setUp();
+   for (String staleDir : new String[] { "output", "testdata" }) {
+     rmr(staleDir);
+   }
+ }
+
+ /**
+  * Rounds a value to the given number of decimal places by scaling it up by
+  * 10^places, rounding to the nearest long and scaling back down.
+  *
+  * @param val the value to round
+  * @param places the number of decimal places to keep
+  * @return the rounded value
+  */
+ public static double round(double val, int places) {
+   long scale = (long) Math.pow(10, places);
+   long scaled = Math.round(val * scale);
+   return (double) scaled / scale;
+ }
+
+ public static Vector tweakValue(Vector point) {
+ return point.plus(0.1);
+
+ }
+
+ /**
+  * In-memory reference implementation: configures SoftCluster with the
+  * named distance measure, iterates until every cluster reports convergence
+  * or numIter iterations have run, then records each point's cluster
+  * memberships in pointClusterInfo.
+  *
+  * @param points the points to cluster
+  * @param clusterList the initial clusters, updated in place
+  * @param pointClusterInfo receives point -> membership text
+  * @param distanceMeasureClass class name of the DistanceMeasure to use
+  * @param threshold the convergence delta
+  * @param numIter the maximum number of iterations
+  * @throws Exception if the distance measure cannot be instantiated
+  */
+ public void referenceFuzzyKMeans(List<Vector> points,
+     List<SoftCluster> clusterList, Map<String, String> pointClusterInfo,
+     String distanceMeasureClass, double threshold, int numIter)
+     throws Exception {
+   final ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+   Class<?> cl = ccl.loadClass(distanceMeasureClass);
+
+   DistanceMeasure measure = (DistanceMeasure) cl.newInstance();
+   SoftCluster.config(measure, threshold);
+   boolean converged = false;
+   for (int iter = 0; !converged && iter < numIter; iter++) {
+     iterateReference(points, clusterList, measure);
+     // iterateReference refreshes each cluster's converged flag; stop once
+     // all of them agree (the flag was previously never consulted)
+     converged = true;
+     for (SoftCluster cluster : clusterList) {
+       if (!cluster.isConverged()) {
+         converged = false;
+         break;
+       }
+     }
+   }
+   computeCluster(points, clusterList, measure, pointClusterInfo);
+ }
+
+ /**
+  * Runs one reference iteration: every point is added to every cluster with
+  * weight (membership * m), then each cluster updates its converged flag
+  * and moves its center to the new centroid.
+  *
+  * @param points the points to cluster
+  * @param clusterList the clusters, updated in place
+  * @param measure the distance measure
+  */
+ public void iterateReference(List<Vector> points,
+     List<SoftCluster> clusterList, DistanceMeasure measure) {
+   for (Vector point : points) {
+     // distance from this point to every cluster center, in cluster order
+     List<Double> distances = new ArrayList<Double>();
+     for (SoftCluster candidate : clusterList) {
+       distances.add(measure.distance(point, candidate.getCenter()));
+     }
+
+     int position = 0;
+     for (SoftCluster target : clusterList) {
+       double membership = SoftCluster.computeProbWeight(distances
+           .get(position), distances);
+       target.addPoint(point, membership * SoftCluster.getM());
+       position++;
+     }
+   }
+
+   for (SoftCluster updated : clusterList) {
+     updated.computeConvergence();
+     updated.recomputeCenter();
+   }
+ }
+
+ /**
+  * Records, for every point, a bracketed space-separated list of
+  * "clusterId:membershipWeight" entries, keyed by the point's trimmed
+  * format string.
+  *
+  * @param points the points to describe
+  * @param clusterList the clusters to measure against
+  * @param measure the distance measure
+  * @param pointClusterInfo receives point -> membership text
+  */
+ public void computeCluster(List<Vector> points,
+     List<SoftCluster> clusterList, DistanceMeasure measure,
+     Map<String, String> pointClusterInfo) {
+   for (Vector point : points) {
+     List<Double> distances = new ArrayList<Double>();
+     for (SoftCluster candidate : clusterList) {
+       distances.add(measure.distance(point, candidate.getCenter()));
+     }
+
+     StringBuilder memberships = new StringBuilder("[");
+     int position = 0;
+     for (SoftCluster target : clusterList) {
+       double membership = SoftCluster.computeProbWeight(distances
+           .get(position), distances);
+       memberships.append(target.getClusterId()).append(":").append(
+           membership).append(" ");
+       position++;
+     }
+
+     String pointKey = point.asFormatString().trim();
+     pointClusterInfo.put(pointKey, memberships.toString().trim() + "]");
+   }
+ }
+
+ /**
+  * For every cluster count from 1 to the number of points, runs the
+  * reference implementation for up to two iterations and checks that each
+  * point lists one membership per cluster and that the memberships sum to
+  * 1.0 after rounding to one decimal place.
+  */
+ public void testReferenceImplementation() throws Exception {
+   List<Vector> points = TestKmeansClustering
+       .getPoints(TestKmeansClustering.reference);
+   for (int k = 0; k < points.size(); k++) {
+     System.out.println("test k= " + k);
+     int numClusters = k + 1;
+
+     // seed the clusters from the first points, shifted by tweakValue
+     List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
+     for (int i = 0; i < numClusters; i++) {
+       SoftCluster cluster = new SoftCluster(tweakValue(points.get(i)));
+       // add the center so the centroid will be correct upon output
+       cluster.addPoint(cluster.getCenter(), 1);
+       clusterList.add(cluster);
+     }
+
+     // run reference FuzzyKmeans algorithm
+     Map<String, String> pointClusterInfo = new HashMap<String, String>();
+     referenceFuzzyKMeans(points, clusterList, pointClusterInfo,
+         EuclideanDistanceMeasure.class.getName(), 0.001, 2);
+
+     // check the membership list recorded for each point
+     for (String value : pointClusterInfo.values()) {
+       String[] clusterInfoList = value.substring(1, value.length() - 1)
+           .split(" ");
+       assertEquals("Number of clusters", numClusters,
+           clusterInfoList.length);
+
+       double prob = 0.0;
+       for (String clusterInfo : clusterInfoList) {
+         prob += Double.parseDouble(clusterInfo.split(":")[1]);
+       }
+       prob = round(prob, 1);
+       assertEquals(
+           "Sum of cluster Membership problability should be equal to=", 1.0,
+           prob);
+     }
+   }
+ }
+
+ /**
+  * End-to-end check of the driver: writes the sample points and k + 1
+  * initial clusters to "testdata", runs FuzzyKMeansDriver.runJob and
+  * verifies that every line of "output/points/part-00000" lists one
+  * membership per cluster and that the memberships sum to 1.0 after
+  * rounding to one decimal place.
+  */
+ public void testFuzzyKMeansMRJob() throws Exception {
+   List<Vector> points = TestKmeansClustering
+       .getPoints(TestKmeansClustering.reference);
+   File testData = new File("testdata");
+   if (!testData.exists())
+     testData.mkdir();
+   testData = new File("testdata/points");
+   if (!testData.exists())
+     testData.mkdir();
+
+   TestKmeansClustering.writePointsToFile(points, "testdata/points/file1");
+   TestKmeansClustering.writePointsToFile(points, "testdata/points/file2");
+
+   for (int k = 0; k < points.size(); k++) {
+     System.out.println("testKFuzzyKMeansMRJob k= " + k);
+     // pick k initial cluster centers at random
+     JobConf job = new JobConf(FuzzyKMeansDriver.class);
+     FileSystem fs = FileSystem.get(job);
+     Path path = new Path("testdata/clusters/part-00000");
+     SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path,
+         Text.class, Text.class);
+     try {
+       for (int i = 0; i < k + 1; i++) {
+         Vector vec = tweakValue(points.get(i));
+
+         SoftCluster cluster = new SoftCluster(vec);
+         // add the center so the centroid will be correct upon output
+         cluster.addPoint(cluster.getCenter(), 1);
+         writer.append(new Text(cluster.getIdentifier()), new Text(SoftCluster
+             .formatCluster(cluster)));
+       }
+     } finally {
+       // release the file even when an append fails
+       writer.close();
+     }
+
+     JobConf conf = new JobConf(FuzzyKMeansDriver.class);
+     Path outPath = new Path("output");
+     fs = FileSystem.get(conf);
+     if (fs.exists(outPath)) {
+       fs.delete(outPath, true);
+     }
+     fs.mkdirs(outPath);
+     // now run the Job
+     FuzzyKMeansDriver.runJob("testdata/points", "testdata/clusters",
+         "output", EuclideanDistanceMeasure.class.getName(), 0.001, 2, 1,2);
+
+     // now compare the expected clusters with actual
+     File outDir = new File("output/points");
+     assertTrue("output dir exists?", outDir.exists());
+     String[] outFiles = outDir.list();
+     assertEquals("output dir files?", 4, outFiles.length);
+     BufferedReader reader = new BufferedReader(new FileReader(
+         "output/points/part-00000"));
+     try {
+       // readLine() returning null marks end of file
+       String line;
+       while ((line = reader.readLine()) != null) {
+         String[] lineParts = line.split("\t");
+         assertEquals("line parts", 2, lineParts.length);
+         String clusterInfoStr = lineParts[1].substring(1,
+             lineParts[1].length() - 1);
+         String[] clusterInfoList = clusterInfoStr.split(" ");
+         assertEquals("Number of clusters", k + 1, clusterInfoList.length);
+         double prob = 0.0;
+         for (String clusterInfo : clusterInfoList) {
+           String[] clusterProb = clusterInfo.split(":");
+
+           double clusterProbVal = Double.parseDouble(clusterProb[1]);
+           prob += clusterProbVal;
+         }
+         prob = round(prob, 1);
+         assertEquals(
+             "Sum of cluster Membership problability should be equal to=", 1.0,
+             prob);
+       }
+     } finally {
+       // close the reader even when an assertion above fails
+       reader.close();
+     }
+   }
+ }
+
+ /**
+  * For every cluster count from 1 to the number of points, feeds all points
+  * through FuzzyKMeansMapper and checks that the mapper emits one key per
+  * cluster and that each point's memberships across all keys sum to 1.0
+  * after rounding to one decimal place.
+  */
+ public void testFuzzyKMeansMapper() throws Exception {
+   List<Vector> points = TestKmeansClustering
+       .getPoints(TestKmeansClustering.reference);
+
+   DistanceMeasure measure = new EuclideanDistanceMeasure();
+   SoftCluster.config(measure, 0.001);
+
+   for (int k = 0; k < points.size(); k++) {
+     System.out.println("testKFuzzyKMeansMRJob k= " + k);
+     int numClusters = k + 1;
+
+     // seed the clusters from the first points, shifted by tweakValue
+     List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
+     for (int i = 0; i < numClusters; i++) {
+       SoftCluster cluster = new SoftCluster(tweakValue(points.get(i)));
+       cluster.addPoint(cluster.getCenter(), 1);
+       clusterList.add(cluster);
+     }
+
+     // run mapper
+     FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
+     mapper.config(clusterList);
+     DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+     for (Vector point : points) {
+       Text encodedPoint = new Text(point.asFormatString());
+       mapper.map(new Text(), encodedPoint, mapCollector, null);
+     }
+
+     // now verify mapper output
+     assertEquals("Mapper Keys", numClusters, mapCollector.getData().size());
+
+     // total membership per encoded point, summed over all cluster keys
+     Map<String, Double> pointTotalProbMap = new HashMap<String, Double>();
+     for (String key : mapCollector.getKeys()) {
+       // every key must decode as a cluster; the result itself is not needed
+       SoftCluster cluster = SoftCluster.decodeCluster(key);
+
+       for (Text value : mapCollector.getValue(key)) {
+         String pointInfo = value.toString();
+         int separator = pointInfo.indexOf(":");
+         double pointProb = Double.parseDouble(pointInfo.substring(0,
+             separator));
+         String encodedVector = pointInfo.substring(separator + 1);
+
+         Double soFar = pointTotalProbMap.get(encodedVector);
+         double probVal = (soFar == null) ? 0 : soFar;
+         pointTotalProbMap.put(encodedVector, probVal + pointProb);
+       }
+     }
+
+     for (Map.Entry<String, Double> entry : pointTotalProbMap.entrySet()) {
+       String key = entry.getKey();
+       double value = round(entry.getValue(), 1);
+       assertEquals("total Prob for Point:" + key, 1.0, value);
+     }
+   }
+ }
+
+ public void testFuzzyKMeansCombiner() throws Exception {
+ List<Vector> points = TestKmeansClustering
+ .getPoints(TestKmeansClustering.reference);
+
+ DistanceMeasure measure = new EuclideanDistanceMeasure();
+ SoftCluster.config(measure, 0.001);
+
+ for (int k = 0; k < points.size(); k++) {
+ System.out.println("testKFuzzyKMeansMRJob k= " + k);
+ // pick k initial cluster centers at random
+ List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
+
+ for (int i = 0; i < k + 1; i++) {
+ Vector vec = tweakValue(points.get(i));
+
+ SoftCluster cluster = new SoftCluster(vec);
+ cluster.addPoint(cluster.getCenter(), 1);
+ clusterList.add(cluster);
+ }
+
+ // run mapper
+ FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
+ mapper.config(clusterList);
+
+ DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+ for (Vector point : points) {
+ mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+ null);
+ }
+
+ // run combiner
+ DummyOutputCollector<Text, Text> combinerCollector = new DummyOutputCollector<Text, Text>();
+ FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
+
+ for (String key : mapCollector.getKeys()) {
+
+ List<Text> values = mapCollector.getValue(key);
+ combiner.reduce(new Text(key), values.iterator(), combinerCollector,
+ null);
+ }
+
+ // now verify the combiner output
+ assertEquals("Combiner Output", k + 1, combinerCollector.getData().size());
+
+ for (String key : combinerCollector.getKeys()) {
+ List<Text> values = combinerCollector.getValue(key);
+ assertEquals("too many values", 1, values.size());
+ }
+ }
+ }
+
+ public void testFuzzyKMeansReducer() throws Exception {
+ List<Vector> points = TestKmeansClustering
+ .getPoints(TestKmeansClustering.reference);
+
+ DistanceMeasure measure = new EuclideanDistanceMeasure();
+ SoftCluster.config(measure, 0.001);
+
+ for (int k = 0; k < points.size(); k++) {
+ System.out.println("testKFuzzyKMeansMRJob k= " + k);
+ // pick k initial cluster centers at random
+ List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
+
+ for (int i = 0; i < k + 1; i++) {
+ Vector vec = tweakValue(points.get(i));
+
+ SoftCluster cluster = new SoftCluster(vec, i);
+ cluster.addPoint(cluster.getCenter(), 1);
+ clusterList.add(cluster);
+ }
+
+ // run mapper
+ FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
+ mapper.config(clusterList);
+
+ DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+ for (Vector point : points) {
+ mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+ null);
+ }
+
+ // run combiner
+ DummyOutputCollector<Text, Text> combinerCollector = new DummyOutputCollector<Text, Text>();
+ FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
+
+ for (String key : mapCollector.getKeys()) {
+
+ List<Text> values = mapCollector.getValue(key);
+ combiner.reduce(new Text(key), values.iterator(), combinerCollector,
+ null);
+ }
+
+ // run reducer
+ DummyOutputCollector<Text, Text> reducerCollector = new DummyOutputCollector<Text, Text>();
+ FuzzyKMeansReducer reducer = new FuzzyKMeansReducer();
+ for (String key : combinerCollector.getKeys()) {
+ List<Text> values = combinerCollector.getValue(key);
+ reducer
+ .reduce(new Text(key), values.iterator(), reducerCollector, null);
+ }
+
+ // now verify the reducer output
+ assertEquals("Reducer Output", k + 1, combinerCollector.getData().size());
+
+ // compute the reference result after one iteration and compare
+ List<SoftCluster> reference = new ArrayList<SoftCluster>();
+ for (int i = 0; i < k + 1; i++) {
+ Vector vec = tweakValue(points.get(i));
+ reference.add(new SoftCluster(vec, i));
+ }
+ iterateReference(points, reference, measure);
+ for (SoftCluster key : reference) {
+ String clusterId = key.getIdentifier();
+ List<Text> values = reducerCollector.getValue(clusterId);
+ SoftCluster cluster = SoftCluster.decodeCluster(values.get(0)
+ .toString());
+ System.out.println("ref= " + key.toString() + " cluster= "
+ + cluster.toString());
+ assertEquals(k + " center[" + key + "][0]", key.getCenter().get(0),
+ cluster.getCenter().get(0));
+ assertEquals(k + " center[" + key + "][1]", key.getCenter().get(1),
+ cluster.getCenter().get(1));
+ }
+ }
+ }
+
+ /**
+  * For every cluster count from 1 to the number of points, runs mapper,
+  * combiner and reducer, feeds the reducer's clusters to
+  * FuzzyKMeansClusterMapper and compares the memberships it emits for each
+  * point with those of one iteration of the reference implementation.
+  */
+ public void testFuzzyKMeansClusterMapper() throws Exception {
+   List<Vector> points = TestKmeansClustering
+       .getPoints(TestKmeansClustering.reference);
+
+   DistanceMeasure measure = new EuclideanDistanceMeasure();
+   SoftCluster.config(measure, 0.001);
+
+   for (int k = 0; k < points.size(); k++) {
+     System.out.println("testKFuzzyKMeansMRJob k= " + k);
+     // pick k initial cluster centers at random
+     List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
+
+     for (int i = 0; i < k + 1; i++) {
+       Vector vec = tweakValue(points.get(i));
+
+       SoftCluster cluster = new SoftCluster(vec, i);
+       cluster.addPoint(cluster.getCenter(), 1);
+       clusterList.add(cluster);
+     }
+
+     // run mapper
+     FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
+     mapper.config(clusterList);
+
+     DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+     for (Vector point : points) {
+       mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+           null);
+     }
+
+     // run combiner
+     DummyOutputCollector<Text, Text> combinerCollector = new DummyOutputCollector<Text, Text>();
+     FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
+
+     for (String key : mapCollector.getKeys()) {
+
+       List<Text> values = mapCollector.getValue(key);
+       combiner.reduce(new Text(key), values.iterator(), combinerCollector,
+           null);
+     }
+
+     // run reducer
+     DummyOutputCollector<Text, Text> reducerCollector = new DummyOutputCollector<Text, Text>();
+     FuzzyKMeansReducer reducer = new FuzzyKMeansReducer();
+     for (String key : combinerCollector.getKeys()) {
+       List<Text> values = combinerCollector.getValue(key);
+       reducer
+           .reduce(new Text(key), values.iterator(), reducerCollector, null);
+     }
+
+     // run clusterMapper
+     List<SoftCluster> reducerCluster = new ArrayList<SoftCluster>();
+
+     for (String key : reducerCollector.getKeys()) {
+       List<Text> values = reducerCollector.getValue(key);
+       reducerCluster.add(SoftCluster.decodeCluster(values.get(0).toString()));
+     }
+
+     DummyOutputCollector<Text, Text> clusterMapperCollector = new DummyOutputCollector<Text, Text>();
+     FuzzyKMeansClusterMapper clusterMapper = new FuzzyKMeansClusterMapper();
+     clusterMapper.config(reducerCluster);
+     for (Vector point : points) {
+       clusterMapper.map(new Text(), new Text(point.asFormatString()),
+           clusterMapperCollector, null);
+     }
+
+     // now run for one iteration of referencefuzzykmeans and compare the
+     // results
+     List<SoftCluster> reference = new ArrayList<SoftCluster>();
+     for (int i = 0; i < k + 1; i++) {
+       Vector vec = tweakValue(points.get(i));
+       reference.add(new SoftCluster(vec, i));
+     }
+     Map<String, String> pointClusterInfo = new HashMap<String, String>();
+     referenceFuzzyKMeans(points, reference, pointClusterInfo,
+         EuclideanDistanceMeasure.class.getName(), 0.001, 1);
+
+     // Now compare the clustermapper results with the reference
+     for (String key : clusterMapperCollector.getKeys()) {
+       List<Text> value = clusterMapperCollector.getValue(key);
+
+       String refValue = pointClusterInfo.get(key);
+       assertNotNull("reference memberships for point:" + key, refValue);
+       String clusterInfoStr = refValue.substring(1, refValue.length() - 1);
+       String[] refClusterInfoList = clusterInfoStr.split(" ");
+       assertEquals("Number of clusters", k + 1, refClusterInfoList.length);
+       Map<String, Double> refClusterInfoMap = new HashMap<String, Double>();
+       for (String clusterInfo : refClusterInfoList) {
+         String[] clusterProb = clusterInfo.split(":");
+         double clusterProbVal = Double.parseDouble(clusterProb[1]);
+         refClusterInfoMap.put(clusterProb[0], clusterProbVal);
+       }
+
+       // strip the brackets using the actual value's own length; it was
+       // previously trimmed with the reference string's length
+       String actualValue = value.get(0).toString();
+       String[] clusterInfoList = actualValue.substring(1,
+           actualValue.length() - 1).split(" ");
+       assertEquals("Number of clusters", k + 1, clusterInfoList.length);
+
+       // walk the cluster mapper's entries; the loop used to walk the
+       // reference entries and so compared the reference with itself
+       for (String clusterInfo : clusterInfoList) {
+         String[] clusterProb = clusterInfo.split(":");
+         double clusterProbVal = Double.parseDouble(clusterProb[1]);
+         Double refProbVal = refClusterInfoMap.get(clusterProb[0]);
+         assertNotNull(k + " point:" + key + ": Cluster:" + clusterProb[0],
+             refProbVal);
+         // small tolerance: the two paths may accumulate sums in a
+         // different order
+         assertEquals(k + " point:" + key + ": Cluster:" + clusterProb[0],
+             refProbVal.doubleValue(), clusterProbVal, 1.0e-9);
+       }
+     }
+   }
+ }
+
+}
Propchange: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java?rev=688122&r1=688121&r2=688122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java Fri Aug 22 10:05:39 2008
@@ -46,10 +46,10 @@
public class TestKmeansClustering extends TestCase {
- static final double[][] reference = { { 1, 1 }, { 2, 1 }, { 1, 2 }, { 2, 2 },
+ public static final double[][] reference = { { 1, 1 }, { 2, 1 }, { 1, 2 }, { 2, 2 },
{ 3, 3 }, { 4, 4 }, { 5, 4 }, { 4, 5 }, { 5, 5 } };
- static final int[][] expectedNumPoints = { { 9 }, { 4, 5 }, { 4, 5, 0 },
+ public static final int[][] expectedNumPoints = { { 9 }, { 4, 5 }, { 4, 5, 0 },
{ 1, 2, 1, 5 }, { 1, 1, 1, 2, 4 }, { 1, 1, 1, 1, 1, 4 },
{ 1, 1, 1, 1, 1, 2, 2 }, { 1, 1, 1, 1, 1, 1, 2, 1 },
{ 1, 1, 1, 1, 1, 1, 1, 1, 1 } };
@@ -141,7 +141,7 @@
return converged;
}
- private List<Vector> getPoints(double[][] raw) {
+ public static List<Vector> getPoints(double[][] raw) {
List<Vector> points = new ArrayList<Vector>();
for (int i = 0; i < raw.length; i++) {
double[] fr = raw[i];