Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:08:01 UTC

[30/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy into mahout-hdfs and mahout-mr, closes apache/mahout#86

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java b/mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
new file mode 100644
index 0000000..930fd44
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.mahout.clustering.iterator.DistanceMeasureCluster;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Vector;
+
+/**
+ * This class models a canopy as a center point, the number of points contained within it
+ * (as determined by applying a distance measure), and a running point total, the sum of all
+ * observed points, from which the centroid is computed when needed.
+ */
+@Deprecated
+public class Canopy extends DistanceMeasureCluster {
+  
+  /** Used for deserialization as a writable */
+  public Canopy() { }
+  
+  /**
+   * Create a new Canopy containing the given point and canopyId
+   * 
+   * @param center a point in vector space
+   * @param canopyId an int identifying the canopy local to this process only
+   * @param measure a DistanceMeasure to use
+   */
+  public Canopy(Vector center, int canopyId, DistanceMeasure measure) {
+    super(center, canopyId, measure);
+    observe(center);
+  }
+
+  public String asFormatString() {
+    return "C" + this.getId() + ": " + this.computeCentroid().asFormatString();
+  }
+
+  @Override
+  public String toString() {
+    return getIdentifier() + ": " + getCenter().asFormatString();
+  }
+  
+  @Override
+  public String getIdentifier() {
+    return "C-" + getId();
+  }
+}
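
A minimal usage sketch for the Canopy class above (not from the patch; the
class name and vector values are illustrative, and it assumes mahout-math's
DenseVector and EuclideanDistanceMeasure are on the classpath):

    import org.apache.mahout.clustering.canopy.Canopy;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;

    public class CanopyExample {
      public static void main(String[] args) {
        Vector center = new DenseVector(new double[] {1.0, 2.0});
        // The constructor observes the center point, so the canopy starts
        // with one observation.
        Canopy canopy = new Canopy(center, 0, new EuclideanDistanceMeasure());
        System.out.println(canopy.getIdentifier()); // "C-0"
        System.out.println(canopy);                 // identifier plus formatted center
      }
    }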

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java b/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
new file mode 100644
index 0000000..3ce4757
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+@Deprecated
+public class CanopyClusterer {
+
+  private static final Logger log = LoggerFactory.getLogger(CanopyClusterer.class);
+
+  private int nextCanopyId;
+
+  // the T1 distance threshold
+  private double t1;
+
+  // the T2 distance threshold
+  private double t2;
+
+  // the T3 distance threshold
+  private double t3;
+
+  // the T4 distance threshold
+  private double t4;
+
+  // the distance measure
+  private DistanceMeasure measure;
+
+  public CanopyClusterer(DistanceMeasure measure, double t1, double t2) {
+    this.t1 = t1;
+    this.t2 = t2;
+    this.t3 = t1;
+    this.t4 = t2;
+    this.measure = measure;
+  }
+
+  public double getT1() {
+    return t1;
+  }
+
+  public double getT2() {
+    return t2;
+  }
+
+  public double getT3() {
+    return t3;
+  }
+
+  public double getT4() {
+    return t4;
+  }
+
+  /**
+   * Used by CanopyReducer to set t1=t3 and t2=t4 configuration values
+   */
+  public void useT3T4() {
+    t1 = t3;
+    t2 = t4;
+  }
+
+  /**
+   * This is the same algorithm as the reference but inverted to iterate over
+   * the existing canopies instead of the points. Because of this it does not
+   * need to store the points themselves, only a total points vector and the
+   * number of points. From these a centroid can be computed.
+   * <p/>
+   * This method is used by the CanopyMapper, CanopyReducer and CanopyDriver.
+   * 
+   * @param point
+   *            the point to be added
+   * @param canopies
+   *            the Collection<Canopy> to which a new canopy may be appended
+   */
+  public void addPointToCanopies(Vector point, Collection<Canopy> canopies) {
+    boolean pointStronglyBound = false;
+    for (Canopy canopy : canopies) {
+      double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point);
+      if (dist < t1) {
+        if (log.isDebugEnabled()) {
+          log.debug("Added point: {} to canopy: {}", AbstractCluster.formatVector(point, null), canopy.getIdentifier());
+        }
+        canopy.observe(point);
+      }
+      pointStronglyBound = pointStronglyBound || dist < t2;
+    }
+    if (!pointStronglyBound) {
+      if (log.isDebugEnabled()) {
+        log.debug("Created new Canopy:{} at center:{}", nextCanopyId, AbstractCluster.formatVector(point, null));
+      }
+      canopies.add(new Canopy(point, nextCanopyId++, measure));
+    }
+  }
+
+  /**
+   * Return whether the point is covered by the canopy
+   * 
+   * @param canopy
+   *            a canopy
+   * @param point
+   *            a point
+   * @return true if the point is within the T1 distance of the canopy's center
+   */
+  public boolean canopyCovers(Canopy canopy, Vector point) {
+    return measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point) < t1;
+  }
+
+  /**
+   * Iterate through the points, adding new canopies. Return the canopies.
+   * 
+   * @param points
+   *            a List<Vector> defining the points to be clustered; the list is
+   *            consumed (points are removed from it) as canopies are formed
+   * @param measure
+   *            a DistanceMeasure to use
+   * @param t1
+   *            the T1 distance threshold
+   * @param t2
+   *            the T2 distance threshold
+   * @return the List<Canopy> created
+   */
+  public static List<Canopy> createCanopies(List<Vector> points,
+                                            DistanceMeasure measure,
+                                            double t1,
+                                            double t2) {
+    List<Canopy> canopies = Lists.newArrayList();
+    /*
+     * Reference Implementation: Given a distance metric, one can create
+     * canopies as follows: Start with a list of the data points in any
+     * order, and with two distance thresholds, T1 and T2, where T1 > T2.
+     * (These thresholds can be set by the user, or selected by
+     * cross-validation.) Pick a point on the list and measure its distance
+     * to all other points. Put all points that are within distance
+     * threshold T1 into a canopy. Remove from the list all points that are
+     * within distance threshold T2. Repeat until the list is empty.
+     */
+    int nextCanopyId = 0;
+    while (!points.isEmpty()) {
+      Iterator<Vector> ptIter = points.iterator();
+      Vector p1 = ptIter.next();
+      ptIter.remove();
+      Canopy canopy = new Canopy(p1, nextCanopyId++, measure);
+      canopies.add(canopy);
+      while (ptIter.hasNext()) {
+        Vector p2 = ptIter.next();
+        double dist = measure.distance(p1, p2);
+        // Put all points that are within distance threshold T1 into the
+        // canopy
+        if (dist < t1) {
+          canopy.observe(p2);
+        }
+        // Remove from the list all points that are within distance
+        // threshold T2
+        if (dist < t2) {
+          ptIter.remove();
+        }
+      }
+      for (Canopy c : canopies) {
+        c.computeParameters();
+      }
+    }
+    return canopies;
+  }
+
+  /**
+   * Iterate through the canopies, adding their centers to a list
+   * 
+   * @param canopies
+   *            a List<Canopy>
+   * @return the List<Vector>
+   */
+  public static List<Vector> getCenters(Iterable<Canopy> canopies) {
+    List<Vector> result = Lists.newArrayList();
+    for (Canopy canopy : canopies) {
+      result.add(canopy.getCenter());
+    }
+    return result;
+  }
+
+  /**
+   * Iterate through the canopies, resetting their center to their centroids
+   * 
+   * @param canopies
+   *            a List<Canopy>
+   */
+  public static void updateCentroids(Iterable<Canopy> canopies) {
+    for (Canopy canopy : canopies) {
+      canopy.computeParameters();
+    }
+  }
+
+  public void setT3(double t3) {
+    this.t3 = t3;
+  }
+
+  public void setT4(double t4) {
+    this.t4 = t4;
+  }
+}
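
A sketch of driving the sequential reference implementation above (class name
and thresholds illustrative; assumes mahout-math's DenseVector and
EuclideanDistanceMeasure). Note that createCanopies consumes its input list:

    import java.util.List;

    import com.google.common.collect.Lists;
    import org.apache.mahout.clustering.canopy.Canopy;
    import org.apache.mahout.clustering.canopy.CanopyClusterer;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;

    public class CreateCanopiesExample {
      public static void main(String[] args) {
        // Must be a mutable list: createCanopies removes points as it runs.
        List<Vector> points = Lists.newArrayList();
        points.add(new DenseVector(new double[] {0.0, 0.0}));
        points.add(new DenseVector(new double[] {0.5, 0.5}));
        points.add(new DenseVector(new double[] {9.0, 9.0}));

        List<Canopy> canopies = CanopyClusterer.createCanopies(
            points, new EuclideanDistanceMeasure(), 3.0 /* t1 */, 1.5 /* t2 */);
        for (Canopy canopy : canopies) {
          System.out.println(canopy.asFormatString());
        }
      }
    }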

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java b/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java
new file mode 100644
index 0000000..2f24026
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.distance.DistanceMeasure;
+
+@Deprecated
+public final class CanopyConfigKeys {
+
+  private CanopyConfigKeys() {}
+
+  public static final String T1_KEY = "org.apache.mahout.clustering.canopy.t1";
+
+  public static final String T2_KEY = "org.apache.mahout.clustering.canopy.t2";
+
+  public static final String T3_KEY = "org.apache.mahout.clustering.canopy.t3";
+
+  public static final String T4_KEY = "org.apache.mahout.clustering.canopy.t4";
+
+  // keys used by Driver, Mapper, Combiner & Reducer
+  public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
+
+  public static final String CF_KEY = "org.apache.mahout.clustering.canopy.canopyFilter";
+
+  /**
+   * Create a {@link CanopyClusterer} from the Hadoop configuration.
+   *
+   * @param configuration Hadoop configuration
+   *
+   * @return CanopyClusterer
+   */
+  public static CanopyClusterer configureCanopyClusterer(Configuration configuration) {
+    double t1 = Double.parseDouble(configuration.get(T1_KEY));
+    double t2 = Double.parseDouble(configuration.get(T2_KEY));
+
+    DistanceMeasure measure = ClassUtils.instantiateAs(configuration.get(DISTANCE_MEASURE_KEY), DistanceMeasure.class);
+    measure.configure(configuration);
+
+    CanopyClusterer canopyClusterer = new CanopyClusterer(measure, t1, t2);
+
+    String d = configuration.get(T3_KEY);
+    if (d != null) {
+      canopyClusterer.setT3(Double.parseDouble(d));
+    }
+
+    d = configuration.get(T4_KEY);
+    if (d != null) {
+      canopyClusterer.setT4(Double.parseDouble(d));
+    }
+    return canopyClusterer;
+  }
+
+}
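
A sketch of how a caller might populate a Configuration for
configureCanopyClusterer above (class name and threshold values illustrative;
T3/T4 are optional and default to T1/T2 inside CanopyClusterer when unset):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.mahout.clustering.canopy.CanopyClusterer;
    import org.apache.mahout.clustering.canopy.CanopyConfigKeys;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

    public class ConfigureClustererExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
            EuclideanDistanceMeasure.class.getName());
        conf.set(CanopyConfigKeys.T1_KEY, "3.0");
        conf.set(CanopyConfigKeys.T2_KEY, "1.5");
        CanopyClusterer clusterer = CanopyConfigKeys.configureCanopyClusterer(conf);
        System.out.println(clusterer.getT1() + " / " + clusterer.getT2());
      }
    }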

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java b/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
new file mode 100644
index 0000000..06dc947
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.iterator.CanopyClusteringPolicy;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.topdown.PathDirectory;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+@Deprecated
+public class CanopyDriver extends AbstractJob {
+
+  public static final String DEFAULT_CLUSTERED_POINTS_DIRECTORY = "clusteredPoints";
+
+  private static final Logger log = LoggerFactory.getLogger(CanopyDriver.class);
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new CanopyDriver(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(DefaultOptionCreator.t3Option().create());
+    addOption(DefaultOptionCreator.t4Option().create());
+    addOption(DefaultOptionCreator.clusterFilterOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(DefaultOptionCreator.clusteringOption().create());
+    addOption(DefaultOptionCreator.methodOption().create());
+    addOption(DefaultOptionCreator.outlierThresholdOption().create());
+
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    Configuration conf = getConf();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(conf, output);
+    }
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+    double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+    double t3 = t1;
+    if (hasOption(DefaultOptionCreator.T3_OPTION)) {
+      t3 = Double.parseDouble(getOption(DefaultOptionCreator.T3_OPTION));
+    }
+    double t4 = t2;
+    if (hasOption(DefaultOptionCreator.T4_OPTION)) {
+      t4 = Double.parseDouble(getOption(DefaultOptionCreator.T4_OPTION));
+    }
+    int clusterFilter = 0;
+    if (hasOption(DefaultOptionCreator.CLUSTER_FILTER_OPTION)) {
+      clusterFilter = Integer
+          .parseInt(getOption(DefaultOptionCreator.CLUSTER_FILTER_OPTION));
+    }
+    boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION)
+        .equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+    double clusterClassificationThreshold = 0.0;
+    if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
+      clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
+    }
+    run(conf, input, output, measure, t1, t2, t3, t4, clusterFilter,
+        runClustering, clusterClassificationThreshold, runSequential);
+    return 0;
+  }
+
+  /**
+   * Build a directory of Canopy clusters from the input arguments and, if
+   * requested, cluster the input vectors using these clusters
+   * 
+   * @param conf
+   *          the Configuration
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance threshold
+   * @param t2
+   *          the double T2 distance threshold
+   * @param t3
+   *          the reducer's double T1 distance threshold
+   * @param t4
+   *          the reducer's double T2 distance threshold
+   * @param clusterFilter
+   *          the minimum canopy size output by the mappers
+   * @param runClustering
+   *          cluster the input vectors if true
+   * @param clusterClassificationThreshold
+   *          vectors with a pdf below this value will not be clustered; must be between 0 and 1
+   * @param runSequential
+   *          execute sequentially if true
+   */
+  public static void run(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, double t3, double t4,
+      int clusterFilter, boolean runClustering, double clusterClassificationThreshold, boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3,
+        t4, clusterFilter, runSequential);
+    if (runClustering) {
+      clusterData(conf, input, clustersOut, output, clusterClassificationThreshold, runSequential);
+    }
+  }
+
+  /**
+   * Convenience method to provide backward compatibility
+   */
+  public static void run(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, boolean runClustering,
+      double clusterClassificationThreshold, boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    run(conf, input, output, measure, t1, t2, t1, t2, 0, runClustering,
+        clusterClassificationThreshold, runSequential);
+  }
+
+  /**
+   * Convenience method that creates a new Configuration(). Builds a directory
+   * of Canopy clusters from the input arguments and, if requested, clusters
+   * the input vectors using these clusters
+   * 
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance threshold
+   * @param t2
+   *          the double T2 distance threshold
+   * @param runClustering
+   *          cluster the input vectors if true
+   * @param clusterClassificationThreshold
+   *          vectors with a pdf below this value will not be clustered; must be between 0 and 1
+   * @param runSequential
+   *          execute sequentially if true
+   */
+  public static void run(Path input, Path output, DistanceMeasure measure,
+      double t1, double t2, boolean runClustering, double clusterClassificationThreshold, boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    run(new Configuration(), input, output, measure, t1, t2, runClustering,
+        clusterClassificationThreshold, runSequential);
+  }
+
+  /**
+   * Convenience method for backward compatibility
+   */
+  public static Path buildClusters(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, int clusterFilter,
+      boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    return buildClusters(conf, input, output, measure, t1, t2, t1, t2,
+        clusterFilter, runSequential);
+  }
+
+  /**
+   * Build a directory of Canopy clusters from the input vectors and other
+   * arguments. Run sequential or mapreduce execution as requested
+   * 
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance threshold
+   * @param t2
+   *          the double T2 distance threshold
+   * @param t3
+   *          the reducer's double T1 distance threshold
+   * @param t4
+   *          the reducer's double T2 distance threshold
+   * @param clusterFilter
+   *          the int minimum size of canopies produced
+   * @param runSequential
+   *          a boolean indicating whether to run the sequential (reference) algorithm
+   * @return the canopy output directory Path
+   */
+  public static Path buildClusters(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, double t3, double t4,
+      int clusterFilter, boolean runSequential) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    log.info("Build Clusters Input: {} Out: {} Measure: {} t1: {} t2: {}",
+             input, output, measure, t1, t2);
+    if (runSequential) {
+      return buildClustersSeq(input, output, measure, t1, t2, clusterFilter);
+    } else {
+      return buildClustersMR(conf, input, output, measure, t1, t2, t3, t4,
+          clusterFilter);
+    }
+  }
+
+  /**
+   * Build a directory of Canopy clusters from the input vectors and other
+   * arguments. Run sequential execution
+   * 
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance threshold
+   * @param t2
+   *          the double T2 distance threshold
+   * @param clusterFilter
+   *          the int minimum size of canopies produced
+   * @return the canopy output directory Path
+   */
+  private static Path buildClustersSeq(Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, int clusterFilter)
+    throws IOException {
+    CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
+    Collection<Canopy> canopies = Lists.newArrayList();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+
+    for (VectorWritable vw : new SequenceFileDirValueIterable<VectorWritable>(
+        input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
+      clusterer.addPointToCanopies(vw.get(), canopies);
+    }
+
+    Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX);
+    Path path = new Path(canopyOutputDir, "part-r-00000");
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+        Text.class, ClusterWritable.class);
+    try {
+      ClusterWritable clusterWritable = new ClusterWritable();
+      for (Canopy canopy : canopies) {
+        canopy.computeParameters();
+        if (log.isDebugEnabled()) {
+          log.debug("Writing Canopy:{} center:{} numPoints:{} radius:{}",
+                    canopy.getIdentifier(),
+                    AbstractCluster.formatVector(canopy.getCenter(), null),
+                    canopy.getNumObservations(),
+                    AbstractCluster.formatVector(canopy.getRadius(), null));
+        }
+        if (canopy.getNumObservations() > clusterFilter) {
+          clusterWritable.setValue(canopy);
+          writer.append(new Text(canopy.getIdentifier()), clusterWritable);
+        }
+      }
+    } finally {
+      Closeables.close(writer, false);
+    }
+    return canopyOutputDir;
+  }
+
+  /**
+   * Build a directory of Canopy clusters from the input vectors and other
+   * arguments. Run mapreduce execution
+   * 
+   * @param conf
+   *          the Configuration
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance threshold
+   * @param t2
+   *          the double T2 distance threshold
+   * @param t3
+   *          the reducer's double T1 distance threshold
+   * @param t4
+   *          the reducer's double T2 distance threshold
+   * @param clusterFilter
+   *          the int minimum size of canopies produced
+   * @return the canopy output directory Path
+   */
+  private static Path buildClustersMR(Configuration conf, Path input,
+      Path output, DistanceMeasure measure, double t1, double t2, double t3,
+      double t4, int clusterFilter) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
+        .getName());
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
+    conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(t3));
+    conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(t4));
+    conf.set(CanopyConfigKeys.CF_KEY, String.valueOf(clusterFilter));
+
+    Job job = new Job(conf, "Canopy Driver running buildClusters over input: "
+        + input);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(CanopyMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+    job.setReducerClass(CanopyReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(ClusterWritable.class);
+    job.setNumReduceTasks(1);
+    job.setJarByClass(CanopyDriver.class);
+
+    FileInputFormat.addInputPath(job, input);
+    Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX);
+    FileOutputFormat.setOutputPath(job, canopyOutputDir);
+    if (!job.waitForCompletion(true)) {
+      throw new InterruptedException("Canopy Job failed processing " + input);
+    }
+    return canopyOutputDir;
+  }
+
+  private static void clusterData(Configuration conf,
+                                  Path points,
+                                  Path canopies,
+                                  Path output,
+                                  double clusterClassificationThreshold,
+                                  boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    ClusterClassifier.writePolicy(new CanopyClusteringPolicy(), canopies);
+    ClusterClassificationDriver.run(conf, points, output, new Path(output, PathDirectory.CLUSTERED_POINTS_DIRECTORY),
+                                    clusterClassificationThreshold, true, runSequential);
+  }
+
+}
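
A sketch of invoking the driver programmatically through the
backward-compatible run() overload above (class name, paths and thresholds
are hypothetical; the input is expected to be a SequenceFile of
VectorWritable):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.canopy.CanopyDriver;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

    public class RunCanopyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("input-vectors");   // hypothetical input path
        Path output = new Path("canopy-output");  // hypothetical output path
        CanopyDriver.run(conf, input, output, new EuclideanDistanceMeasure(),
            3.0 /* t1 */, 1.5 /* t2 */, true /* runClustering */,
            0.0 /* clusterClassificationThreshold */, true /* runSequential */);
      }
    }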

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java b/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
new file mode 100644
index 0000000..265d3da
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VectorWritable;
+
+@Deprecated
+class CanopyMapper extends
+    Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> {
+
+  private final Collection<Canopy> canopies = Lists.newArrayList();
+
+  private CanopyClusterer canopyClusterer;
+
+  private int clusterFilter;
+
+  @Override
+  protected void map(WritableComparable<?> key, VectorWritable point,
+      Context context) throws IOException, InterruptedException {
+    canopyClusterer.addPointToCanopies(point.get(), canopies);
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    super.setup(context);
+    canopyClusterer = CanopyConfigKeys.configureCanopyClusterer(context.getConfiguration());
+    clusterFilter = Integer.parseInt(context.getConfiguration().get(
+        CanopyConfigKeys.CF_KEY));
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+    for (Canopy canopy : canopies) {
+      canopy.computeParameters();
+      if (canopy.getNumObservations() > clusterFilter) {
+        context.write(new Text("centroid"), new VectorWritable(canopy
+            .getCenter()));
+      }
+    }
+    super.cleanup(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java b/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
new file mode 100644
index 0000000..cdd7d5e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+@Deprecated
+public class CanopyReducer extends Reducer<Text, VectorWritable, Text, ClusterWritable> {
+
+  private final Collection<Canopy> canopies = Lists.newArrayList();
+
+  private CanopyClusterer canopyClusterer;
+
+  private int clusterFilter;
+
+  CanopyClusterer getCanopyClusterer() {
+    return canopyClusterer;
+  }
+
+  @Override
+  protected void reduce(Text arg0, Iterable<VectorWritable> values,
+      Context context) throws IOException, InterruptedException {
+    for (VectorWritable value : values) {
+      Vector point = value.get();
+      canopyClusterer.addPointToCanopies(point, canopies);
+    }
+    for (Canopy canopy : canopies) {
+      canopy.computeParameters();
+      if (canopy.getNumObservations() > clusterFilter) {
+        ClusterWritable clusterWritable = new ClusterWritable();
+        clusterWritable.setValue(canopy);
+        context.write(new Text(canopy.getIdentifier()), clusterWritable);
+      }
+    }
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
+    super.setup(context);
+    canopyClusterer = CanopyConfigKeys.configureCanopyClusterer(context.getConfiguration());
+    canopyClusterer.useT3T4();
+    clusterFilter = Integer.parseInt(context.getConfiguration().get(
+        CanopyConfigKeys.CF_KEY));
+  }
+
+}
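
To illustrate the T3/T4 swap performed in the reducer's setup() above, a
small sketch (class name and threshold values illustrative;
ManhattanDistanceMeasure assumed available from mahout-mr):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.mahout.clustering.canopy.CanopyClusterer;
    import org.apache.mahout.clustering.canopy.CanopyConfigKeys;
    import org.apache.mahout.common.distance.ManhattanDistanceMeasure;

    public class T3T4Example {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY,
            ManhattanDistanceMeasure.class.getName());
        conf.set(CanopyConfigKeys.T1_KEY, "3.0");  // mapper-side T1
        conf.set(CanopyConfigKeys.T2_KEY, "1.5");  // mapper-side T2
        conf.set(CanopyConfigKeys.T3_KEY, "4.0");  // reducer-side T1
        conf.set(CanopyConfigKeys.T4_KEY, "2.0");  // reducer-side T2
        CanopyClusterer clusterer = CanopyConfigKeys.configureCanopyClusterer(conf);
        clusterer.useT3T4();
        // getT1() now returns 4.0 and getT2() returns 2.0, as in the reducer.
        System.out.println(clusterer.getT1() + " / " + clusterer.getT2());
      }
    }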

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationConfigKeys.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationConfigKeys.java b/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationConfigKeys.java
new file mode 100644
index 0000000..6b88388
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationConfigKeys.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.classify;
+
+/**
+ * Constants used in Cluster Classification.
+ */
+public final class ClusterClassificationConfigKeys {
+  
+  public static final String CLUSTERS_IN = "clusters_in";
+  
+  public static final String OUTLIER_REMOVAL_THRESHOLD = "pdf_threshold";
+  
+  public static final String EMIT_MOST_LIKELY = "emit_most_likely";
+
+  private ClusterClassificationConfigKeys() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java b/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
new file mode 100644
index 0000000..6e2c3cf
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.classify;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.iterator.ClusteringPolicy;
+import org.apache.mahout.clustering.iterator.DistanceMeasureCluster;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Classifies the vectors into different clusters found by the clustering
+ * algorithm.
+ */
+public final class ClusterClassificationDriver extends AbstractJob {
+  
+  /**
+   * CLI to run Cluster Classification Driver.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.methodOption().create());
+    addOption(DefaultOptionCreator.clustersInOption()
+        .withDescription("The input centroids, as Vectors.  Must be a SequenceFile of Writable, Cluster/Canopy.")
+        .create());
+    
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+    
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    
+    if (getConf() == null) {
+      setConf(new Configuration());
+    }
+    Path clustersIn = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+        DefaultOptionCreator.SEQUENTIAL_METHOD);
+    
+    double clusterClassificationThreshold = 0.0;
+    if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
+      clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
+    }
+    
+    run(getConf(), input, clustersIn, output, clusterClassificationThreshold, true, runSequential);
+    
+    return 0;
+  }
+  
+  /**
+   * Constructor to be used by the ToolRunner.
+   */
+  private ClusterClassificationDriver() {
+  }
+  
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new ClusterClassificationDriver(), args);
+  }
+  
+  /**
+   * Uses {@link ClusterClassifier} to classify input vectors into their
+   * respective clusters.
+   * 
+   * @param input
+   *          the input vectors
+   * @param clusteringOutputPath
+   *          the output path of the clustering (the clusters-*-final file is
+   *          read from here)
+   * @param output
+   *          the location to store the classified vectors
+   * @param clusterClassificationThreshold
+   *          the threshold value of the probability distribution function,
+   *          from 0.0 to 1.0. Any vector with a pdf less than this threshold
+   *          will not be classified for the cluster.
+   * @param emitMostLikely
+   *          if true, emit each vector only to its most likely cluster
+   * @param runSequential
+   *          whether to run the process sequentially or as a mapreduce job
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  public static void run(Configuration conf, Path input, Path clusteringOutputPath, Path output, Double clusterClassificationThreshold,
+      boolean emitMostLikely, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException {
+    if (runSequential) {
+      classifyClusterSeq(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
+    } else {
+      classifyClusterMR(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
+    }
+    
+  }
+  
+  private static void classifyClusterSeq(Configuration conf, Path input, Path clusters, Path output,
+      Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException {
+    List<Cluster> clusterModels = populateClusterModels(clusters, conf);
+    ClusteringPolicy policy = ClusterClassifier.readPolicy(finalClustersPath(conf, clusters));
+    ClusterClassifier clusterClassifier = new ClusterClassifier(clusterModels, policy);
+    selectCluster(input, clusterModels, clusterClassifier, output, clusterClassificationThreshold, emitMostLikely);
+    
+  }
+  
+  /**
+   * Populates a list with the clusters present in the clusters-*-final directory.
+   * 
+   * @param clusterOutputPath
+   *          The output path of the clustering.
+   * @param conf
+   *          The Hadoop Configuration
+   * @return The list of clusters found by the clustering.
+   * @throws IOException
+   */
+  private static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf) throws IOException {
+    List<Cluster> clusterModels = Lists.newArrayList();
+    Path finalClustersPath = finalClustersPath(conf, clusterOutputPath);
+    Iterator<?> it = new SequenceFileDirValueIterator<Writable>(finalClustersPath, PathType.LIST,
+        PathFilters.partFilter(), null, false, conf);
+    while (it.hasNext()) {
+      ClusterWritable next = (ClusterWritable) it.next();
+      Cluster cluster = next.getValue();
+      cluster.configure(conf);
+      clusterModels.add(cluster);
+    }
+    return clusterModels;
+  }
+  
+  private static Path finalClustersPath(Configuration conf, Path clusterOutputPath) throws IOException {
+    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
+    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
+    return clusterFiles[0].getPath();
+  }
+  
+  /**
+   * Classifies each vector into its respective cluster(s).
+   * 
+   * @param input
+   *          the path containing the input vectors
+   * @param clusterModels
+   *          the clusters
+   * @param clusterClassifier
+   *          used to classify the vectors into different clusters
+   * @param output
+   *          the path to store classified data
+   * @param clusterClassificationThreshold
+   *          the threshold value of the probability distribution function,
+   *          from 0.0 to 1.0. Any vector with a pdf less than this threshold
+   *          will not be classified for the cluster
+   * @param emitMostLikely
+   *          if true, emit each vector only to the cluster with the maximum
+   *          pdf value
+   * @throws IOException
+   */
+  private static void selectCluster(Path input, List<Cluster> clusterModels, ClusterClassifier clusterClassifier,
+      Path output, Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException {
+    Configuration conf = new Configuration();
+    SequenceFile.Writer writer = new SequenceFile.Writer(input.getFileSystem(conf), conf, new Path(output,
+        "part-m-" + 0), IntWritable.class, WeightedPropertyVectorWritable.class);
+    for (Pair<Writable, VectorWritable> vw : new SequenceFileDirIterable<Writable, VectorWritable>(input, PathType.LIST,
+        PathFilters.logsCRCFilter(), conf)) {
+      // Converting to NamedVectors to preserve the vectorId, otherwise it's not obvious which point
+      // belongs to which cluster - fix for MAHOUT-1410
+      Class<? extends Writable> keyClass = vw.getFirst().getClass();
+      Vector vector = vw.getSecond().get();
+      if (!keyClass.equals(NamedVector.class)) {
+        if (keyClass.equals(Text.class)) {
+          vector = new NamedVector(vector, vw.getFirst().toString());
+        } else if (keyClass.equals(IntWritable.class)) {
+          vector = new NamedVector(vector, Integer.toString(((IntWritable) vw.getFirst()).get()));
+        }
+      }
+      Vector pdfPerCluster = clusterClassifier.classify(vector);
+      if (shouldClassify(pdfPerCluster, clusterClassificationThreshold)) {
+        classifyAndWrite(clusterModels, clusterClassificationThreshold, emitMostLikely, writer, new VectorWritable(vector), pdfPerCluster);
+      }
+    }
+    writer.close();
+  }
+  
+  private static void classifyAndWrite(List<Cluster> clusterModels, Double clusterClassificationThreshold,
+      boolean emitMostLikely, SequenceFile.Writer writer, VectorWritable vw, Vector pdfPerCluster) throws IOException {
+    Map<Text, Text> props = Maps.newHashMap();
+    if (emitMostLikely) {
+      int maxValueIndex = pdfPerCluster.maxValueIndex();
+      WeightedPropertyVectorWritable weightedPropertyVectorWritable =
+          new WeightedPropertyVectorWritable(pdfPerCluster.maxValue(), vw.get(), props);
+      write(clusterModels, writer, weightedPropertyVectorWritable, maxValueIndex);
+    } else {
+      writeAllAboveThreshold(clusterModels, clusterClassificationThreshold, writer, vw, pdfPerCluster);
+    }
+  }
+  
+  private static void writeAllAboveThreshold(List<Cluster> clusterModels, Double clusterClassificationThreshold,
+      SequenceFile.Writer writer, VectorWritable vw, Vector pdfPerCluster) throws IOException {
+    Map<Text, Text> props = Maps.newHashMap();
+    for (Element pdf : pdfPerCluster.nonZeroes()) {
+      if (pdf.get() >= clusterClassificationThreshold) {
+        WeightedPropertyVectorWritable wvw = new WeightedPropertyVectorWritable(pdf.get(), vw.get(), props);
+        int clusterIndex = pdf.index();
+        write(clusterModels, writer, wvw, clusterIndex);
+      }
+    }
+  }
+
+  private static void write(List<Cluster> clusterModels, SequenceFile.Writer writer,
+      WeightedPropertyVectorWritable weightedPropertyVectorWritable,
+      int maxValueIndex) throws IOException {
+    Cluster cluster = clusterModels.get(maxValueIndex);
+
+    DistanceMeasureCluster distanceMeasureCluster = (DistanceMeasureCluster) cluster;
+    DistanceMeasure distanceMeasure = distanceMeasureCluster.getMeasure();
+    double distance = distanceMeasure.distance(cluster.getCenter(), weightedPropertyVectorWritable.getVector());
+
+    weightedPropertyVectorWritable.getProperties().put(new Text("distance"), new Text(Double.toString(distance)));
+    writer.append(new IntWritable(cluster.getId()), weightedPropertyVectorWritable);
+  }
+  
+  /**
+   * Decides whether the vector should be classified, based on the maximum pdf
+   * value over all clusters and the threshold value.
+   * 
+   * @return whether the vector should be classified or not.
+   */
+  private static boolean shouldClassify(Vector pdfPerCluster, Double clusterClassificationThreshold) {
+    return pdfPerCluster.maxValue() >= clusterClassificationThreshold;
+  }
+  
+  private static void classifyClusterMR(Configuration conf, Path input, Path clustersIn, Path output,
+      Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    
+    conf.setFloat(ClusterClassificationConfigKeys.OUTLIER_REMOVAL_THRESHOLD,
+                  clusterClassificationThreshold.floatValue());
+    conf.setBoolean(ClusterClassificationConfigKeys.EMIT_MOST_LIKELY, emitMostLikely);
+    conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString());
+    
+    Job job = new Job(conf, "Cluster Classification Driver running over input: " + input);
+    job.setJarByClass(ClusterClassificationDriver.class);
+    
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    
+    job.setMapperClass(ClusterClassificationMapper.class);
+    job.setNumReduceTasks(0);
+    
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(WeightedPropertyVectorWritable.class);
+    
+    FileInputFormat.addInputPath(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+    if (!job.waitForCompletion(true)) {
+      throw new InterruptedException("Cluster Classification Driver Job failed processing " + input);
+    }
+  }
+  
+  public static void run(Configuration conf, Path input, Path clusteringOutputPath, Path output,
+      double clusterClassificationThreshold, boolean emitMostLikely, boolean runSequential) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    if (runSequential) {
+      classifyClusterSeq(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
+    } else {
+      classifyClusterMR(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
+    }
+    
+  }
+  
+}
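
A sketch of calling the public run() entry point above (class name and paths
are hypothetical; clusteringOutputPath must contain a clusters-*-final
directory written by a prior clustering run):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.classify.ClusterClassificationDriver;

    public class ClassifyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path vectors = new Path("input-vectors");         // hypothetical input vectors
        Path clusteringOut = new Path("canopy-output");   // prior clustering output
        Path classified = new Path("classified-output");  // hypothetical output path
        ClusterClassificationDriver.run(conf, vectors, clusteringOut, classified,
            0.0 /* outlier threshold */, true /* emitMostLikely */,
            true /* runSequential */);
      }
    }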

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java b/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java
new file mode 100644
index 0000000..9edbd8e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationMapper.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.classify;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.iterator.ClusteringPolicy;
+import org.apache.mahout.clustering.iterator.DistanceMeasureCluster;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Mapper for classifying vectors into clusters.
+ */
+public class ClusterClassificationMapper extends
+    Mapper<WritableComparable<?>,VectorWritable,IntWritable,WeightedVectorWritable> {
+  
+  private double threshold;
+  private List<Cluster> clusterModels;
+  private ClusterClassifier clusterClassifier;
+  private IntWritable clusterId;
+  private boolean emitMostLikely;
+  
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    
+    Configuration conf = context.getConfiguration();
+    String clustersIn = conf.get(ClusterClassificationConfigKeys.CLUSTERS_IN);
+    threshold = conf.getFloat(ClusterClassificationConfigKeys.OUTLIER_REMOVAL_THRESHOLD, 0.0f);
+    emitMostLikely = conf.getBoolean(ClusterClassificationConfigKeys.EMIT_MOST_LIKELY, false);
+    
+    clusterModels = Lists.newArrayList();
+    
+    if (clustersIn != null && !clustersIn.isEmpty()) {
+      Path clustersInPath = new Path(clustersIn);
+      clusterModels = populateClusterModels(clustersInPath, conf);
+      ClusteringPolicy policy = ClusterClassifier
+          .readPolicy(finalClustersPath(clustersInPath));
+      clusterClassifier = new ClusterClassifier(clusterModels, policy);
+    }
+    clusterId = new IntWritable();
+  }
+  
+  /**
+   * Classifies the input vector into its most likely cluster, or into all clusters above the threshold.
+   */
+  @Override
+  protected void map(WritableComparable<?> key, VectorWritable vw, Context context)
+    throws IOException, InterruptedException {
+    if (!clusterModels.isEmpty()) {
+      // Convert to a NamedVector to preserve the vector id; otherwise it is not obvious which
+      // point belongs to which cluster - fix for MAHOUT-1410
+      Class<? extends Vector> vectorClass = vw.get().getClass();
+      Vector vector = vw.get();
+      if (!vectorClass.equals(NamedVector.class)) {
+        if (key.getClass().equals(Text.class)) {
+          vector = new NamedVector(vector, key.toString());
+        } else if (key.getClass().equals(IntWritable.class)) {
+          vector = new NamedVector(vector, Integer.toString(((IntWritable) key).get()));
+        }
+      }
+      Vector pdfPerCluster = clusterClassifier.classify(vector);
+      if (shouldClassify(pdfPerCluster)) {
+        if (emitMostLikely) {
+          int maxValueIndex = pdfPerCluster.maxValueIndex();
+          write(new VectorWritable(vector), context, maxValueIndex, 1.0);
+        } else {
+          writeAllAboveThreshold(new VectorWritable(vector), context, pdfPerCluster);
+        }
+      }
+    }
+  }
+  
+  private void writeAllAboveThreshold(VectorWritable vw, Context context,
+      Vector pdfPerCluster) throws IOException, InterruptedException {
+    for (Element pdf : pdfPerCluster.nonZeroes()) {
+      if (pdf.get() >= threshold) {
+        int clusterIndex = pdf.index();
+        write(vw, context, clusterIndex, pdf.get());
+      }
+    }
+  }
+  
+  private void write(VectorWritable vw, Context context, int clusterIndex, double weight)
+    throws IOException, InterruptedException {
+    Cluster cluster = clusterModels.get(clusterIndex);
+    clusterId.set(cluster.getId());
+
+    DistanceMeasureCluster distanceMeasureCluster = (DistanceMeasureCluster) cluster;
+    DistanceMeasure distanceMeasure = distanceMeasureCluster.getMeasure();
+    double distance = distanceMeasure.distance(cluster.getCenter(), vw.get());
+
+    Map<Text, Text> props = Maps.newHashMap();
+    props.put(new Text("distance"), new Text(Double.toString(distance)));
+    context.write(clusterId, new WeightedPropertyVectorWritable(weight, vw.get(), props));
+  }
+  
+  public static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf) throws IOException {
+    List<Cluster> clusters = Lists.newArrayList();
+    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
+    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
+    Iterator<?> it = new SequenceFileDirValueIterator<Writable>(
+        clusterFiles[0].getPath(), PathType.LIST, PathFilters.partFilter(),
+        null, false, conf);
+    while (it.hasNext()) {
+      ClusterWritable next = (ClusterWritable) it.next();
+      Cluster cluster = next.getValue();
+      cluster.configure(conf);
+      clusters.add(cluster);
+    }
+    return clusters;
+  }
+  
+  private boolean shouldClassify(Vector pdfPerCluster) {
+    return pdfPerCluster.maxValue() >= threshold;
+  }
+  
+  private static Path finalClustersPath(Path clusterOutputPath) throws IOException {
+    FileSystem fileSystem = clusterOutputPath.getFileSystem(new Configuration());
+    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
+    return clusterFiles[0].getPath();
+  }
+}
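
A minimal in-process sketch (not part of this commit) of the decision the map()
method above makes: classify the point, then either emit the most likely cluster
or every cluster whose pdf clears the threshold. Kluster, KMeansClusteringPolicy
and EuclideanDistanceMeasure are assumed from elsewhere in Mahout.

import java.util.List;

import com.google.common.collect.Lists;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.classify.ClusterClassifier;
import org.apache.mahout.clustering.iterator.KMeansClusteringPolicy;
import org.apache.mahout.clustering.kmeans.Kluster;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.Vector;

public class ClassificationSketch {
  public static void main(String[] args) {
    EuclideanDistanceMeasure measure = new EuclideanDistanceMeasure();
    List<Cluster> models = Lists.newArrayList();
    models.add(new Kluster(new DenseVector(new double[] {0, 0}), 0, measure));
    models.add(new Kluster(new DenseVector(new double[] {5, 5}), 1, measure));
    ClusterClassifier classifier = new ClusterClassifier(models, new KMeansClusteringPolicy());

    // Wrap the point in a NamedVector so its id survives, as the mapper
    // does for MAHOUT-1410.
    Vector point = new NamedVector(new DenseVector(new double[] {4, 4}), "p1");
    Vector pdfPerCluster = classifier.classify(point);

    double threshold = 0.5;                          // outlier-removal threshold
    if (pdfPerCluster.maxValue() >= threshold) {     // shouldClassify()
      // emitMostLikely path: a single (clusterId, point) pair
      System.out.println("cluster " + pdfPerCluster.maxValueIndex());
    }
  }
}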

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java b/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
new file mode 100644
index 0000000..d5f8d64
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
@@ -0,0 +1,240 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.classify;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.classifier.AbstractVectorClassifier;
+import org.apache.mahout.classifier.OnlineLearner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.iterator.ClusteringPolicy;
+import org.apache.mahout.clustering.iterator.ClusteringPolicyWritable;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+/**
+ * This classifier works with any ClusteringPolicy and its associated Clusters.
+ * It is initialized with a policy and a list of compatible clusters and
+ * thereafter it can classify any new Vector into one or more of the clusters
+ * based upon the pdf() function which each cluster supports.
+ * 
+ * In addition, it is an OnlineLearner and can be trained. Training amounts to
+ * asking the model at the given index to observe the vector, and closing the
+ * classifier causes all the models to computeParameters().
+ * 
+ * Because a ClusterClassifier implements Writable, it can be written to and
+ * read from a sequence file as a single entity. For sequential and MapReduce
+ * clustering in conjunction with a ClusterIterator, however, it uses an
+ * exploded file format. In this format, the iterator writes the policy to a
+ * single POLICY_FILE_NAME file in the clustersOut directory, and the models
+ * are written to one or more part-n files so that multiple reducers may be
+ * employed to produce them.
+ */
+public class ClusterClassifier extends AbstractVectorClassifier implements OnlineLearner, Writable {
+  
+  private static final String POLICY_FILE_NAME = "_policy";
+  
+  private List<Cluster> models;
+  
+  private String modelClass;
+  
+  private ClusteringPolicy policy;
+  
+  /**
+   * The public constructor accepts a list of clusters to become the models
+   * 
+   * @param models
+   *          a List<Cluster>
+   * @param policy
+   *          a ClusteringPolicy
+   */
+  public ClusterClassifier(List<Cluster> models, ClusteringPolicy policy) {
+    this.models = models;
+    modelClass = models.get(0).getClass().getName();
+    this.policy = policy;
+  }
+  
+  // needed for serialization/deserialization
+  public ClusterClassifier() {}
+  
+  // only used by MR ClusterIterator
+  protected ClusterClassifier(ClusteringPolicy policy) {
+    this.policy = policy;
+  }
+  
+  @Override
+  public Vector classify(Vector instance) {
+    return policy.classify(instance, this);
+  }
+  
+  @Override
+  public double classifyScalar(Vector instance) {
+    if (models.size() == 2) {
+      double pdf0 = models.get(0).pdf(new VectorWritable(instance));
+      double pdf1 = models.get(1).pdf(new VectorWritable(instance));
+      return pdf0 / (pdf0 + pdf1);
+    }
+    throw new IllegalStateException("classifyScalar is only defined when there are exactly two models");
+  }
+  
+  @Override
+  public int numCategories() {
+    return models.size();
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(models.size());
+    out.writeUTF(modelClass);
+    new ClusteringPolicyWritable(policy).write(out);
+    for (Cluster cluster : models) {
+      cluster.write(out);
+    }
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int size = in.readInt();
+    modelClass = in.readUTF();
+    models = Lists.newArrayList();
+    ClusteringPolicyWritable clusteringPolicyWritable = new ClusteringPolicyWritable();
+    clusteringPolicyWritable.readFields(in);
+    policy = clusteringPolicyWritable.getValue();
+    for (int i = 0; i < size; i++) {
+      Cluster element = ClassUtils.instantiateAs(modelClass, Cluster.class);
+      element.readFields(in);
+      models.add(element);
+    }
+  }
+  
+  @Override
+  public void train(int actual, Vector instance) {
+    models.get(actual).observe(new VectorWritable(instance));
+  }
+  
+  /**
+   * Train the models given an additional weight. Unique to ClusterClassifier
+   * 
+   * @param actual
+   *          the int index of a model
+   * @param data
+   *          a data Vector
+   * @param weight
+   *          a double weighting factor
+   */
+  public void train(int actual, Vector data, double weight) {
+    models.get(actual).observe(new VectorWritable(data), weight);
+  }
+  
+  @Override
+  public void train(long trackingKey, String groupKey, int actual, Vector instance) {
+    models.get(actual).observe(new VectorWritable(instance));
+  }
+  
+  @Override
+  public void train(long trackingKey, int actual, Vector instance) {
+    models.get(actual).observe(new VectorWritable(instance));
+  }
+  
+  @Override
+  public void close() {
+    policy.close(this);
+  }
+  
+  public List<Cluster> getModels() {
+    return models;
+  }
+  
+  public ClusteringPolicy getPolicy() {
+    return policy;
+  }
+  
+  public void writeToSeqFiles(Path path) throws IOException {
+    writePolicy(policy, path);
+    Configuration config = new Configuration();
+    FileSystem fs = FileSystem.get(path.toUri(), config);
+    SequenceFile.Writer writer = null;
+    ClusterWritable cw = new ClusterWritable();
+    for (int i = 0; i < models.size(); i++) {
+      try {
+        Cluster cluster = models.get(i);
+        cw.setValue(cluster);
+        writer = new SequenceFile.Writer(fs, config,
+            new Path(path, "part-" + String.format(Locale.ENGLISH, "%05d", i)), IntWritable.class,
+            ClusterWritable.class);
+        Writable key = new IntWritable(i);
+        writer.append(key, cw);
+      } finally {
+        Closeables.close(writer, false);
+      }
+    }
+  }
+  
+  public void readFromSeqFiles(Configuration conf, Path path) throws IOException {
+    Configuration config = new Configuration();
+    List<Cluster> clusters = Lists.newArrayList();
+    for (ClusterWritable cw : new SequenceFileDirValueIterable<ClusterWritable>(path, PathType.LIST,
+        PathFilters.logsCRCFilter(), config)) {
+      Cluster cluster = cw.getValue();
+      cluster.configure(conf);
+      clusters.add(cluster);
+    }
+    this.models = clusters;
+    modelClass = models.get(0).getClass().getName();
+    this.policy = readPolicy(path);
+  }
+  
+  public static ClusteringPolicy readPolicy(Path path) throws IOException {
+    Path policyPath = new Path(path, POLICY_FILE_NAME);
+    Configuration config = new Configuration();
+    FileSystem fs = FileSystem.get(policyPath.toUri(), config);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, policyPath, config);
+    Text key = new Text();
+    ClusteringPolicyWritable cpw = new ClusteringPolicyWritable();
+    reader.next(key, cpw);
+    Closeables.close(reader, true);
+    return cpw.getValue();
+  }
+  
+  public static void writePolicy(ClusteringPolicy policy, Path path) throws IOException {
+    Path policyPath = new Path(path, POLICY_FILE_NAME);
+    Configuration config = new Configuration();
+    FileSystem fs = FileSystem.get(policyPath.toUri(), config);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, config, policyPath, Text.class,
+        ClusteringPolicyWritable.class);
+    writer.append(new Text(), new ClusteringPolicyWritable(policy));
+    Closeables.close(writer, false);
+  }
+}
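
A minimal sketch (not part of this commit) of the exploded file format described
in the class Javadoc: writeToSeqFiles() emits a _policy file plus one part-n file
per model, which a fresh classifier can reload with readFromSeqFiles(). The /tmp
path and the choice of KMeansClusteringPolicy/Kluster are illustrative assumptions.

import java.io.IOException;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.classify.ClusterClassifier;
import org.apache.mahout.clustering.iterator.KMeansClusteringPolicy;
import org.apache.mahout.clustering.kmeans.Kluster;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;

public class ExplodedFormatSketch {
  public static void main(String[] args) throws IOException {
    List<Cluster> models = Lists.newArrayList();
    models.add(new Kluster(new DenseVector(new double[] {0, 0}), 0,
        new EuclideanDistanceMeasure()));
    ClusterClassifier classifier = new ClusterClassifier(models, new KMeansClusteringPolicy());

    Path clustersOut = new Path("/tmp/clusters-out"); // illustrative location
    classifier.writeToSeqFiles(clustersOut);          // _policy + part-00000

    ClusterClassifier reloaded = new ClusterClassifier();
    reloaded.readFromSeqFiles(new Configuration(), clustersOut);
    System.out.println(reloaded.getModels().size());  // 1
  }
}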

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/classify/WeightedPropertyVectorWritable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/classify/WeightedPropertyVectorWritable.java b/mr/src/main/java/org/apache/mahout/clustering/classify/WeightedPropertyVectorWritable.java
new file mode 100644
index 0000000..567659b
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/classify/WeightedPropertyVectorWritable.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.classify;
+
+import org.apache.hadoop.io.Text;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.math.Vector;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class WeightedPropertyVectorWritable extends WeightedVectorWritable {
+
+  private Map<Text, Text> properties;
+
+  public WeightedPropertyVectorWritable() {
+  }
+
+  public WeightedPropertyVectorWritable(Map<Text, Text> properties) {
+    this.properties = properties;
+  }
+
+  public WeightedPropertyVectorWritable(double weight, Vector vector, Map<Text, Text> properties) {
+    super(weight, vector);
+    this.properties = properties;
+  }
+
+  public Map<Text, Text> getProperties() {
+    return properties;
+  }
+
+  public void setProperties(Map<Text, Text> properties) {
+    this.properties = properties;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int size = in.readInt();
+    if (size > 0) {
+      properties = new HashMap<>();
+      for (int i = 0; i < size; i++) {
+        Text key = new Text(in.readUTF());
+        Text val = new Text(in.readUTF());
+        properties.put(key, val);
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(properties != null ? properties.size() : 0);
+    if (properties != null) {
+      for (Map.Entry<Text, Text> entry : properties.entrySet()) {
+        out.writeUTF(entry.getKey().toString());
+        out.writeUTF(entry.getValue().toString());
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    Vector vector = getVector();
+    StringBuilder bldr = new StringBuilder("wt: ").append(getWeight()).append(' ');
+    if (properties != null && !properties.isEmpty()) {
+      for (Map.Entry<Text, Text> entry : properties.entrySet()) {
+        bldr.append(entry.getKey().toString()).append(": ").append(entry.getValue().toString()).append(' ');
+      }
+    }
+    bldr.append(" vec: ").append(vector == null ? "null" : AbstractCluster.formatVector(vector, null));
+    return bldr.toString();
+  }
+
+
+}
+
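
A minimal round-trip sketch (not part of this commit) showing that the properties
map written by the mapper (e.g. the "distance" entry) travels with the vector and
weight through write()/readFields().

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
import org.apache.mahout.math.DenseVector;

public class RoundTripSketch {
  public static void main(String[] args) throws IOException {
    Map<Text, Text> props = new HashMap<>();
    props.put(new Text("distance"), new Text("0.25"));
    WeightedPropertyVectorWritable original =
        new WeightedPropertyVectorWritable(1.0, new DenseVector(new double[] {1, 2}), props);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));      // vector, weight, then the props map

    WeightedPropertyVectorWritable copy = new WeightedPropertyVectorWritable();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy);                         // wt: 1.0 distance: 0.25  vec: ...
  }
}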

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/classify/WeightedVectorWritable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/classify/WeightedVectorWritable.java b/mr/src/main/java/org/apache/mahout/clustering/classify/WeightedVectorWritable.java
new file mode 100644
index 0000000..510dd39
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/classify/WeightedVectorWritable.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.classify;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public class WeightedVectorWritable implements Writable {
+
+  private final VectorWritable vectorWritable = new VectorWritable();
+  private double weight;
+
+  public WeightedVectorWritable() {
+  }
+
+  public WeightedVectorWritable(double weight, Vector vector) {
+    this.vectorWritable.set(vector);
+    this.weight = weight;
+  }
+
+  public Vector getVector() {
+    return vectorWritable.get();
+  }
+
+  public void setVector(Vector vector) {
+    vectorWritable.set(vector);
+  }
+
+  public double getWeight() {
+    return weight;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    vectorWritable.readFields(in);
+    weight = in.readDouble();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    vectorWritable.write(out);
+    out.writeDouble(weight);
+  }
+
+  @Override
+  public String toString() {
+    Vector vector = vectorWritable.get();
+    return weight + ": " + (vector == null ? "null" : AbstractCluster.formatVector(vector, null));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java b/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java
new file mode 100644
index 0000000..ff02a4c
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+
+public class FuzzyKMeansClusterer {
+
+  private static final double MINIMAL_VALUE = 0.0000000001; // guards against division by zero
+  
+  private double m = 2.0; // default fuzziness exponent
+  
+  public Vector computePi(Collection<SoftCluster> clusters, List<Double> clusterDistanceList) {
+    Vector pi = new DenseVector(clusters.size());
+    for (int i = 0; i < clusters.size(); i++) {
+      double probWeight = computeProbWeight(clusterDistanceList.get(i), clusterDistanceList);
+      pi.set(i, probWeight);
+    }
+    return pi;
+  }
+  
+  /** Computes the probability of a point belonging to a cluster */
+  public double computeProbWeight(double clusterDistance, Iterable<Double> clusterDistanceList) {
+    if (clusterDistance == 0) {
+      clusterDistance = MINIMAL_VALUE;
+    }
+    double denom = 0.0;
+    for (double eachCDist : clusterDistanceList) {
+      if (eachCDist == 0.0) {
+        eachCDist = MINIMAL_VALUE;
+      }
+      denom += Math.pow(clusterDistance / eachCDist, 2.0 / (m - 1));
+    }
+    return 1.0 / denom;
+  }
+
+  public void setM(double m) {
+    this.m = m;
+  }
+}
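
A minimal sketch (not part of this commit) of the membership formula
computeProbWeight() implements, u_i = 1 / sum_k (d_i / d_k)^(2/(m-1)): with the
default m = 2 and distances {2, 1} to two clusters, the nearer cluster receives
weight 0.8, the farther one 0.2, and the memberships sum to 1.

import java.util.Arrays;
import java.util.List;

import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansClusterer;

public class MembershipSketch {
  public static void main(String[] args) {
    FuzzyKMeansClusterer clusterer = new FuzzyKMeansClusterer();
    clusterer.setM(2.0);                                  // exponent 2/(m-1) = 2

    List<Double> distances = Arrays.asList(2.0, 1.0);
    // denom = (2/2)^2 + (2/1)^2 = 5  ->  0.2
    System.out.println(clusterer.computeProbWeight(2.0, distances));
    // denom = (1/2)^2 + (1/1)^2 = 1.25  ->  0.8
    System.out.println(clusterer.computeProbWeight(1.0, distances));
  }
}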