Posted to commits@mahout.apache.org by gs...@apache.org on 2008/08/22 19:05:40 UTC

svn commit: r688122 - in /lucene/mahout/trunk/core/src: main/java/org/apache/mahout/clustering/fuzzykmeans/ test/java/org/apache/mahout/clustering/fuzzykmeans/ test/java/org/apache/mahout/clustering/kmeans/

Author: gsingers
Date: Fri Aug 22 10:05:39 2008
New Revision: 688122

URL: http://svn.apache.org/viewvc?rev=688122&view=rev
Log:
MAHOUT-74: Fuzzy K Means
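
For reference, a sketch of the membership update implemented by the new SoftCluster.computeProbWeight (inferred from the code below): with fuzziness factor m and d_i the distance from a point to cluster i's center,

    u_i = 1 / sum_j (d_i / d_j)^(2 / (m - 1))

so each point's membership weights over all clusters sum to 1, which the new tests assert.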

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java   (with props)
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java   (with props)
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java   (with props)
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java   (with props)
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java   (with props)
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java   (with props)
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java   (with props)
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java   (with props)
Modified:
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,36 @@
+package org.apache.mahout.clustering.fuzzykmeans;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.Vector;
+
+public class FuzzyKMeansClusterMapper extends FuzzyKMeansMapper {
+  public void map(WritableComparable key, Text values,
+      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    Vector point = AbstractVector.decodeVector(values.toString());
+    SoftCluster.outputPointWithClusterProbabilities(point, clusters, values,
+        output);
+  }
+
+}

Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,55 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.AbstractVector;
+
+public class FuzzyKMeansCombiner extends MapReduceBase implements
+    Reducer<Text, Text, Text, Text> {
+
+  public void reduce(Text key, Iterator<Text> values,
+      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    SoftCluster cluster = SoftCluster.decodeCluster(key.toString());
+    while (values.hasNext()) {
+      String pointInfo = values.next().toString();
+      double pointProb = Double.parseDouble(pointInfo.substring(0, pointInfo
+          .indexOf(":")));
+
+      String encodedVector = pointInfo.substring(pointInfo.indexOf(":") + 1);
+      cluster.addPoint(AbstractVector.decodeVector(encodedVector), pointProb
+          * SoftCluster.getM());
+    }
+    output.collect(key, new Text(cluster.getPointProbSum() + ", "
+        + cluster.getWeightedPointTotal().asFormatString()));
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    SoftCluster.configure(job);
+  }
+
+}

Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,233 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+public class FuzzyKMeansDriver {
+
+  private static final Log log = LogFactory.getLog(FuzzyKMeansDriver.class);
+
+  private FuzzyKMeansDriver() {
+  }
+
+  public static void main(String[] args) {
+    String input = args[0];
+    String clusters = args[1];
+    String output = args[2];
+    String measureClass = args[3];
+    double convergenceDelta = Double.parseDouble(args[4]);
+    int maxIterations = Integer.parseInt(args[5]);
+    int m = Integer.parseInt(args[6]);
+    runJob(input, clusters, output, measureClass, convergenceDelta,
+        maxIterations, 10, m);
+  }
+
+  /**
+   * Run the job using supplied arguments
+   * 
+   * @param input the directory pathname for input points
+   * @param clustersIn the directory pathname for initial & computed clusters
+   * @param output the directory pathname for output points
+   * @param measureClass the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param maxIterations the maximum number of iterations
+   * @param numMapTasks the number of mapper tasks
+   * @param m the fuzziness parameter used to fuzzify the cluster membership values
+   */
+  public static void runJob(String input, String clustersIn, String output,
+      String measureClass, double convergenceDelta, int maxIterations,
+      int numMapTasks, int m) {
+    try {
+      
+      boolean converged = false;
+      int iteration = 0;
+      String delta = Double.toString(convergenceDelta);
+
+      // iterate until the clusters converge
+      while (!converged && iteration < maxIterations) {
+        log.info("Iteration {" + iteration + "}");
+
+        // point the output to a new directory per iteration
+        String clustersOut = output + File.separator + "clusters-" + iteration;
+        converged = runIteration(input, clustersIn, clustersOut, measureClass,
+            delta, numMapTasks, iteration, m);
+
+        // now point the input to the old output directory
+        clustersIn = output + File.separator + "clusters-" + iteration;
+        iteration++;
+      }
+
+      // now actually cluster the points
+      log.info("Clustering ");
+
+      runClustering(input, clustersIn, output + File.separator + "points",
+          measureClass, delta, numMapTasks, m);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Run the job using supplied arguments
+   * 
+   * @param input the directory pathname for input points
+   * @param clustersIn the directory pathname for input clusters
+   * @param clustersOut the directory pathname for output clusters
+   * @param measureClass the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param numMapTasks the number of map tasks
+   * @param iterationNumber the number of the iteration being run
+   * @param m the fuzziness parameter used to fuzzify the cluster membership values
+   * @return true if the iteration successfully runs
+   */
+  private static boolean runIteration(String input, String clustersIn,
+                                      String clustersOut, String measureClass, String convergenceDelta,
+                                      int numMapTasks, int iterationNumber, int m) {
+
+    JobConf conf = new JobConf(FuzzyKMeansJob.class);
+    conf.setJobName("Fuzzy K Means{" + iterationNumber + "}");
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    FileInputFormat.setInputPaths(conf, new Path(input));
+    Path outPath = new Path(clustersOut);
+    FileOutputFormat.setOutputPath(conf, outPath);
+
+    conf.setMapperClass(FuzzyKMeansMapper.class);
+    conf.setCombinerClass(FuzzyKMeansCombiner.class);
+    conf.setReducerClass(FuzzyKMeansReducer.class);
+    conf.setNumMapTasks(numMapTasks);
+    conf.setNumReduceTasks(numMapTasks);
+
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    conf.set(SoftCluster.CLUSTER_PATH_KEY, clustersIn);
+    conf.set(SoftCluster.DISTANCE_MEASURE_KEY, measureClass);
+    conf.set(SoftCluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
+    conf.set(SoftCluster.M_KEY, String.valueOf(m));
+
+    try {
+      JobClient.runJob(conf);
+      FileSystem fs = FileSystem.get(conf);
+      return isConverged(clustersOut, conf, fs);
+    } catch (Exception e) {
+      log.warn(e.toString(), e);
+      return true;
+    }
+  }
+
+  /**
+   * Run the job using supplied arguments
+   * 
+   * @param input the directory pathname for input points
+   * @param clustersIn the directory pathname for input clusters
+   * @param output the directory pathname for output points
+   * @param measureClass the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param numMapTasks the number of map tasks
+   */
+  private static void runClustering(String input, String clustersIn,
+      String output, String measureClass, String convergenceDelta,
+      int numMapTasks, float m) {
+
+    JobConf conf = new JobConf(FuzzyKMeansDriver.class);
+    conf.setJobName("Fuzzy K Means Clustering");
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    FileInputFormat.setInputPaths(conf, new Path(input));
+    Path outPath = new Path(output);
+    FileOutputFormat.setOutputPath(conf, outPath);
+
+    conf.setMapperClass(FuzzyKMeansClusterMapper.class);
+
+    // uncomment it to run locally
+    // conf.set("mapred.job.tracker", "local");
+    conf.setNumMapTasks(numMapTasks);
+    conf.setNumReduceTasks(0);
+    conf.set(SoftCluster.CLUSTER_PATH_KEY, clustersIn);
+    conf.set(SoftCluster.DISTANCE_MEASURE_KEY, measureClass);
+    conf.set(SoftCluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
+    conf.set(SoftCluster.M_KEY, String.valueOf(m));
+    try {
+      JobClient.runJob(conf);
+    } catch (Exception e) {
+      log.warn(e.toString(), e);
+    }
+  }
+
+  /**
+   * Return whether all of the Clusters in the filePath have converged
+   * 
+   * @param filePath the file path to the single file containing the clusters
+   * @param conf the JobConf
+   * @param fs the FileSystem
+   * @return true if all Clusters are converged
+   * @throws IOException if there was an IO error
+   */
+  private static boolean isConverged(String filePath, JobConf conf,
+      FileSystem fs) throws IOException {
+
+    Path clusterPath = new Path(filePath);
+    List<Path> result = new ArrayList<Path>();
+
+    PathFilter clusterFileFilter = new PathFilter() {
+      public boolean accept(Path path) {
+        return path.getName().startsWith("part");
+      }
+    };
+
+    FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(
+        clusterPath, clusterFileFilter)), clusterFileFilter);
+
+    for (FileStatus match : matches) {
+      result.add(fs.makeQualified(match.getPath()));
+    }
+    boolean converged = true;
+
+    for (Path p : result) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
+      Text key = new Text();
+      Text value = new Text();
+
+      while (converged && reader.next(key, value)) {
+        converged = value.toString().startsWith("V");
+      }
+    }
+
+    return converged;
+  }
+}

Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
------------------------------------------------------------------------------
    svn:eol-style = native
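
A minimal usage sketch for the driver above (the paths and parameter values are illustrative assumptions, not taken from this commit):

    // Run up to 10 fuzzy k-means iterations with fuzziness m = 2, using a
    // Euclidean distance measure, then write per-point cluster probabilities
    // under output/points via runClustering.
    FuzzyKMeansDriver.runJob("testdata/points",   // input points directory
        "testdata/clusters",                      // initial clusters directory
        "output",                                 // output directory
        "org.apache.mahout.utils.EuclideanDistanceMeasure",
        0.001,                                    // convergence delta
        10,                                       // max iterations
        10,                                       // number of map tasks
        2);                                       // fuzziness m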

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,88 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.utils.ManhattanDistanceMeasure;
+
+public class FuzzyKMeansJob {
+
+  public static void main(String[] args) throws IOException {
+
+    if (args.length != 9) {
+      System.out.println("Expected num Arguments: 9  received:" + args.length);
+      printMessage();
+    }
+    int index = 0;
+    String input = args[index++];
+    String clusters = args[index++];
+    String output = args[index++];
+    String measureClass = args[index++];
+    double convergenceDelta = Double.parseDouble(args[index++]);
+    int maxIterations = Integer.parseInt(args[index++]);
+    int numMapTasks = Integer.parseInt(args[index++]);
+    boolean doCanopy = Boolean.parseBoolean(args[index++]);
+    int m = Integer.parseInt(args[index++]);
+
+    runJob(input, clusters, output,
+        measureClass, convergenceDelta,
+        maxIterations, numMapTasks, doCanopy, m);
+  }
+
+  /**
+   * Prints the usage message and exits
+   */
+  private static void printMessage() {
+    System.out
+        .println("Usage: inputDir clustersDir outputDir measureClass convergenceDelta maxIterations numMapTasks doCanopy m");
+    System.exit(1);
+  }
+
+  /**
+   * Run the job using supplied arguments
+   * 
+   * @param input the directory pathname for input points
+   * @param clustersIn the directory pathname for initial clusters
+   * @param output the directory pathname for output points
+   * @param measureClass the classname of the DistanceMeasure
+   * @param convergenceDelta the convergence delta value
+   * @param maxIterations the maximum number of iterations
+   * @param numMapTasks the number of map tasks
+   * @param doCanopy whether to run canopy clustering first to produce the initial clusters
+   * @param m the fuzziness parameter used to fuzzify the cluster membership values
+   */
+  public static void runJob(String input, String clustersIn, String output,
+      String measureClass, double convergenceDelta, int maxIterations,
+      int numMapTasks, boolean doCanopy, int m) {
+    try {
+
+      // run canopy to find initial clusters
+      if (doCanopy) {
+        CanopyDriver.runJob(input, clustersIn, ManhattanDistanceMeasure.class
+            .getName(), 100.1, 50.1);
+
+      }
+      // run fuzzy k-means
+      FuzzyKMeansDriver.runJob(input, clustersIn, output, measureClass,
+          convergenceDelta, maxIterations, numMapTasks, m);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,137 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.Vector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FuzzyKMeansMapper extends MapReduceBase implements
+    Mapper<WritableComparable, Text, Text, Text> {
+
+  private static final Logger log = LoggerFactory
+      .getLogger(FuzzyKMeansMapper.class);
+
+  protected List<SoftCluster> clusters;
+
+  public void map(WritableComparable key, Text values,
+      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    Vector point = AbstractVector.decodeVector(values.toString());
+    SoftCluster.emitPointProbToCluster(point, clusters, values, output);
+  }
+
+  /**
+   * Configure the mapper by providing its clusters. Used by unit tests.
+   * 
+   * @param clusters a List<SoftCluster>
+   */
+  void config(List<SoftCluster> clusters) {
+    this.clusters = clusters;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  @Override
+  public void configure(JobConf job) {
+
+    super.configure(job);
+    SoftCluster.configure(job);
+
+    log.info("In Mapper Configure:");
+    clusters = new ArrayList<SoftCluster>();
+
+    configureWithClusterInfo(job);
+
+    if (clusters.isEmpty())
+      throw new IllegalStateException("No clusters found in the configured cluster path");
+  }
+
+  /**
+   * Configure the mapper with the cluster info
+   * 
+   * @param job the JobConf specifying the cluster path
+   */
+  protected void configureWithClusterInfo(JobConf job) {
+    // Get the path location where the cluster Info is stored
+    String clusterPathStr = job.get(SoftCluster.CLUSTER_PATH_KEY);
+    Path clusterPath = new Path(clusterPathStr);
+    List<Path> result = new ArrayList<Path>();
+
+    // filter out the files
+    PathFilter clusterFileFilter = new PathFilter() {
+      public boolean accept(Path path) {
+        return path.getName().startsWith("part");
+      }
+    };
+
+    try {
+      // get all filtered file names in result list
+      FileSystem fs = clusterPath.getFileSystem(job);
+      FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(
+          clusterPath, clusterFileFilter)), clusterFileFilter);
+
+      for (FileStatus match : matches) {
+        result.add(fs.makeQualified(match.getPath()));
+      }
+
+      // iterate thru the result path list
+      for (Path path : result) {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+        try {
+          Text key = new Text();
+          Text value = new Text();
+          int counter = 1;
+          while (reader.next(key, value)) {
+            // get the cluster info
+            SoftCluster cluster = SoftCluster.decodeCluster(value.toString());
+            // add the center so the centroid will be correct on output
+            // formatting
+            cluster.addPoint(cluster.getCenter(), 1);
+            clusters.add(cluster);
+          }
+        } finally {
+          reader.close();
+        }
+      }
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,70 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.Vector;
+
+public class FuzzyKMeansReducer extends MapReduceBase implements
+    Reducer<Text, Text, Text, Text> {
+
+  public void reduce(Text key, Iterator<Text> values,
+      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    SoftCluster cluster = SoftCluster.decodeCluster(key.toString());
+    while (values.hasNext()) {
+      String value = values.next().toString();
+
+      int ix = value.indexOf(',');
+      try {
+        double partialSumPtProb = new Double(value.substring(0, ix));
+        Vector total = AbstractVector.decodeVector(value.substring(ix + 2));
+        cluster.addPoints(partialSumPtProb, total);
+      } catch (Exception e) {
+        // Escaped from Combiner. So, let's do that processing too:
+        System.out.println("Escaped from combiner: Key:" + key.toString()
+            + " Value:" + value);
+        double pointProb = Double.parseDouble(value.substring(0, value
+            .indexOf(":")));
+
+        String encodedVector = value.substring(value.indexOf(":") + 1);
+        cluster.addPoint(AbstractVector.decodeVector(encodedVector), pointProb
+            * SoftCluster.getM());
+      }
+    }
+
+    // force convergence calculation
+    cluster.computeConvergence();
+    output.collect(new Text(cluster.getIdentifier()), new Text(SoftCluster
+        .formatCluster(cluster)));
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+    SoftCluster.configure(job);
+  }
+
+}

Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,354 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.SparseVector;
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.utils.DistanceMeasure;
+
+public class SoftCluster {
+
+  public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.kmeans.measure";
+
+  public static final String CLUSTER_PATH_KEY = "org.apache.mahout.clustering.kmeans.path";
+
+  public static final String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.kmeans.convergence";
+
+  public static final String M_KEY = "org.apache.mahout.clustering.fuzzykmeans.m";
+
+  private static float m = 2; // default fuzziness value
+
+  // a small value added to any zero-valued distance to avoid divide-by-zero
+  public static final double minimalValue = 0.0000000001;
+
+  private static int nextClusterId = 0;
+
+  // this cluster's clusterId
+  private int clusterId;
+
+  // the current center
+  private Vector center = new SparseVector(0);
+
+  // the current centroid is lazy evaluated and may be null
+  private Vector centroid = null;
+
+  // The Probability of belongingness sum
+  private double pointProbSum = 0.0;
+
+  // the total of all points added to the cluster
+  private Vector weightedPointTotal = null;
+
+  // has the centroid converged with the center?
+  private boolean converged = false;
+
+  private static DistanceMeasure measure;
+  private static double convergenceDelta = 0;
+
+  /**
+   * Format the SoftCluster for output
+   * 
+   * @param cluster the Cluster
+   * @return the formatted String, "identifier: centroid"
+   */
+  public static String formatCluster(SoftCluster cluster) {
+    return cluster.getIdentifier() + ": "
+        + cluster.computeCentroid().asFormatString();
+  }
+
+  /**
+   * Decodes and returns a SoftCluster from the formattedString
+   * 
+   * @param formattedString a String produced by formatCluster
+   * @return the decoded SoftCluster, or null if the String is not recognized
+   */
+  public static SoftCluster decodeCluster(String formattedString) {
+    int beginIndex = formattedString.indexOf('[');
+    String id = formattedString.substring(0, beginIndex);
+    String center = formattedString.substring(beginIndex);
+    if (id.startsWith("C") || id.startsWith("V")) {
+      int clusterId = Integer.parseInt(formattedString.substring(1, beginIndex - 2));
+      Vector clusterCenter = AbstractVector.decodeVector(center);
+
+      SoftCluster cluster = new SoftCluster(clusterCenter, clusterId);
+      cluster.converged = id.startsWith("V");
+      return cluster;
+    }
+    return null;
+  }
+
+  /**
+   * Configure the distance measure from the job
+   * 
+   * @param job the JobConf for the job
+   */
+  public static void configure(JobConf job) {
+    try {
+      final ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+      Class<?> cl = ccl.loadClass(job.get(DISTANCE_MEASURE_KEY));
+      measure = (DistanceMeasure) cl.newInstance();
+      measure.configure(job);
+      convergenceDelta = Double.parseDouble(job.get(CLUSTER_CONVERGENCE_KEY));
+      nextClusterId = 0;
+      m = Float.parseFloat(job.get(M_KEY));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Configure the distance measure directly. Used by unit tests.
+   * 
+   * @param aMeasure the DistanceMeasure
+   * @param aConvergenceDelta the delta value used to define convergence
+   */
+  public static void config(DistanceMeasure aMeasure, double aConvergenceDelta) {
+    measure = aMeasure;
+    convergenceDelta = aConvergenceDelta;
+    nextClusterId = 0;
+  }
+
+  /**
+   * Emit the point and its probability of belongingness to each cluster
+   * 
+   * @param point a point
+   * @param clusters a List<SoftCluster>
+   * @param values a Writable containing the input point and possible other
+   *        values of interest (payload)
+   * @param output the OutputCollector to emit into
+   * @throws IOException
+   */
+  public static void emitPointProbToCluster(Vector point,
+      List<SoftCluster> clusters, Text values,
+      OutputCollector<Text, Text> output) throws IOException {
+    List<Double> clusterDistanceList = new ArrayList<Double>();
+    for (SoftCluster cluster : clusters) {
+      clusterDistanceList.add(measure.distance(point, cluster.getCenter()));
+    }
+
+    for (int i = 0; i < clusters.size(); i++) {
+      double probWeight = computeProbWeight(clusterDistanceList.get(i),
+          clusterDistanceList);
+
+      Text key = new Text(formatCluster(clusters.get(i)));
+      Text value = new Text(Double.toString(probWeight) + ":"
+          + values.toString());
+      output.collect(key, value);
+    }
+  }
+
+  /**
+   * Output point with cluster info (Cluster and probability)
+   * 
+   * @param point a point
+   * @param clusters a List<SoftCluster> to test
+   * @param values a Writable containing the input point and possible other
+   *        values of interest (payload)
+   * @param output the OutputCollector to emit into
+   * @throws IOException
+   */
+  public static void outputPointWithClusterProbabilities(Vector point,
+      List<SoftCluster> clusters, Text values,
+      OutputCollector<Text, Text> output) throws IOException {
+
+    String outputKey = values.toString();
+    StringBuffer outputValue = new StringBuffer("[");
+    List<Double> clusterDistanceList = new ArrayList<Double>();
+
+    for (SoftCluster cluster : clusters) {
+      clusterDistanceList.add(measure.distance(point, cluster.getCenter()));
+    }
+
+    for (int i = 0; i < clusters.size(); i++) {
+      double probWeight = computeProbWeight(clusterDistanceList.get(i),
+          clusterDistanceList);
+      outputValue.append(clusters.get(i).clusterId).append(":").append(
+          probWeight).append(" ");
+    }
+    output.collect(new Text(outputKey.trim()), new Text(outputValue.toString()
+        .trim()
+        + "]"));
+  }
+
+  /**
+   * Computes the probability of a point belonging to a cluster
+   * 
+   * @param clusterDistance the distance from the point to this cluster's center
+   * @param clusterDistanceList the distances from the point to every cluster's center
+   * @return the membership weight of the point for this cluster
+   */
+  public static double computeProbWeight(double clusterDistance,
+      List<Double> clusterDistanceList) {
+    double denom = 0.0;
+    if (clusterDistance == 0) {
+      clusterDistance = minimalValue;
+    }
+    for (Double eachCDist : clusterDistanceList) {
+      if (eachCDist == 0)
+        eachCDist = minimalValue;
+
+      denom += Math.pow(clusterDistance / eachCDist, (double) 2 / (m - 1));
+
+    }
+    return 1.0 / denom;
+  }
+
+  /**
+   * Compute the centroid
+   * 
+   * @return the new centroid
+   */
+  private Vector computeCentroid() {
+    if (pointProbSum == 0)
+      return weightedPointTotal;
+    else if (centroid == null) {
+      // lazy compute new centroid
+      centroid = weightedPointTotal.divide(pointProbSum);
+    }
+    return centroid;
+  }
+
+  /**
+   * Construct a new SoftCluster with the given point as its center
+   * 
+   * @param center the center point
+   */
+  public SoftCluster(Vector center) {
+    super();
+    this.clusterId = nextClusterId++;
+    this.center = center;
+    this.pointProbSum = 0;
+
+    this.weightedPointTotal = center.like();
+  }
+
+  /**
+   * Construct a new SoftCluster with the given point as its center
+   * 
+   * @param center the center point
+   */
+  public SoftCluster(Vector center, int clusterId) {
+    super();
+    this.clusterId = clusterId;
+    this.center = center;
+    this.pointProbSum = 0;
+    this.weightedPointTotal = center.like();
+  }
+
+  @Override
+  public String toString() {
+    return getIdentifier() + " - " + center.asFormatString();
+  }
+
+  public String getIdentifier() {
+    if (converged)
+      return "V" + clusterId;
+    else
+      return "C" + clusterId;
+  }
+
+  /**
+   * Add the point to the SoftCluster
+   * 
+   * @param point a point to add
+   * @param ptProb the point's probability of membership in this cluster
+   */
+  public void addPoint(Vector point, double ptProb) {
+    centroid = null;
+    pointProbSum += ptProb;
+    if (weightedPointTotal == null)
+      weightedPointTotal = point.copy().times(ptProb);
+    else
+      weightedPointTotal = point.times(ptProb).plus(weightedPointTotal);
+  }
+
+  /**
+   * Add a partial sum of weighted points to the SoftCluster
+   *
+   * @param partialSumPtProb the partial sum of point membership probabilities
+   * @param delta the partial sum of weighted points to add
+   */
+  public void addPoints(double partialSumPtProb, Vector delta) {
+    centroid = null;
+    pointProbSum += partialSumPtProb;
+    if (weightedPointTotal == null)
+      weightedPointTotal = delta.copy();
+    else
+      weightedPointTotal = delta.plus(weightedPointTotal);
+  }
+
+  public Vector getCenter() {
+    return center;
+  }
+
+  public double getPointProbSum() {
+    return pointProbSum;
+  }
+
+  /**
+   * Compute the centroid and set the center to it.
+   */
+  public void recomputeCenter() {
+    center = computeCentroid();
+    pointProbSum = 0;
+    weightedPointTotal = center.like();
+  }
+
+  /**
+   * Compute whether the cluster has converged by comparing its center and centroid.
+   * 
+   * @return true if the cluster has converged
+   */
+  public boolean computeConvergence() {
+    Vector centroid = computeCentroid();
+    converged = measure.distance(centroid, center) <= convergenceDelta;
+    return converged;
+  }
+
+  public Vector getWeightedPointTotal() {
+    return weightedPointTotal;
+  }
+
+  public boolean isConverged() {
+    return converged;
+  }
+
+  public int getClusterId() {
+    return clusterId;
+  }
+
+  public static float getM() {
+    return m;
+  }
+
+}

Propchange: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
------------------------------------------------------------------------------
    svn:eol-style = native
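
A small worked example of SoftCluster.computeProbWeight above, assuming the default fuzziness m = 2 (the distances are made up for illustration):

    import java.util.Arrays;
    import java.util.List;

    // Distances from one point to two cluster centers.
    List<Double> distances = Arrays.asList(1.0, 3.0);
    // With m = 2 the exponent 2/(m - 1) is 2, so:
    //   w0 = 1 / ((1/1)^2 + (1/3)^2) = 0.9
    //   w1 = 1 / ((3/1)^2 + (3/3)^2) = 0.1
    double w0 = SoftCluster.computeProbWeight(1.0, distances);
    double w1 = SoftCluster.computeProbWeight(3.0, distances);
    // w0 + w1 == 1.0, which is what the test below asserts for every point.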

Added: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java?rev=688122&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java (added)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java Fri Aug 22 10:05:39 2008
@@ -0,0 +1,566 @@
+package org.apache.mahout.clustering.fuzzykmeans;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.utils.DistanceMeasure;
+import org.apache.mahout.utils.DummyOutputCollector;
+import org.apache.mahout.utils.EuclideanDistanceMeasure;
+
+public class TestFuzzyKmeansClustering extends TestCase {
+
+  private void rmr(String path) throws Exception {
+    File f = new File(path);
+    if (f.exists()) {
+      if (f.isDirectory()) {
+        String[] contents = f.list();
+        for (int i = 0; i < contents.length; i++)
+          rmr(f.toString() + File.separator + contents[i]);
+      }
+      f.delete();
+    }
+  }
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    rmr("output");
+    rmr("testdata");
+  }
+
+  public static double round(double val, int places) {
+    long factor = (long) Math.pow(10, places);
+
+    // Shift the decimal the correct number of places
+    // to the right.
+    val = val * factor;
+
+    // Round to the nearest integer.
+    long tmp = Math.round(val);
+
+    // Shift the decimal the correct number of places
+    // back to the left.
+    return (double) tmp / factor;
+  }
+
+  public static Vector tweakValue(Vector point) {
+    return point.plus(0.1);
+
+  }
+
+  public void referenceFuzzyKMeans(List<Vector> points,
+      List<SoftCluster> clusterList, Map<String, String> pointClusterInfo,
+      String distanceMeasureClass, double threshold, int numIter)
+      throws Exception {
+    final ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+    Class<?> cl = ccl.loadClass(distanceMeasureClass);
+
+    DistanceMeasure measure = (DistanceMeasure) cl.newInstance();
+    SoftCluster.config(measure, threshold);
+    boolean converged = false;
+    for (int iter = 0; !converged && iter < numIter; iter++) {
+      iterateReference(points, clusterList, measure);
+    }
+    computeCluster(points, clusterList, measure, pointClusterInfo);
+  }
+
+  public void iterateReference(List<Vector> points,
+      List<SoftCluster> clusterList, DistanceMeasure measure) {
+    // for each
+    for (Vector point : points) {
+      List<Double> clusterDistanceList = new ArrayList<Double>();
+      for (SoftCluster cluster : clusterList) {
+        clusterDistanceList.add(measure.distance(point, cluster.getCenter()));
+      }
+
+      for (int i = 0; i < clusterList.size(); i++) {
+        double probWeight = SoftCluster.computeProbWeight(clusterDistanceList
+            .get(i), clusterDistanceList);
+        clusterList.get(i).addPoint(point, probWeight * SoftCluster.getM());
+      }
+    }
+    for (SoftCluster cluster : clusterList) {
+      cluster.computeConvergence();
+      cluster.recomputeCenter();
+    }
+  }
+
+  public void computeCluster(List<Vector> points,
+      List<SoftCluster> clusterList, DistanceMeasure measure,
+      Map<String, String> pointClusterInfo) {
+
+    for (Vector point : points) {
+      StringBuffer outputValue = new StringBuffer("[");
+      List<Double> clusterDistanceList = new ArrayList<Double>();
+      for (SoftCluster cluster : clusterList) {
+        clusterDistanceList.add(measure.distance(point, cluster.getCenter()));
+      }
+      for (int i = 0; i < clusterList.size(); i++) {
+        double probWeight = SoftCluster.computeProbWeight(clusterDistanceList
+            .get(i), clusterDistanceList);
+        outputValue.append(clusterList.get(i).getClusterId()).append(":")
+            .append(probWeight).append(" ");
+      }
+
+      pointClusterInfo.put(point.asFormatString().trim(), outputValue
+          .toString().trim()
+          + "]");
+    }
+  }
+
+  public void testReferenceImplementation() throws Exception {
+    List<Vector> points = TestKmeansClustering
+        .getPoints(TestKmeansClustering.reference);
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("test k= " + k);
+
+      List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
+      // pick k initial cluster centers at random
+      for (int i = 0; i < k + 1; i++) {
+        Vector vec = tweakValue(points.get(i));
+        SoftCluster cluster = new SoftCluster(vec);
+        // add the center so the centroid will be correct upon output
+        cluster.addPoint(cluster.getCenter(), 1);
+
+        clusterList.add(cluster);
+      }
+      Map<String, String> pointClusterInfo = new HashMap<String, String>();
+      // run reference FuzzyKmeans algorithm
+      referenceFuzzyKMeans(points, clusterList, pointClusterInfo,
+          EuclideanDistanceMeasure.class.getName(), 0.001, 2);
+      Iterator<Map.Entry<String, String>> iterator = pointClusterInfo
+          .entrySet().iterator();
+
+      // iterate for each point
+      while (iterator.hasNext()) {
+        Map.Entry<String, String> entry = iterator.next();
+        String value = entry.getValue();
+
+        String clusterInfoStr = value.substring(1, value.length() - 1);
+        String[] clusterInfoList = clusterInfoStr.split(" ");
+        assertEquals("Number of clusters", k + 1, clusterInfoList.length);
+        double prob = 0.0;
+        for (String clusterInfo : clusterInfoList) {
+          String[] clusterProb = clusterInfo.split(":");
+
+          double clusterProbVal = Double.parseDouble(clusterProb[1]);
+          prob += clusterProbVal;
+        }
+        prob = round(prob, 1);
+        assertEquals(
+            "Sum of cluster Membership problability should be equal to=", 1.0,
+            prob);
+      }
+    }
+  }
+
+  public void testFuzzyKMeansMRJob() throws Exception {
+    List<Vector> points = TestKmeansClustering
+        .getPoints(TestKmeansClustering.reference);
+    File testData = new File("testdata");
+    if (!testData.exists())
+      testData.mkdir();
+    testData = new File("testdata/points");
+    if (!testData.exists())
+      testData.mkdir();
+
+    TestKmeansClustering.writePointsToFile(points, "testdata/points/file1");
+    TestKmeansClustering.writePointsToFile(points, "testdata/points/file2");
+
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("testKFuzzyKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      JobConf job = new JobConf(FuzzyKMeansDriver.class);
+      FileSystem fs = FileSystem.get(job);
+      Path path = new Path("testdata/clusters/part-00000");
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path,
+          Text.class, Text.class);
+      for (int i = 0; i < k + 1; i++) {
+        Vector vec = tweakValue(points.get(i));
+
+        SoftCluster cluster = new SoftCluster(vec);
+        // add the center so the centroid will be correct upon output
+        cluster.addPoint(cluster.getCenter(), 1);
+        writer.append(new Text(cluster.getIdentifier()), new Text(SoftCluster
+            .formatCluster(cluster)));
+      }
+      writer.close();
+
+      JobConf conf = new JobConf(FuzzyKMeansDriver.class);
+      Path outPath = new Path("output");
+      fs = FileSystem.get(conf);
+      if (fs.exists(outPath)) {
+        fs.delete(outPath, true);
+      }
+      fs.mkdirs(outPath);
+      // now run the Job
+      FuzzyKMeansDriver.runJob("testdata/points", "testdata/clusters",
+          "output", EuclideanDistanceMeasure.class.getName(), 0.001, 2, 1,2);
+
+      // now compare the expected clusters with actual
+      File outDir = new File("output/points");
+      assertTrue("output dir exists?", outDir.exists());
+      String[] outFiles = outDir.list();
+      assertEquals("output dir files?", 4, outFiles.length);
+      BufferedReader reader = new BufferedReader(new FileReader(
+          "output/points/part-00000"));
+
+      while (reader.ready()) {
+        String line = reader.readLine();
+        String[] lineParts = line.split("\t");
+        assertEquals("line parts", 2, lineParts.length);
+        String clusterInfoStr = lineParts[1].substring(1,
+            lineParts[1].length() - 1);
+        String[] clusterInfoList = clusterInfoStr.split(" ");
+        assertEquals("Number of clusters", k + 1, clusterInfoList.length);
+        double prob = 0.0;
+        for (String clusterInfo : clusterInfoList) {
+          String[] clusterProb = clusterInfo.split(":");
+
+          double clusterProbVal = Double.parseDouble(clusterProb[1]);
+          prob += clusterProbVal;
+        }
+        prob = round(prob, 1);
+        assertEquals(
+            "Sum of cluster Membership problability should be equal to=", 1.0,
+            prob);
+      }
+
+      reader.close();
+
+    }
+
+  }
+
+  public void testFuzzyKMeansMapper() throws Exception {
+    List<Vector> points = TestKmeansClustering
+        .getPoints(TestKmeansClustering.reference);
+
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    SoftCluster.config(measure, 0.001);
+
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("testKFuzzyKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
+
+      for (int i = 0; i < k + 1; i++) {
+        Vector vec = tweakValue(points.get(i));
+
+        SoftCluster cluster = new SoftCluster(vec);
+        cluster.addPoint(cluster.getCenter(), 1);
+        clusterList.add(cluster);
+      }
+
+      // run mapper
+      FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
+      mapper.config(clusterList);
+
+      DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+      for (Vector point : points) {
+        mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+            null);
+      }
+
+      // now verify mapper output
+      assertEquals("Mapper Keys", k + 1, mapCollector.getData().size());
+
+      Map<String, Double> pointTotalProbMap = new HashMap<String, Double>();
+
+      for (String key : mapCollector.getKeys()) {
+        SoftCluster cluster = SoftCluster.decodeCluster(key);
+        List<Text> values = mapCollector.getValue(key);
+
+        for (Text value : values) {
+          String pointInfo = value.toString();
+          double pointProb = Double.parseDouble(pointInfo.substring(0,
+              pointInfo.indexOf(":")));
+
+          String encodedVector = pointInfo
+              .substring(pointInfo.indexOf(":") + 1);
+
+          Double val = pointTotalProbMap.get(encodedVector);
+          double probVal = 0;
+          if (val != null) {
+            probVal = val;
+          }
+
+          pointTotalProbMap.put(encodedVector, probVal + pointProb);
+        }
+      }
+      Iterator<Map.Entry<String, Double>> iterator = pointTotalProbMap
+          .entrySet().iterator();
+
+      while (iterator.hasNext()) {
+        Map.Entry<String, Double> entry = iterator.next();
+        String key = entry.getKey();
+        double value = round(entry.getValue(), 1);
+
+        assertEquals("total Prob for Point:" + key, 1.0, value);
+      }
+    }
+  }
+
+  public void testFuzzyKMeansCombiner() throws Exception {
+    List<Vector> points = TestKmeansClustering
+        .getPoints(TestKmeansClustering.reference);
+
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    SoftCluster.config(measure, 0.001);
+
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("testKFuzzyKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
+
+      for (int i = 0; i < k + 1; i++) {
+        Vector vec = tweakValue(points.get(i));
+
+        SoftCluster cluster = new SoftCluster(vec);
+        cluster.addPoint(cluster.getCenter(), 1);
+        clusterList.add(cluster);
+      }
+
+      // run mapper
+      FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
+      mapper.config(clusterList);
+
+      DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+      for (Vector point : points) {
+        mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+            null);
+      }
+
+      // run combiner
+      DummyOutputCollector<Text, Text> combinerCollector = new DummyOutputCollector<Text, Text>();
+      FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
+
+      for (String key : mapCollector.getKeys()) {
+
+        List<Text> values = mapCollector.getValue(key);
+        combiner.reduce(new Text(key), values.iterator(), combinerCollector,
+            null);
+      }
+
+      // now verify the combiner output
+      assertEquals("Combiner Output", k + 1, combinerCollector.getData().size());
+
+      for (String key : combinerCollector.getKeys()) {
+        List<Text> values = combinerCollector.getValue(key);
+        assertEquals("too many values", 1, values.size());
+      }
+    }
+  }
+
+  public void testFuzzyKMeansReducer() throws Exception {
+    List<Vector> points = TestKmeansClustering
+        .getPoints(TestKmeansClustering.reference);
+
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    SoftCluster.config(measure, 0.001);
+
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("testKFuzzyKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
+
+      for (int i = 0; i < k + 1; i++) {
+        Vector vec = tweakValue(points.get(i));
+
+        SoftCluster cluster = new SoftCluster(vec, i);
+        cluster.addPoint(cluster.getCenter(), 1);
+        clusterList.add(cluster);
+      }
+
+      // run mapper
+      FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
+      mapper.config(clusterList);
+
+      DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+      for (Vector point : points) {
+        mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+            null);
+      }
+
+      // run combiner
+      DummyOutputCollector<Text, Text> combinerCollector = new DummyOutputCollector<Text, Text>();
+      FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
+
+      for (String key : mapCollector.getKeys()) {
+
+        List<Text> values = mapCollector.getValue(key);
+        combiner.reduce(new Text(key), values.iterator(), combinerCollector,
+            null);
+      }
+
+      // run reducer
+      DummyOutputCollector<Text, Text> reducerCollector = new DummyOutputCollector<Text, Text>();
+      FuzzyKMeansReducer reducer = new FuzzyKMeansReducer();
+      for (String key : combinerCollector.getKeys()) {
+        List<Text> values = combinerCollector.getValue(key);
+        reducer
+            .reduce(new Text(key), values.iterator(), reducerCollector, null);
+      }
+
+      // now verify the reducer output
+      assertEquals("Reducer Output", k + 1, combinerCollector.getData().size());
+
+      // compute the reference result after one iteration and compare
+      List<SoftCluster> reference = new ArrayList<SoftCluster>();
+      for (int i = 0; i < k + 1; i++) {
+        Vector vec = tweakValue(points.get(i));
+        reference.add(new SoftCluster(vec, i));
+      }
+      iterateReference(points, reference, measure);
+      for (SoftCluster key : reference) {
+        String clusterId = key.getIdentifier();
+        List<Text> values = reducerCollector.getValue(clusterId);
+        SoftCluster cluster = SoftCluster.decodeCluster(values.get(0)
+            .toString());
+        System.out.println("ref= " + key.toString() + " cluster= "
+            + cluster.toString());
+        assertEquals(k + " center[" + key + "][0]", key.getCenter().get(0),
+            cluster.getCenter().get(0));
+        assertEquals(k + " center[" + key + "][1]", key.getCenter().get(1),
+            cluster.getCenter().get(1));
+      }
+    }
+  }
+
+  public void testFuzzyKMeansClusterMapper() throws Exception {
+    List<Vector> points = TestKmeansClustering
+        .getPoints(TestKmeansClustering.reference);
+
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    SoftCluster.config(measure, 0.001);
+
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("testFuzzyKMeansClusterMapper k= " + k);
+      // pick k+1 initial cluster centers from the first k+1 points
+      List<SoftCluster> clusterList = new ArrayList<SoftCluster>();
+
+      for (int i = 0; i < k + 1; i++) {
+        Vector vec = tweakValue(points.get(i));
+
+        SoftCluster cluster = new SoftCluster(vec, i);
+        cluster.addPoint(cluster.getCenter(), 1);
+        clusterList.add(cluster);
+      }
+
+      // run mapper
+      FuzzyKMeansMapper mapper = new FuzzyKMeansMapper();
+      mapper.config(clusterList);
+
+      DummyOutputCollector<Text, Text> mapCollector = new DummyOutputCollector<Text, Text>();
+      for (Vector point : points) {
+        mapper.map(new Text(), new Text(point.asFormatString()), mapCollector,
+            null);
+      }
+
+      // run combiner
+      DummyOutputCollector<Text, Text> combinerCollector = new DummyOutputCollector<Text, Text>();
+      FuzzyKMeansCombiner combiner = new FuzzyKMeansCombiner();
+
+      for (String key : mapCollector.getKeys()) {
+
+        List<Text> values = mapCollector.getValue(key);
+        combiner.reduce(new Text(key), values.iterator(), combinerCollector,
+            null);
+      }
+
+      // run reducer
+      DummyOutputCollector<Text, Text> reducerCollector = new DummyOutputCollector<Text, Text>();
+      FuzzyKMeansReducer reducer = new FuzzyKMeansReducer();
+      for (String key : combinerCollector.getKeys()) {
+        List<Text> values = combinerCollector.getValue(key);
+        reducer
+            .reduce(new Text(key), values.iterator(), reducerCollector, null);
+      }
+
+      // run clusterMapper
+      List<SoftCluster> reducerCluster = new ArrayList<SoftCluster>();
+
+      for (String key : reducerCollector.getKeys()) {
+        List<Text> values = reducerCollector.getValue(key);
+        reducerCluster.add(SoftCluster.decodeCluster(values.get(0).toString()));
+      }
+
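+      // the cluster mapper assigns each point its fuzzy membership over the
+      // final clusters produced by the reducer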
+      DummyOutputCollector<Text, Text> clusterMapperCollector = new DummyOutputCollector<Text, Text>();
+      FuzzyKMeansClusterMapper clusterMapper = new FuzzyKMeansClusterMapper();
+      clusterMapper.config(reducerCluster);
+      for (Vector point : points) {
+        clusterMapper.map(new Text(), new Text(point.asFormatString()),
+            clusterMapperCollector, null);
+      }
+
+      // run one iteration of the reference fuzzy k-means implementation and
+      // compare the results
+      List<SoftCluster> reference = new ArrayList<SoftCluster>();
+      for (int i = 0; i < k + 1; i++) {
+        Vector vec = tweakValue(points.get(i));
+        reference.add(new SoftCluster(vec, i));
+      }
+      Map<String, String> pointClusterInfo = new HashMap<String, String>();
+      referenceFuzzyKMeans(points, reference, pointClusterInfo,
+          EuclideanDistanceMeasure.class.getName(), 0.001, 1);
+
+      // now compare the cluster mapper results with the reference implementation
+      for (String key : clusterMapperCollector.getKeys()) {
+        List<Text> value = clusterMapperCollector.getValue(key);
+
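+        // membership strings have the form "[clusterId:prob clusterId:prob ...]"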
+        String refValue = pointClusterInfo.get(key);
+        String clusterInfoStr = refValue.substring(1, refValue.length() - 1);
+        String[] refClusterInfoList = clusterInfoStr.split(" ");
+        assertEquals("Number of clusters", k + 1, refClusterInfoList.length);
+        Map<String, Double> refClusterInfoMap = new HashMap<String, Double>();
+        for (String clusterInfo : refClusterInfoList) {
+          String[] clusterProb = clusterInfo.split(":");
+          double clusterProbVal = Double.parseDouble(clusterProb[1]);
+          refClusterInfoMap.put(clusterProb[0], clusterProbVal);
+        }
+
+        String outputValue = value.get(0).toString();
+        String[] clusterInfoList = outputValue.substring(1,
+            outputValue.length() - 1).split(" ");
+        assertEquals("Number of clusters", k + 1, clusterInfoList.length);
+        for (String clusterInfo : clusterInfoList) {
+          String[] clusterProb = clusterInfo.split(":");
+          double clusterProbVal = Double.parseDouble(clusterProb[1]);
+          assertEquals(k + " point:" + key + ": Cluster:" + clusterProb[0],
+              refClusterInfoMap.get(clusterProb[0]), clusterProbVal);
+        }
+      }
+    }
+  }
+
+}

Propchange: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java?rev=688122&r1=688121&r2=688122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java Fri Aug 22 10:05:39 2008
@@ -46,10 +46,10 @@
 
 public class TestKmeansClustering extends TestCase {
 
-  static final double[][] reference = { { 1, 1 }, { 2, 1 }, { 1, 2 }, { 2, 2 },
+  public static final double[][] reference = { { 1, 1 }, { 2, 1 }, { 1, 2 }, { 2, 2 },
       { 3, 3 }, { 4, 4 }, { 5, 4 }, { 4, 5 }, { 5, 5 } };
 
-  static final int[][] expectedNumPoints = { { 9 }, { 4, 5 }, { 4, 5, 0 },
+  public static final int[][] expectedNumPoints = { { 9 }, { 4, 5 }, { 4, 5, 0 },
       { 1, 2, 1, 5 }, { 1, 1, 1, 2, 4 }, { 1, 1, 1, 1, 1, 4 },
       { 1, 1, 1, 1, 1, 2, 2 }, { 1, 1, 1, 1, 1, 1, 2, 1 },
       { 1, 1, 1, 1, 1, 1, 1, 1, 1 } };
@@ -141,7 +141,7 @@
     return converged;
   }
 
-  private List<Vector> getPoints(double[][] raw) {
+  public static List<Vector> getPoints(double[][] raw) {
     List<Vector> points = new ArrayList<Vector>();
     for (int i = 0; i < raw.length; i++) {
       double[] fr = raw[i];