You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/28 14:54:37 UTC

[09/51] [partial] mahout git commit: NO-JIRA Clean up MR refactor

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/RecordFactory.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/RecordFactory.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/RecordFactory.java
new file mode 100644
index 0000000..fbc825d
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/RecordFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.classifier.sgd;
+
+import org.apache.mahout.math.Vector;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A record factor understands how to convert a line of data into fields and then into a vector.
+ */
+public interface RecordFactory {
+  void defineTargetCategories(List<String> values);
+
+  RecordFactory maxTargetValue(int max);
+
+  boolean usesFirstLineAsSchema();
+
+  int processLine(String line, Vector featureVector);
+
+  Iterable<String> getPredictors();
+
+  Map<String, Set<Integer>> getTraceDictionary();
+
+  RecordFactory includeBiasTerm(boolean useBias);
+
+  List<String> getTargetCategories();
+
+  void firstLine(String line);
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/TPrior.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/TPrior.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/TPrior.java
new file mode 100644
index 0000000..0a7b6a7
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/TPrior.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.classifier.sgd;
+
+import org.apache.commons.math3.special.Gamma;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Provides a t-distribution as a prior.
+ */
+public class TPrior implements PriorFunction {
+  private double df;
+
+  public TPrior(double df) {
+    this.df = df;
+  }
+
+  @Override
+  public double age(double oldValue, double generations, double learningRate) {
+    for (int i = 0; i < generations; i++) {
+      oldValue -= learningRate * oldValue * (df + 1.0) / (df + oldValue * oldValue);
+    }
+    return oldValue;
+  }
+
+  @Override
+  public double logP(double betaIJ) {
+    return Gamma.logGamma((df + 1.0) / 2.0)
+        - Math.log(df * Math.PI)
+        - Gamma.logGamma(df / 2.0)
+        - (df + 1.0) / 2.0 * Math.log1p(betaIJ * betaIJ);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeDouble(df);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    df = in.readDouble();
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/UniformPrior.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/UniformPrior.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/UniformPrior.java
new file mode 100644
index 0000000..23c812f
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/UniformPrior.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.classifier.sgd;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A uniform prior.  This is an improper prior that corresponds to no regularization at all.
+ */
+public class UniformPrior implements PriorFunction {
+  @Override
+  public double age(double oldValue, double generations, double learningRate) {
+    return oldValue;
+  }
+
+  @Override
+  public double logP(double betaIJ) {
+    return 0;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    // nothing to write
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    // stateless class is trivial to read
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/package-info.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/package-info.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/package-info.java
new file mode 100644
index 0000000..c2ad966
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/classifier/sgd/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * <p>Implements a variety of on-line logistric regression classifiers using SGD-based algorithms.
+ * SGD stands for Stochastic Gradient Descent and refers to a class of learning algorithms
+ * that make it relatively easy to build high speed on-line learning algorithms for a variety
+ * of problems, notably including supervised learning for classification.</p>
+ *
+ * <p>The primary class of interest in the this package is
+ * {@link org.apache.mahout.classifier.sgd.CrossFoldLearner} which contains a
+ * number (typically 5) of sub-learners, each of which is given a different portion of the
+ * training data.  Each of these sub-learners can then be evaluated on the data it was not
+ * trained on.  This allows fully incremental learning while still getting cross-validated
+ * performance estimates.</p>
+ *
+ * <p>The CrossFoldLearner implements {@link org.apache.mahout.classifier.OnlineLearner}
+ * and thus expects to be fed input in the form
+ * of a target variable and a feature vector.  The target variable is simply an integer in the
+ * half-open interval [0..numFeatures) where numFeatures is defined when the CrossFoldLearner
+ * is constructed.  The creation of feature vectors is facilitated by the classes that inherit
+ * from {@link org.apache.mahout.vectorizer.encoders.FeatureVectorEncoder}.
+ * These classes currently implement a form of feature hashing with
+ * multiple probes to limit feature ambiguity.</p>
+ */
+package org.apache.mahout.classifier.sgd;

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/AbstractCluster.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/AbstractCluster.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/AbstractCluster.java
new file mode 100644
index 0000000..be7ed2a
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/AbstractCluster.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.function.SquareRootFunction;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public abstract class AbstractCluster implements Cluster {
+  
+  // cluster persistent state
+  private int id;
+  
+  private long numObservations;
+  
+  private long totalObservations;
+  
+  private Vector center;
+  
+  private Vector radius;
+  
+  // the observation statistics
+  private double s0;
+  
+  private Vector s1;
+  
+  private Vector s2;
+
+  private static final ObjectMapper jxn = new ObjectMapper();
+  
+  protected AbstractCluster() {}
+  
+  protected AbstractCluster(Vector point, int id2) {
+    this.numObservations = (long) 0;
+    this.totalObservations = (long) 0;
+    this.center = point.clone();
+    this.radius = center.like();
+    this.s0 = (double) 0;
+    this.s1 = center.like();
+    this.s2 = center.like();
+    this.id = id2;
+  }
+  
+  protected AbstractCluster(Vector center2, Vector radius2, int id2) {
+    this.numObservations = (long) 0;
+    this.totalObservations = (long) 0;
+    this.center = new RandomAccessSparseVector(center2);
+    this.radius = new RandomAccessSparseVector(radius2);
+    this.s0 = (double) 0;
+    this.s1 = center.like();
+    this.s2 = center.like();
+    this.id = id2;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+    out.writeLong(getNumObservations());
+    out.writeLong(getTotalObservations());
+    VectorWritable.writeVector(out, getCenter());
+    VectorWritable.writeVector(out, getRadius());
+    out.writeDouble(s0);
+    VectorWritable.writeVector(out, s1);
+    VectorWritable.writeVector(out, s2);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.id = in.readInt();
+    this.setNumObservations(in.readLong());
+    this.setTotalObservations(in.readLong());
+    this.setCenter(VectorWritable.readVector(in));
+    this.setRadius(VectorWritable.readVector(in));
+    this.setS0(in.readDouble());
+    this.setS1(VectorWritable.readVector(in));
+    this.setS2(VectorWritable.readVector(in));
+  }
+  
+  @Override
+  public void configure(Configuration job) {
+    // nothing to do
+  }
+  
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return Collections.emptyList();
+  }
+  
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+    // nothing to do
+  }
+  
+  @Override
+  public int getId() {
+    return id;
+  }
+
+  /**
+   * @param id
+   *          the id to set
+   */
+  protected void setId(int id) {
+    this.id = id;
+  }
+  
+  @Override
+  public long getNumObservations() {
+    return numObservations;
+  }
+
+  /**
+   * @param l
+   *          the numPoints to set
+   */
+  protected void setNumObservations(long l) {
+    this.numObservations = l;
+  }
+  
+  @Override
+  public long getTotalObservations() {
+    return totalObservations;
+  }
+
+  protected void setTotalObservations(long totalPoints) {
+    this.totalObservations = totalPoints;
+  }
+
+  @Override
+  public Vector getCenter() {
+    return center;
+  }
+
+  /**
+   * @param center
+   *          the center to set
+   */
+  protected void setCenter(Vector center) {
+    this.center = center;
+  }
+  
+  @Override
+  public Vector getRadius() {
+    return radius;
+  }
+
+  /**
+   * @param radius
+   *          the radius to set
+   */
+  protected void setRadius(Vector radius) {
+    this.radius = radius;
+  }
+  
+  /**
+   * @return the s0
+   */
+  protected double getS0() {
+    return s0;
+  }
+  
+  protected void setS0(double s0) {
+    this.s0 = s0;
+  }
+
+  /**
+   * @return the s1
+   */
+  protected Vector getS1() {
+    return s1;
+  }
+  
+  protected void setS1(Vector s1) {
+    this.s1 = s1;
+  }
+
+  /**
+   * @return the s2
+   */
+  protected Vector getS2() {
+    return s2;
+  }
+  
+  protected void setS2(Vector s2) {
+    this.s2 = s2;
+  }
+
+  @Override
+  public void observe(Model<VectorWritable> x) {
+    AbstractCluster cl = (AbstractCluster) x;
+    setS0(getS0() + cl.getS0());
+    setS1(getS1().plus(cl.getS1()));
+    setS2(getS2().plus(cl.getS2()));
+  }
+  
+  @Override
+  public void observe(VectorWritable x) {
+    observe(x.get());
+  }
+  
+  @Override
+  public void observe(VectorWritable x, double weight) {
+    observe(x.get(), weight);
+  }
+  
+  public void observe(Vector x, double weight) {
+    if (weight == 1.0) {
+      observe(x);
+    } else {
+      setS0(getS0() + weight);
+      Vector weightedX = x.times(weight);
+      if (getS1() == null) {
+        setS1(weightedX);
+      } else {
+        getS1().assign(weightedX, Functions.PLUS);
+      }
+      Vector x2 = x.times(x).times(weight);
+      if (getS2() == null) {
+        setS2(x2);
+      } else {
+        getS2().assign(x2, Functions.PLUS);
+      }
+    }
+  }
+  
+  public void observe(Vector x) {
+    setS0(getS0() + 1);
+    if (getS1() == null) {
+      setS1(x.clone());
+    } else {
+      getS1().assign(x, Functions.PLUS);
+    }
+    Vector x2 = x.times(x);
+    if (getS2() == null) {
+      setS2(x2);
+    } else {
+      getS2().assign(x2, Functions.PLUS);
+    }
+  }
+  
+  
+  @Override
+  public void computeParameters() {
+    if (getS0() == 0) {
+      return;
+    }
+    setNumObservations((long) getS0());
+    setTotalObservations(getTotalObservations() + getNumObservations());
+    setCenter(getS1().divide(getS0()));
+    // compute the component stds
+    if (getS0() > 1) {
+      setRadius(getS2().times(getS0()).minus(getS1().times(getS1())).assign(new SquareRootFunction()).divide(getS0()));
+    }
+    setS0(0);
+    setS1(center.like());
+    setS2(center.like());
+  }
+
+  @Override
+  public String asFormatString(String[] bindings) {
+    String fmtString = "";
+    try {
+      fmtString = jxn.writeValueAsString(asJson(bindings));
+    } catch (IOException e) {
+      log.error("Error writing JSON as String.", e);
+    }
+    return fmtString;
+  }
+
+  public Map<String,Object> asJson(String[] bindings) {
+    Map<String,Object> dict = new HashMap<>();
+    dict.put("identifier", getIdentifier());
+    dict.put("n", getNumObservations());
+    if (getCenter() != null) {
+      try {
+        dict.put("c", formatVectorAsJson(getCenter(), bindings));
+      } catch (IOException e) {
+        log.error("IOException:  ", e);
+      }
+    }
+    if (getRadius() != null) {
+      try {
+        dict.put("r", formatVectorAsJson(getRadius(), bindings));
+      } catch (IOException e) {
+        log.error("IOException:  ", e);
+      }
+    }
+    return dict;
+  }
+  
+  public abstract String getIdentifier();
+  
+  /**
+   * Compute the centroid by averaging the pointTotals
+   * 
+   * @return the new centroid
+   */
+  public Vector computeCentroid() {
+    return getS0() == 0 ? getCenter() : getS1().divide(getS0());
+  }
+
+  /**
+   * Return a human-readable formatted string representation of the vector, not
+   * intended to be complete nor usable as an input/output representation
+   */
+  public static String formatVector(Vector v, String[] bindings) {
+    String fmtString = "";
+    try {
+      fmtString = jxn.writeValueAsString(formatVectorAsJson(v, bindings));
+    } catch (IOException e) {
+      log.error("Error writing JSON as String.", e);
+    }
+    return fmtString;
+  }
+
+  /**
+   * Create a List of HashMaps containing vector terms and weights
+   *
+   * @return List<Object>
+   */
+  public static List<Object> formatVectorAsJson(Vector v, String[] bindings) throws IOException {
+
+    boolean hasBindings = bindings != null;
+    boolean isSparse = v.getNumNonZeroElements() != v.size();
+
+    // we assume sequential access in the output
+    Vector provider = v.isSequentialAccess() ? v : new SequentialAccessSparseVector(v);
+
+    List<Object> terms = new LinkedList<>();
+    String term = "";
+
+    for (Element elem : provider.nonZeroes()) {
+
+      if (hasBindings && bindings.length >= elem.index() + 1 && bindings[elem.index()] != null) {
+        term = bindings[elem.index()];
+      } else if (hasBindings || isSparse) {
+        term = String.valueOf(elem.index());
+      }
+
+      Map<String, Object> term_entry = new HashMap<>();
+      double roundedWeight = (double) Math.round(elem.get() * 1000) / 1000;
+      if (hasBindings || isSparse) {
+        term_entry.put(term, roundedWeight);
+        terms.add(term_entry);
+      } else {
+        terms.add(roundedWeight);
+      }
+    }
+
+    return terms;
+  }
+
+  @Override
+  public boolean isConverged() {
+    // Convergence has no meaning yet, perhaps in subclasses
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/Cluster.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/Cluster.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/Cluster.java
new file mode 100644
index 0000000..07d6927
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/Cluster.java
@@ -0,0 +1,90 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering;
+
+import org.apache.mahout.common.parameters.Parametered;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.util.Map;
+
+/**
+ * Implementations of this interface have a printable representation and certain
+ * attributes that are common across all clustering implementations
+ * 
+ */
+public interface Cluster extends Model<VectorWritable>, Parametered {
+
+  // default directory for initial clusters to prime iterative clustering
+  // algorithms
+  String INITIAL_CLUSTERS_DIR = "clusters-0";
+  
+  // default directory for output of clusters per iteration
+  String CLUSTERS_DIR = "clusters-";
+  
+  // default suffix for output of clusters for final iteration
+  String FINAL_ITERATION_SUFFIX = "-final";
+  
+  /**
+   * Get the id of the Cluster
+   * 
+   * @return a unique integer
+   */
+  int getId();
+  
+  /**
+   * Get the "center" of the Cluster as a Vector
+   * 
+   * @return a Vector
+   */
+  Vector getCenter();
+  
+  /**
+   * Get the "radius" of the Cluster as a Vector. Usually the radius is the
+   * standard deviation expressed as a Vector of size equal to the center. Some
+   * clusters may return zero values if not appropriate.
+   * 
+   * @return aVector
+   */
+  Vector getRadius();
+    
+  /**
+   * Produce a custom, human-friendly, printable representation of the Cluster.
+   * 
+   * @param bindings
+   *          an optional String[] containing labels used to format the primary
+   *          Vector/s of this implementation.
+   * @return a String
+   */
+  String asFormatString(String[] bindings);
+
+  /**
+   * Produce a JSON representation of the Cluster.
+   *
+   * @param bindings
+   *          an optional String[] containing labels used to format the primary
+   *          Vector/s of this implementation.
+   * @return a Map
+   */
+  Map<String,Object> asJson(String[] bindings);
+
+  /**
+   * @return if the receiver has converged, or false if that has no meaning for
+   *         the implementation
+   */
+  boolean isConverged();
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/ClusteringUtils.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/ClusteringUtils.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/ClusteringUtils.java
new file mode 100644
index 0000000..ad0f8ec
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/ClusteringUtils.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.WeightedVector;
+import org.apache.mahout.math.neighborhood.BruteSearch;
+import org.apache.mahout.math.neighborhood.ProjectionSearch;
+import org.apache.mahout.math.neighborhood.Searcher;
+import org.apache.mahout.math.neighborhood.UpdatableSearcher;
+import org.apache.mahout.math.random.WeightedThing;
+import org.apache.mahout.math.stats.OnlineSummarizer;
+
+public final class ClusteringUtils {
+  private ClusteringUtils() {
+  }
+
+  /**
+   * Computes the summaries for the distances in each cluster.
+   * @param datapoints iterable of datapoints.
+   * @param centroids iterable of Centroids.
+   * @return a list of OnlineSummarizers where the i-th element is the summarizer corresponding to the cluster whose
+   * index is i.
+   */
+  public static List<OnlineSummarizer> summarizeClusterDistances(Iterable<? extends Vector> datapoints,
+                                                                 Iterable<? extends Vector> centroids,
+                                                                 DistanceMeasure distanceMeasure) {
+    UpdatableSearcher searcher = new ProjectionSearch(distanceMeasure, 3, 1);
+    searcher.addAll(centroids);
+    List<OnlineSummarizer> summarizers = new ArrayList<>();
+    if (searcher.size() == 0) {
+      return summarizers;
+    }
+    for (int i = 0; i < searcher.size(); ++i) {
+      summarizers.add(new OnlineSummarizer());
+    }
+    for (Vector v : datapoints) {
+      Centroid closest = (Centroid)searcher.search(v,  1).get(0).getValue();
+      OnlineSummarizer summarizer = summarizers.get(closest.getIndex());
+      summarizer.add(distanceMeasure.distance(v, closest));
+    }
+    return summarizers;
+  }
+
+  /**
+   * Adds up the distances from each point to its closest cluster and returns the sum.
+   * @param datapoints iterable of datapoints.
+   * @param centroids iterable of Centroids.
+   * @return the total cost described above.
+   */
+  public static double totalClusterCost(Iterable<? extends Vector> datapoints, Iterable<? extends Vector> centroids) {
+    DistanceMeasure distanceMeasure = new EuclideanDistanceMeasure();
+    UpdatableSearcher searcher = new ProjectionSearch(distanceMeasure, 3, 1);
+    searcher.addAll(centroids);
+    return totalClusterCost(datapoints, searcher);
+  }
+
+  /**
+   * Adds up the distances from each point to its closest cluster and returns the sum.
+   * @param datapoints iterable of datapoints.
+   * @param centroids searcher of Centroids.
+   * @return the total cost described above.
+   */
+  public static double totalClusterCost(Iterable<? extends Vector> datapoints, Searcher centroids) {
+    double totalCost = 0;
+    for (Vector vector : datapoints) {
+      totalCost += centroids.searchFirst(vector, false).getWeight();
+    }
+    return totalCost;
+  }
+
+  /**
+   * Estimates the distance cutoff. In StreamingKMeans, the distance between two vectors divided
+   * by this value is used as a probability threshold when deciding whether to form a new cluster
+   * or not.
+   * Small values (comparable to the minimum distance between two points) are preferred as they
+   * guarantee with high likelihood that all but very close points are put in separate clusters
+   * initially. The clusters themselves are actually collapsed periodically when their number goes
+   * over the maximum number of clusters and the distanceCutoff is increased.
+   * So, the returned value is only an initial estimate.
+   * @param data the datapoints whose distance is to be estimated.
+   * @param distanceMeasure the distance measure used to compute the distance between two points.
+   * @return the minimum distance between the first sampleLimit points
+   * @see org.apache.mahout.clustering.streaming.cluster.StreamingKMeans#clusterInternal(Iterable, boolean)
+   */
+  public static double estimateDistanceCutoff(List<? extends Vector> data, DistanceMeasure distanceMeasure) {
+    BruteSearch searcher = new BruteSearch(distanceMeasure);
+    searcher.addAll(data);
+    double minDistance = Double.POSITIVE_INFINITY;
+    for (Vector vector : data) {
+      double closest = searcher.searchFirst(vector, true).getWeight();
+      if (minDistance > 0 && closest < minDistance) {
+        minDistance = closest;
+      }
+      searcher.add(vector);
+    }
+    return minDistance;
+  }
+
+  public static <T extends Vector> double estimateDistanceCutoff(
+      Iterable<T> data, DistanceMeasure distanceMeasure, int sampleLimit) {
+    return estimateDistanceCutoff(Lists.newArrayList(Iterables.limit(data, sampleLimit)), distanceMeasure);
+  }
+
+  /**
+   * Computes the Davies-Bouldin Index for a given clustering.
+   * See http://en.wikipedia.org/wiki/Clustering_algorithm#Internal_evaluation
+   * @param centroids list of centroids
+   * @param distanceMeasure distance measure for inter-cluster distances
+   * @param clusterDistanceSummaries summaries of the clusters; See summarizeClusterDistances
+   * @return the Davies-Bouldin Index
+   */
+  public static double daviesBouldinIndex(List<? extends Vector> centroids, DistanceMeasure distanceMeasure,
+                                          List<OnlineSummarizer> clusterDistanceSummaries) {
+    Preconditions.checkArgument(centroids.size() == clusterDistanceSummaries.size(),
+        "Number of centroids and cluster summaries differ.");
+    int n = centroids.size();
+    double totalDBIndex = 0;
+    // The inner loop shouldn't be reduced for j = i + 1 to n because the computation of the Davies-Bouldin
+    // index is not really symmetric.
+    // For a given cluster i, we look for a cluster j that maximizes the ratio of the sum of average distances
+    // from points in cluster i to its center and and points in cluster j to its center to the distance between
+    // cluster i and cluster j.
+    // The maximization is the key issue, as the cluster that maximizes this ratio might be j for i but is NOT
+    // NECESSARILY i for j.
+    for (int i = 0; i < n; ++i) {
+      double averageDistanceI = clusterDistanceSummaries.get(i).getMean();
+      double maxDBIndex = 0;
+      for (int j = 0; j < n; ++j) {
+        if (i != j) {
+          double dbIndex = (averageDistanceI + clusterDistanceSummaries.get(j).getMean())
+              / distanceMeasure.distance(centroids.get(i), centroids.get(j));
+          if (dbIndex > maxDBIndex) {
+            maxDBIndex = dbIndex;
+          }
+        }
+      }
+      totalDBIndex += maxDBIndex;
+    }
+    return totalDBIndex / n;
+  }
+
+  /**
+   * Computes the Dunn Index of a given clustering. See http://en.wikipedia.org/wiki/Dunn_index
+   * @param centroids list of centroids
+   * @param distanceMeasure distance measure to compute inter-centroid distance with
+   * @param clusterDistanceSummaries summaries of the clusters; See summarizeClusterDistances
+   * @return the Dunn Index
+   */
+  public static double dunnIndex(List<? extends Vector> centroids, DistanceMeasure distanceMeasure,
+                                 List<OnlineSummarizer> clusterDistanceSummaries) {
+    Preconditions.checkArgument(centroids.size() == clusterDistanceSummaries.size(),
+        "Number of centroids and cluster summaries differ.");
+    int n = centroids.size();
+    // Intra-cluster distances will come from the OnlineSummarizer, and will be the median distance (noting that
+    // the median for just one value is that value).
+    // A variety of metrics can be used for the intra-cluster distance including max distance between two points,
+    // mean distance, etc. Median distance was chosen as this is more robust to outliers and characterizes the
+    // distribution of distances (from a point to the center) better.
+    double maxIntraClusterDistance = 0;
+    for (OnlineSummarizer summarizer : clusterDistanceSummaries) {
+      if (summarizer.getCount() > 0) {
+        double intraClusterDistance;
+        if (summarizer.getCount() == 1) {
+          intraClusterDistance = summarizer.getMean();
+        } else {
+          intraClusterDistance = summarizer.getMedian();
+        }
+        if (maxIntraClusterDistance < intraClusterDistance) {
+          maxIntraClusterDistance = intraClusterDistance;
+        }
+      }
+    }
+    double minDunnIndex = Double.POSITIVE_INFINITY;
+    for (int i = 0; i < n; ++i) {
+      // Distances are symmetric, so d(i, j) = d(j, i).
+      for (int j = i + 1; j < n; ++j) {
+        double dunnIndex = distanceMeasure.distance(centroids.get(i), centroids.get(j));
+        if (minDunnIndex > dunnIndex) {
+          minDunnIndex = dunnIndex;
+        }
+      }
+    }
+    return minDunnIndex / maxIntraClusterDistance;
+  }
+
+  public static double choose2(double n) {
+    return n * (n - 1) / 2;
+  }
+
+  /**
+   * Creates a confusion matrix by searching for the closest cluster of both the row clustering and column clustering
+   * of a point and adding its weight to that cell of the matrix.
+   * It doesn't matter which clustering is the row clustering and which is the column clustering. If they're
+   * interchanged, the resulting matrix is the transpose of the original one.
+   * @param rowCentroids clustering one
+   * @param columnCentroids clustering two
+   * @param datapoints datapoints whose closest cluster we need to find
+   * @param distanceMeasure distance measure to use
+   * @return the confusion matrix
+   */
+  public static Matrix getConfusionMatrix(List<? extends Vector> rowCentroids, List<? extends  Vector> columnCentroids,
+                                          Iterable<? extends Vector> datapoints, DistanceMeasure distanceMeasure) {
+    Searcher rowSearcher = new BruteSearch(distanceMeasure);
+    rowSearcher.addAll(rowCentroids);
+    Searcher columnSearcher = new BruteSearch(distanceMeasure);
+    columnSearcher.addAll(columnCentroids);
+
+    int numRows = rowCentroids.size();
+    int numCols = columnCentroids.size();
+    Matrix confusionMatrix = new DenseMatrix(numRows, numCols);
+
+    for (Vector vector : datapoints) {
+      WeightedThing<Vector> closestRowCentroid = rowSearcher.search(vector, 1).get(0);
+      WeightedThing<Vector> closestColumnCentroid = columnSearcher.search(vector, 1).get(0);
+      int row = ((Centroid) closestRowCentroid.getValue()).getIndex();
+      int column = ((Centroid) closestColumnCentroid.getValue()).getIndex();
+      double vectorWeight;
+      if (vector instanceof WeightedVector) {
+        vectorWeight = ((WeightedVector) vector).getWeight();
+      } else {
+        vectorWeight = 1;
+      }
+      confusionMatrix.set(row, column, confusionMatrix.get(row, column) + vectorWeight);
+    }
+
+    return confusionMatrix;
+  }
+
+  /**
+   * Computes the Adjusted Rand Index for a given confusion matrix.
+   * @param confusionMatrix confusion matrix; not to be confused with the more restrictive ConfusionMatrix class
+   * @return the Adjusted Rand Index
+   */
+  public static double getAdjustedRandIndex(Matrix confusionMatrix) {
+    int numRows = confusionMatrix.numRows();
+    int numCols = confusionMatrix.numCols();
+    double rowChoiceSum = 0;
+    double columnChoiceSum = 0;
+    double totalChoiceSum = 0;
+    double total = 0;
+    for (int i = 0; i < numRows; ++i) {
+      double rowSum = 0;
+      for (int j = 0; j < numCols; ++j) {
+        rowSum += confusionMatrix.get(i, j);
+        totalChoiceSum += choose2(confusionMatrix.get(i, j));
+      }
+      total += rowSum;
+      rowChoiceSum += choose2(rowSum);
+    }
+    for (int j = 0; j < numCols; ++j) {
+      double columnSum = 0;
+      for (int i = 0; i < numRows; ++i) {
+        columnSum += confusionMatrix.get(i, j);
+      }
+      columnChoiceSum += choose2(columnSum);
+    }
+    double rowColumnChoiceSumDivTotal = rowChoiceSum * columnChoiceSum / choose2(total);
+    return (totalChoiceSum - rowColumnChoiceSumDivTotal)
+        / ((rowChoiceSum + columnChoiceSum) / 2 - rowColumnChoiceSumDivTotal);
+  }
+
+  /**
+   * Computes the total weight of the points in the given Vector iterable.
+   * @param data iterable of points
+   * @return total weight
+   */
+  public static double totalWeight(Iterable<? extends Vector> data) {
+    double sum = 0;
+    for (Vector row : data) {
+      Preconditions.checkNotNull(row);
+      if (row instanceof WeightedVector) {
+        sum += ((WeightedVector)row).getWeight();
+      } else {
+        sum++;
+      }
+    }
+    return sum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/GaussianAccumulator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/GaussianAccumulator.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/GaussianAccumulator.java
new file mode 100644
index 0000000..c25e039
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/GaussianAccumulator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering;
+
+import org.apache.mahout.math.Vector;
+
+public interface GaussianAccumulator {
+
+  /**
+   * @return the number of observations
+   */
+  double getN();
+
+  /**
+   * @return the mean of the observations
+   */
+  Vector getMean();
+
+  /**
+   * @return the std of the observations
+   */
+  Vector getStd();
+  
+  /**
+   * @return the average of the vector std elements
+   */
+  double getAverageStd();
+  
+  /**
+   * @return the variance of the observations
+   */
+  Vector getVariance();
+
+  /**
+   * Observe the vector 
+   * 
+   * @param x a Vector
+   * @param weight the double observation weight (usually 1.0)
+   */
+  void observe(Vector x, double weight);
+
+  /**
+   * Compute the mean, variance and standard deviation
+   */
+  void compute();
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/Model.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/Model.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/Model.java
new file mode 100644
index 0000000..79dab30
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/Model.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * A model is a probability distribution over observed data points and allows
+ * the probability of any data point to be computed. All Models have a
+ * persistent representation and extend
+ * WritablesampleFromPosterior(Model<VectorWritable>[])
+ */
+public interface Model<O> extends Writable {
+  
+  /**
+   * Return the probability that the observation is described by this model
+   * 
+   * @param x
+   *          an Observation from the posterior
+   * @return the probability that x is in the receiver
+   */
+  double pdf(O x);
+  
+  /**
+   * Observe the given observation, retaining information about it
+   * 
+   * @param x
+   *          an Observation from the posterior
+   */
+  void observe(O x);
+  
+  /**
+   * Observe the given observation, retaining information about it
+   * 
+   * @param x
+   *          an Observation from the posterior
+   * @param weight
+   *          a double weighting factor
+   */
+  void observe(O x, double weight);
+  
+  /**
+   * Observe the given model, retaining information about its observations
+   * 
+   * @param x
+   *          a Model<0>
+   */
+  void observe(Model<O> x);
+  
+  /**
+   * Compute a new set of posterior parameters based upon the Observations that
+   * have been observed since my creation
+   */
+  void computeParameters();
+  
+  /**
+   * Return the number of observations that this model has seen since its
+   * parameters were last computed
+   * 
+   * @return a long
+   */
+  long getNumObservations();
+  
+  /**
+   * Return the number of observations that this model has seen over its
+   * lifetime
+   * 
+   * @return a long
+   */
+  long getTotalObservations();
+  
+  /**
+   * @return a sample of my posterior model
+   */
+  Model<VectorWritable> sampleFromPosterior();
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/ModelDistribution.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/ModelDistribution.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/ModelDistribution.java
new file mode 100644
index 0000000..d77bf40
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/ModelDistribution.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering;
+
+/** A model distribution allows us to sample a model from its prior distribution. */
+public interface ModelDistribution<O> {
+  
+  /**
+   * Return a list of models sampled from the prior
+   * 
+   * @param howMany
+   *          the int number of models to return
+   * @return a Model<Observation>[] representing what is known apriori
+   */
+  Model<O>[] sampleFromPrior(int howMany);
+  
+  /**
+   * Return a list of models sampled from the posterior
+   * 
+   * @param posterior
+   *          the Model<Observation>[] after observations
+   * @return a Model<Observation>[] representing what is known apriori
+   */
+  Model<O>[] sampleFromPosterior(Model<O>[] posterior);
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/OnlineGaussianAccumulator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/OnlineGaussianAccumulator.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/OnlineGaussianAccumulator.java
new file mode 100644
index 0000000..b76e00f
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/OnlineGaussianAccumulator.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering;
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.SquareRootFunction;
+
+/**
+ * An online Gaussian statistics accumulator based upon Knuth (who cites Welford) which is declared to be
+ * numerically-stable. See http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+ */
+public class OnlineGaussianAccumulator implements GaussianAccumulator {
+
+  private double sumWeight;
+  private Vector mean;
+  private Vector s;
+  private Vector variance;
+
+  @Override
+  public double getN() {
+    return sumWeight;
+  }
+
+  @Override
+  public Vector getMean() {
+    return mean;
+  }
+
+  @Override
+  public Vector getStd() {
+    return variance.clone().assign(new SquareRootFunction());
+  }
+
+  /* from Wikipedia: http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+   * 
+   * Weighted incremental algorithm
+   * 
+   * def weighted_incremental_variance(dataWeightPairs):
+   * mean = 0
+   * S = 0
+   * sumweight = 0
+   * for x, weight in dataWeightPairs: # Alternately "for x in zip(data, weight):"
+   *     temp = weight + sumweight
+   *     Q = x - mean
+   *      R = Q * weight / temp
+   *      S = S + sumweight * Q * R
+   *      mean = mean + R
+   *      sumweight = temp
+   *  Variance = S / (sumweight-1)  # if sample is the population, omit -1
+   *  return Variance
+   */
+  @Override
+  public void observe(Vector x, double weight) {
+    double temp = weight + sumWeight;
+    Vector q;
+    if (mean == null) {
+      mean = x.like();
+      q = x.clone();
+    } else {
+      q = x.minus(mean);
+    }
+    Vector r = q.times(weight).divide(temp);
+    if (s == null) {
+      s = q.times(sumWeight).times(r);
+    } else {
+      s = s.plus(q.times(sumWeight).times(r));
+    }
+    mean = mean.plus(r);
+    sumWeight = temp;
+    variance = s.divide(sumWeight - 1); //  # if sample is the population, omit -1
+  }
+
+  @Override
+  public void compute() {
+    // nothing to do here!
+  }
+
+  @Override
+  public double getAverageStd() {
+    if (sumWeight == 0.0) {
+      return 0.0;
+    } else {
+      Vector std = getStd();
+      return std.zSum() / std.size();
+    }
+  }
+
+  @Override
+  public Vector getVariance() {
+    return variance;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/RunningSumsGaussianAccumulator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/RunningSumsGaussianAccumulator.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/RunningSumsGaussianAccumulator.java
new file mode 100644
index 0000000..138e830
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/RunningSumsGaussianAccumulator.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering;
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.function.SquareRootFunction;
+
+/**
+ * An online Gaussian accumulator that uses a running power sums approach as reported 
+ * on http://en.wikipedia.org/wiki/Standard_deviation
+ * Suffers from overflow, underflow and roundoff error but has minimal observe-time overhead
+ */
+public class RunningSumsGaussianAccumulator implements GaussianAccumulator {
+
+  private double s0;
+  private Vector s1;
+  private Vector s2;
+  private Vector mean;
+  private Vector std;
+
+  @Override
+  public double getN() {
+    return s0;
+  }
+
+  @Override
+  public Vector getMean() {
+    return mean;
+  }
+
+  @Override
+  public Vector getStd() {
+    return std;
+  }
+
+  @Override
+  public double getAverageStd() {
+    if (s0 == 0.0) {
+      return 0.0;
+    } else {
+      return std.zSum() / std.size();
+    }
+  }
+
+  @Override
+  public Vector getVariance() {
+    return std.times(std);
+  }
+
+  @Override
+  public void observe(Vector x, double weight) {
+    s0 += weight;
+    Vector weightedX = x.times(weight);
+    if (s1 == null) {
+      s1 = weightedX;
+    } else {
+      s1.assign(weightedX, Functions.PLUS);
+    }
+    Vector x2 = x.times(x).times(weight);
+    if (s2 == null) {
+      s2 = x2;
+    } else {
+      s2.assign(x2, Functions.PLUS);
+    }
+  }
+
+  @Override
+  public void compute() {
+    if (s0 != 0.0) {
+      mean = s1.divide(s0);
+      std = s2.times(s0).minus(s1.times(s1)).assign(new SquareRootFunction()).divide(s0);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/UncommonDistributions.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/UncommonDistributions.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/UncommonDistributions.java
new file mode 100644
index 0000000..ef43e1b
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/UncommonDistributions.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering;
+
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.commons.math3.distribution.RealDistribution;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.RandomWrapper;
+
+public final class UncommonDistributions {
+
+  private static final RandomWrapper RANDOM = RandomUtils.getRandom();
+  
+  private UncommonDistributions() {}
+  
+  // =============== start of BSD licensed code. See LICENSE.txt
+  /**
+   * Returns a double sampled according to this distribution. Uniformly fast for all k > 0. (Reference:
+   * Non-Uniform Random Variate Generation, Devroye http://cgm.cs.mcgill.ca/~luc/rnbookindex.html) Uses
+   * Cheng's rejection algorithm (GB) for k>=1, rejection from Weibull distribution for 0 < k < 1.
+   */
+  public static double rGamma(double k, double lambda) {
+    boolean accept = false;
+    if (k >= 1.0) {
+      // Cheng's algorithm
+      double b = k - Math.log(4.0);
+      double c = k + Math.sqrt(2.0 * k - 1.0);
+      double lam = Math.sqrt(2.0 * k - 1.0);
+      double cheng = 1.0 + Math.log(4.5);
+      double x;
+      do {
+        double u = RANDOM.nextDouble();
+        double v = RANDOM.nextDouble();
+        double y = 1.0 / lam * Math.log(v / (1.0 - v));
+        x = k * Math.exp(y);
+        double z = u * v * v;
+        double r = b + c * y - x;
+        if (r >= 4.5 * z - cheng || r >= Math.log(z)) {
+          accept = true;
+        }
+      } while (!accept);
+      return x / lambda;
+    } else {
+      // Weibull algorithm
+      double c = 1.0 / k;
+      double d = (1.0 - k) * Math.pow(k, k / (1.0 - k));
+      double x;
+      do {
+        double u = RANDOM.nextDouble();
+        double v = RANDOM.nextDouble();
+        double z = -Math.log(u);
+        double e = -Math.log(v);
+        x = Math.pow(z, c);
+        if (z + e >= d + x) {
+          accept = true;
+        }
+      } while (!accept);
+      return x / lambda;
+    }
+  }
+  
+  // ============= end of BSD licensed code
+  
+  /**
+   * Returns a random sample from a beta distribution with the given shapes
+   * 
+   * @param shape1
+   *          a double representing shape1
+   * @param shape2
+   *          a double representing shape2
+   * @return a Vector of samples
+   */
+  public static double rBeta(double shape1, double shape2) {
+    double gam1 = rGamma(shape1, 1.0);
+    double gam2 = rGamma(shape2, 1.0);
+    return gam1 / (gam1 + gam2);
+    
+  }
+  
+  /**
+   * Return a random value from a normal distribution with the given mean and standard deviation
+   * 
+   * @param mean
+   *          a double mean value
+   * @param sd
+   *          a double standard deviation
+   * @return a double sample
+   */
+  public static double rNorm(double mean, double sd) {
+    RealDistribution dist = new NormalDistribution(RANDOM.getRandomGenerator(),
+                                                   mean,
+                                                   sd,
+                                                   NormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY);
+    return dist.sample();
+  }
+  
+  /**
+   * Returns an integer sampled according to this distribution. Takes time proportional to np + 1. (Reference:
+   * Non-Uniform Random Variate Generation, Devroye http://cgm.cs.mcgill.ca/~luc/rnbookindex.html) Second
+   * time-waiting algorithm.
+   */
+  public static int rBinomial(int n, double p) {
+    if (p >= 1.0) {
+      return n; // needed to avoid infinite loops and negative results
+    }
+    double q = -Math.log1p(-p);
+    double sum = 0.0;
+    int x = 0;
+    while (sum <= q) {
+      double u = RANDOM.nextDouble();
+      double e = -Math.log(u);
+      sum += e / (n - x);
+      x++;
+    }
+    if (x == 0) {
+      return 0;
+    }
+    return x - 1;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
new file mode 100644
index 0000000..930fd44
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.mahout.clustering.iterator.DistanceMeasureCluster;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Vector;
+
+/**
+ * This class models a canopy as a center point, the number of points that are contained within it according
+ * to the application of some distance metric, and a point total which is the sum of all the points and is
+ * used to compute the centroid when needed.
+ */
+@Deprecated
+public class Canopy extends DistanceMeasureCluster {
+  
+  /** Used for deserialization as a writable */
+  public Canopy() { }
+  
+  /**
+   * Create a new Canopy containing the given point and canopyId
+   * 
+   * @param center a point in vector space
+   * @param canopyId an int identifying the canopy local to this process only
+   * @param measure a DistanceMeasure to use
+   */
+  public Canopy(Vector center, int canopyId, DistanceMeasure measure) {
+    super(center, canopyId, measure);
+    observe(center);
+  }
+
+  public String asFormatString() {
+    return "C" + this.getId() + ": " + this.computeCentroid().asFormatString();
+  }
+
+  @Override
+  public String toString() {
+    return getIdentifier() + ": " + getCenter().asFormatString();
+  }
+  
+  @Override
+  public String getIdentifier() {
+    return "C-" + getId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
new file mode 100644
index 0000000..3ce4757
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+@Deprecated
+public class CanopyClusterer {
+
+  private static final Logger log = LoggerFactory.getLogger(CanopyClusterer.class);
+
+  private int nextCanopyId;
+
+  // the T1 distance threshold
+  private double t1;
+
+  // the T2 distance threshold
+  private double t2;
+
+  // the T3 distance threshold
+  private double t3;
+
+  // the T4 distance threshold
+  private double t4;
+
+  // the distance measure
+  private DistanceMeasure measure;
+
+  public CanopyClusterer(DistanceMeasure measure, double t1, double t2) {
+    this.t1 = t1;
+    this.t2 = t2;
+    this.t3 = t1;
+    this.t4 = t2;
+    this.measure = measure;
+  }
+
+  public double getT1() {
+    return t1;
+  }
+
+  public double getT2() {
+    return t2;
+  }
+
+  public double getT3() {
+    return t3;
+  }
+
+  public double getT4() {
+    return t4;
+  }
+
+  /**
+   * Used by CanopyReducer to set t1=t3 and t2=t4 configuration values
+   */
+  public void useT3T4() {
+    t1 = t3;
+    t2 = t4;
+  }
+
+  /**
+   * This is the same algorithm as the reference but inverted to iterate over
+   * existing canopies instead of the points. Because of this it does not need
+   * to actually store the points, instead storing a total points vector and
+   * the number of points. From this a centroid can be computed.
+   * <p/>
+   * This method is used by the CanopyMapper, CanopyReducer and CanopyDriver.
+   * 
+   * @param point
+   *            the point to be added
+   * @param canopies
+   *            the List<Canopy> to be appended
+   */
+  public void addPointToCanopies(Vector point, Collection<Canopy> canopies) {
+    boolean pointStronglyBound = false;
+    for (Canopy canopy : canopies) {
+      double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point);
+      if (dist < t1) {
+        if (log.isDebugEnabled()) {
+          log.debug("Added point: {} to canopy: {}", AbstractCluster.formatVector(point, null), canopy.getIdentifier());
+        }
+        canopy.observe(point);
+      }
+      pointStronglyBound = pointStronglyBound || dist < t2;
+    }
+    if (!pointStronglyBound) {
+      if (log.isDebugEnabled()) {
+        log.debug("Created new Canopy:{} at center:{}", nextCanopyId, AbstractCluster.formatVector(point, null));
+      }
+      canopies.add(new Canopy(point, nextCanopyId++, measure));
+    }
+  }
+
+  /**
+   * Return if the point is covered by the canopy
+   * 
+   * @param point
+   *            a point
+   * @return if the point is covered
+   */
+  public boolean canopyCovers(Canopy canopy, Vector point) {
+    return measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point) < t1;
+  }
+
+  /**
+   * Iterate through the points, adding new canopies. Return the canopies.
+   * 
+   * @param points
+   *            a list<Vector> defining the points to be clustered
+   * @param measure
+   *            a DistanceMeasure to use
+   * @param t1
+   *            the T1 distance threshold
+   * @param t2
+   *            the T2 distance threshold
+   * @return the List<Canopy> created
+   */
+  public static List<Canopy> createCanopies(List<Vector> points,
+                                            DistanceMeasure measure,
+                                            double t1,
+                                            double t2) {
+    List<Canopy> canopies = Lists.newArrayList();
+    /**
+     * Reference Implementation: Given a distance metric, one can create
+     * canopies as follows: Start with a list of the data points in any
+     * order, and with two distance thresholds, T1 and T2, where T1 > T2.
+     * (These thresholds can be set by the user, or selected by
+     * cross-validation.) Pick a point on the list and measure its distance
+     * to all other points. Put all points that are within distance
+     * threshold T1 into a canopy. Remove from the list all points that are
+     * within distance threshold T2. Repeat until the list is empty.
+     */
+    int nextCanopyId = 0;
+    while (!points.isEmpty()) {
+      Iterator<Vector> ptIter = points.iterator();
+      Vector p1 = ptIter.next();
+      ptIter.remove();
+      Canopy canopy = new Canopy(p1, nextCanopyId++, measure);
+      canopies.add(canopy);
+      while (ptIter.hasNext()) {
+        Vector p2 = ptIter.next();
+        double dist = measure.distance(p1, p2);
+        // Put all points that are within distance threshold T1 into the
+        // canopy
+        if (dist < t1) {
+          canopy.observe(p2);
+        }
+        // Remove from the list all points that are within distance
+        // threshold T2
+        if (dist < t2) {
+          ptIter.remove();
+        }
+      }
+      for (Canopy c : canopies) {
+        c.computeParameters();
+      }
+    }
+    return canopies;
+  }
+
+  /**
+   * Iterate through the canopies, adding their centroids to a list
+   * 
+   * @param canopies
+   *            a List<Canopy>
+   * @return the List<Vector>
+   */
+  public static List<Vector> getCenters(Iterable<Canopy> canopies) {
+    List<Vector> result = Lists.newArrayList();
+    for (Canopy canopy : canopies) {
+      result.add(canopy.getCenter());
+    }
+    return result;
+  }
+
+  /**
+   * Iterate through the canopies, resetting their center to their centroids
+   * 
+   * @param canopies
+   *            a List<Canopy>
+   */
+  public static void updateCentroids(Iterable<Canopy> canopies) {
+    for (Canopy canopy : canopies) {
+      canopy.computeParameters();
+    }
+  }
+
+  public void setT3(double t3) {
+    this.t3 = t3;
+  }
+
+  public void setT4(double t4) {
+    this.t4 = t4;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java
new file mode 100644
index 0000000..2f24026
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.distance.DistanceMeasure;
+
+@Deprecated
+public final class CanopyConfigKeys {
+
+  private CanopyConfigKeys() {}
+
+  public static final String T1_KEY = "org.apache.mahout.clustering.canopy.t1";
+
+  public static final String T2_KEY = "org.apache.mahout.clustering.canopy.t2";
+
+  public static final String T3_KEY = "org.apache.mahout.clustering.canopy.t3";
+
+  public static final String T4_KEY = "org.apache.mahout.clustering.canopy.t4";
+
+  // keys used by Driver, Mapper, Combiner & Reducer
+  public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
+
+  public static final String CF_KEY = "org.apache.mahout.clustering.canopy.canopyFilter";
+
+  /**
+   * Create a {@link CanopyClusterer} from the Hadoop configuration.
+   *
+   * @param configuration Hadoop configuration
+   *
+   * @return CanopyClusterer
+   */
+  public static CanopyClusterer configureCanopyClusterer(Configuration configuration) {
+    double t1 = Double.parseDouble(configuration.get(T1_KEY));
+    double t2 = Double.parseDouble(configuration.get(T2_KEY));
+
+    DistanceMeasure measure = ClassUtils.instantiateAs(configuration.get(DISTANCE_MEASURE_KEY), DistanceMeasure.class);
+    measure.configure(configuration);
+
+    CanopyClusterer canopyClusterer = new CanopyClusterer(measure, t1, t2);
+
+    String d = configuration.get(T3_KEY);
+    if (d != null) {
+      canopyClusterer.setT3(Double.parseDouble(d));
+    }
+
+    d = configuration.get(T4_KEY);
+    if (d != null) {
+      canopyClusterer.setT4(Double.parseDouble(d));
+    }
+    return canopyClusterer;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
new file mode 100644
index 0000000..06dc947
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.canopy;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.iterator.CanopyClusteringPolicy;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.topdown.PathDirectory;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+@Deprecated
+public class CanopyDriver extends AbstractJob {
+
+  public static final String DEFAULT_CLUSTERED_POINTS_DIRECTORY = "clusteredPoints";
+
+  private static final Logger log = LoggerFactory.getLogger(CanopyDriver.class);
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new CanopyDriver(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(DefaultOptionCreator.t3Option().create());
+    addOption(DefaultOptionCreator.t4Option().create());
+    addOption(DefaultOptionCreator.clusterFilterOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(DefaultOptionCreator.clusteringOption().create());
+    addOption(DefaultOptionCreator.methodOption().create());
+    addOption(DefaultOptionCreator.outlierThresholdOption().create());
+
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    Configuration conf = getConf();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(conf, output);
+    }
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+    double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+    double t3 = t1;
+    if (hasOption(DefaultOptionCreator.T3_OPTION)) {
+      t3 = Double.parseDouble(getOption(DefaultOptionCreator.T3_OPTION));
+    }
+    double t4 = t2;
+    if (hasOption(DefaultOptionCreator.T4_OPTION)) {
+      t4 = Double.parseDouble(getOption(DefaultOptionCreator.T4_OPTION));
+    }
+    int clusterFilter = 0;
+    if (hasOption(DefaultOptionCreator.CLUSTER_FILTER_OPTION)) {
+      clusterFilter = Integer
+          .parseInt(getOption(DefaultOptionCreator.CLUSTER_FILTER_OPTION));
+    }
+    boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION)
+        .equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+    double clusterClassificationThreshold = 0.0;
+    if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
+      clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
+    }
+    run(conf, input, output, measure, t1, t2, t3, t4, clusterFilter,
+        runClustering, clusterClassificationThreshold, runSequential);
+    return 0;
+  }
+
+  /**
+   * Build a directory of Canopy clusters from the input arguments and, if
+   * requested, cluster the input vectors using these clusters
+   * 
+   * @param conf
+   *          the Configuration
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance metric
+   * @param t2
+   *          the double T2 distance metric
+   * @param t3
+   *          the reducer's double T1 distance metric
+   * @param t4
+   *          the reducer's double T2 distance metric
+   * @param clusterFilter
+   *          the minimum canopy size output by the mappers
+   * @param runClustering
+   *          cluster the input vectors if true
+   * @param clusterClassificationThreshold 
+   *          vectors having pdf below this value will not be clustered. Its value should be between 0 and 1.
+   * @param runSequential
+   *          execute sequentially if true
+   */
+  public static void run(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, double t3, double t4,
+      int clusterFilter, boolean runClustering, double clusterClassificationThreshold, boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3,
+        t4, clusterFilter, runSequential);
+    if (runClustering) {
+      clusterData(conf, input, clustersOut, output, clusterClassificationThreshold, runSequential);
+    }
+  }
+
+  /**
+   * Convenience method to provide backward compatibility
+   */
+  public static void run(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, boolean runClustering,
+      double clusterClassificationThreshold, boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    run(conf, input, output, measure, t1, t2, t1, t2, 0, runClustering,
+        clusterClassificationThreshold, runSequential);
+  }
+
+  /**
+   * Convenience method creates new Configuration() Build a directory of Canopy
+   * clusters from the input arguments and, if requested, cluster the input
+   * vectors using these clusters
+   * 
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param t1
+   *          the double T1 distance metric
+   * @param t2
+   *          the double T2 distance metric
+   * @param runClustering
+   *          cluster the input vectors if true
+   * @param clusterClassificationThreshold
+   *          vectors having pdf below this value will not be clustered. Its value should be between 0 and 1. 
+   * @param runSequential
+   *          execute sequentially if true
+   */
+  public static void run(Path input, Path output, DistanceMeasure measure,
+      double t1, double t2, boolean runClustering, double clusterClassificationThreshold, boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    run(new Configuration(), input, output, measure, t1, t2, runClustering,
+        clusterClassificationThreshold, runSequential);
+  }
+
+  /**
+   * Convenience method for backwards compatibility
+   * 
+   */
+  public static Path buildClusters(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, int clusterFilter,
+      boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    return buildClusters(conf, input, output, measure, t1, t2, t1, t2,
+        clusterFilter, runSequential);
+  }
+
+  /**
+   * Build a directory of Canopy clusters from the input vectors and other
+   * arguments. Run sequential or mapreduce execution as requested
+   * 
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance metric
+   * @param t2
+   *          the double T2 distance metric
+   * @param t3
+   *          the reducer's double T1 distance metric
+   * @param t4
+   *          the reducer's double T2 distance metric
+   * @param clusterFilter
+   *          the int minimum size of canopies produced
+   * @param runSequential
+   *          a boolean indicates to run the sequential (reference) algorithm
+   * @return the canopy output directory Path
+   */
+  public static Path buildClusters(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, double t3, double t4,
+      int clusterFilter, boolean runSequential) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    log.info("Build Clusters Input: {} Out: {} Measure: {} t1: {} t2: {}",
+             input, output, measure, t1, t2);
+    if (runSequential) {
+      return buildClustersSeq(input, output, measure, t1, t2, clusterFilter);
+    } else {
+      return buildClustersMR(conf, input, output, measure, t1, t2, t3, t4,
+          clusterFilter);
+    }
+  }
+
+  /**
+   * Build a directory of Canopy clusters from the input vectors and other
+   * arguments. Run sequential execution
+   * 
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance metric
+   * @param t2
+   *          the double T2 distance metric
+   * @param clusterFilter
+   *          the int minimum size of canopies produced
+   * @return the canopy output directory Path
+   */
+  private static Path buildClustersSeq(Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, int clusterFilter)
+    throws IOException {
+    CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
+    Collection<Canopy> canopies = Lists.newArrayList();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+
+    for (VectorWritable vw : new SequenceFileDirValueIterable<VectorWritable>(
+        input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
+      clusterer.addPointToCanopies(vw.get(), canopies);
+    }
+
+    Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX);
+    Path path = new Path(canopyOutputDir, "part-r-00000");
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+        Text.class, ClusterWritable.class);
+    try {
+      ClusterWritable clusterWritable = new ClusterWritable();
+      for (Canopy canopy : canopies) {
+        canopy.computeParameters();
+        if (log.isDebugEnabled()) {
+          log.debug("Writing Canopy:{} center:{} numPoints:{} radius:{}",
+                    canopy.getIdentifier(),
+                    AbstractCluster.formatVector(canopy.getCenter(), null),
+                    canopy.getNumObservations(),
+                    AbstractCluster.formatVector(canopy.getRadius(), null));
+        }
+        if (canopy.getNumObservations() > clusterFilter) {
+          clusterWritable.setValue(canopy);
+          writer.append(new Text(canopy.getIdentifier()), clusterWritable);
+        }
+      }
+    } finally {
+      Closeables.close(writer, false);
+    }
+    return canopyOutputDir;
+  }
+
+  /**
+   * Build a directory of Canopy clusters from the input vectors and other
+   * arguments. Run mapreduce execution
+   * 
+   * @param conf
+   *          the Configuration
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance metric
+   * @param t2
+   *          the double T2 distance metric
+   * @param t3
+   *          the reducer's double T1 distance metric
+   * @param t4
+   *          the reducer's double T2 distance metric
+   * @param clusterFilter
+   *          the int minimum size of canopies produced
+   * @return the canopy output directory Path
+   */
+  private static Path buildClustersMR(Configuration conf, Path input,
+      Path output, DistanceMeasure measure, double t1, double t2, double t3,
+      double t4, int clusterFilter) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
+        .getName());
+    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
+    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
+    conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(t3));
+    conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(t4));
+    conf.set(CanopyConfigKeys.CF_KEY, String.valueOf(clusterFilter));
+
+    Job job = new Job(conf, "Canopy Driver running buildClusters over input: "
+        + input);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(CanopyMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+    job.setReducerClass(CanopyReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(ClusterWritable.class);
+    job.setNumReduceTasks(1);
+    job.setJarByClass(CanopyDriver.class);
+
+    FileInputFormat.addInputPath(job, input);
+    Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX);
+    FileOutputFormat.setOutputPath(job, canopyOutputDir);
+    if (!job.waitForCompletion(true)) {
+      throw new InterruptedException("Canopy Job failed processing " + input);
+    }
+    return canopyOutputDir;
+  }
+
+  private static void clusterData(Configuration conf,
+                                  Path points,
+                                  Path canopies,
+                                  Path output,
+                                  double clusterClassificationThreshold,
+                                  boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    ClusterClassifier.writePolicy(new CanopyClusteringPolicy(), canopies);
+    ClusterClassificationDriver.run(conf, points, output, new Path(output, PathDirectory.CLUSTERED_POINTS_DIRECTORY),
+                                    clusterClassificationThreshold, true, runSequential);
+  }
+
+}