You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:54 UTC

[23/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy into mahout-hdfs and mahout-mr, closes apache/mahout#86

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java
new file mode 100644
index 0000000..a8fa091
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.parameters.ClassParameter;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.common.parameters.PathParameter;
+import org.apache.mahout.math.Algebra;
+import org.apache.mahout.math.CardinalityException;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixWritable;
+import org.apache.mahout.math.SingularValueDecomposition;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link DistanceMeasure} implementing the Mahalanobis distance: the difference between two vectors is
+ * multiplied by an inverse covariance matrix before its length is taken.
+ * See http://en.wikipedia.org/wiki/Mahalanobis_distance for details.
+ * <p/>
+ * The inverse covariance matrix and the mean vector are either supplied through the setters or loaded by
+ * {@link #configure(Configuration)} from the files named by the "inverseCovarianceFile" and
+ * "meanVectorFile" parameters.
+ */
+public class MahalanobisDistanceMeasure implements DistanceMeasure {
+
+  private Matrix inverseCovarianceMatrix;
+  private Vector meanVector;
+
+  private ClassParameter vectorClass;
+  private ClassParameter matrixClass;
+  private List<Parameter<?>> parameters;
+  private Parameter<Path> inverseCovarianceFile;
+  private Parameter<Path> meanVectorFile;
+
+  /**
+   * Creates the parameters if that has not happened yet, then loads the inverse covariance matrix and the
+   * mean vector from their configured files. A parameter whose path is {@code null} is skipped, which
+   * leaves the corresponding field untouched.
+   *
+   * @param jobConf the configuration used to resolve the parameters and the file systems
+   * @throws IllegalStateException if a configured file does not exist or reading it fails with an
+   *         {@link IOException}
+   * @throws IllegalArgumentException if a file was read but yielded no matrix or no vector
+   */
+  @Override
+  public void configure(Configuration jobConf) {
+    if (parameters == null) {
+      ParameteredGeneralizations.configureParameters(this, jobConf);
+    }
+    try {
+      Path inverseCovariancePath = inverseCovarianceFile.get();
+      if (inverseCovariancePath != null) {
+        FileSystem fs = FileSystem.get(inverseCovariancePath.toUri(), jobConf);
+        // Unchecked cast: the configured class is trusted to be a MatrixWritable.
+        MatrixWritable matrixReader =
+            ClassUtils.instantiateAs((Class<? extends MatrixWritable>) matrixClass.get(), MatrixWritable.class);
+        if (!fs.exists(inverseCovariancePath)) {
+          throw new FileNotFoundException(inverseCovariancePath.toString());
+        }
+        DataInputStream in = fs.open(inverseCovariancePath);
+        try {
+          matrixReader.readFields(in);
+        } finally {
+          // Best-effort close: the 'true' flag asks Closeables to swallow an IOException from close().
+          Closeables.close(in, true);
+        }
+        this.inverseCovarianceMatrix = matrixReader.get();
+        Preconditions.checkArgument(this.inverseCovarianceMatrix != null, "inverseCovarianceMatrix not initialized");
+      }
+
+      Path meanVectorPath = meanVectorFile.get();
+      if (meanVectorPath != null) {
+        FileSystem fs = FileSystem.get(meanVectorPath.toUri(), jobConf);
+        // Unchecked cast: the configured class is trusted to be a VectorWritable.
+        VectorWritable vectorReader =
+            ClassUtils.instantiateAs((Class<? extends VectorWritable>) vectorClass.get(), VectorWritable.class);
+        if (!fs.exists(meanVectorPath)) {
+          throw new FileNotFoundException(meanVectorPath.toString());
+        }
+        DataInputStream in = fs.open(meanVectorPath);
+        try {
+          vectorReader.readFields(in);
+        } finally {
+          // Best-effort close, see above.
+          Closeables.close(in, true);
+        }
+        this.meanVector = vectorReader.get();
+        Preconditions.checkArgument(this.meanVector != null, "meanVector not initialized");
+      }
+
+    } catch (IOException e) {
+      // FileNotFoundException is an IOException, so a missing file also ends up here.
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * @return the parameters built by {@link #createParameters(String, Configuration)}, or {@code null}
+   *         if they have not been created yet
+   */
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return parameters;
+  }
+
+  /**
+   * Builds the four parameters of this measure: the inverse covariance file and the class it was
+   * serialized with, and the mean vector file and the class it was serialized with.
+   *
+   * @param prefix  prefix handed to every parameter
+   * @param jobConf configuration handed to every parameter
+   */
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+    parameters = Lists.newArrayList();
+    inverseCovarianceFile = new PathParameter(prefix, "inverseCovarianceFile", jobConf, null,
+            "Path on DFS to a file containing the inverse covariance matrix.");
+    parameters.add(inverseCovarianceFile);
+
+    // NOTE(review): the parameter name is spelled "maxtrixClass"; it is kept unchanged because existing
+    // job configurations may rely on that exact key.
+    // NOTE(review): the default is DenseMatrix while configure() instantiates the value as a
+    // MatrixWritable -- confirm the default is usable.
+    matrixClass = new ClassParameter(prefix, "maxtrixClass", jobConf, DenseMatrix.class,
+            "Class<Matix> file specified in parameter inverseCovarianceFile has been serialized with.");
+    parameters.add(matrixClass);
+
+    meanVectorFile = new PathParameter(prefix, "meanVectorFile", jobConf, null,
+            "Path on DFS to a file containing the mean Vector.");
+    parameters.add(meanVectorFile);
+
+    // NOTE(review): the default is DenseVector while configure() instantiates the value as a
+    // VectorWritable -- confirm the default is usable.
+    vectorClass = new ClassParameter(prefix, "vectorClass", jobConf, DenseVector.class,
+            "Class file specified in parameter meanVectorFile has been serialized with.");
+    parameters.add(vectorClass);
+  }
+
+  /**
+   * Computes the distance between a vector and the mean vector of this measure.
+   *
+   * @param v The vector to compute the distance to
+   * @return Mahalanobis distance of a multivariate vector
+   */
+  public double distance(Vector v) {
+    // The difference is needed on both sides of the product, so it is computed only once.
+    Vector delta = v.minus(meanVector);
+    return Math.sqrt(delta.dot(Algebra.mult(inverseCovarianceMatrix, delta)));
+  }
+
+  /**
+   * Computes the Mahalanobis distance between two vectors using the inverse covariance matrix.
+   *
+   * @param v1 the first vector
+   * @param v2 the second vector
+   * @return the square root of {@code (v1 - v2) . (inverseCovarianceMatrix * (v1 - v2))}
+   * @throws CardinalityException if the two vectors differ in size
+   */
+  @Override
+  public double distance(Vector v1, Vector v2) {
+    if (v1.size() != v2.size()) {
+      throw new CardinalityException(v1.size(), v2.size());
+    }
+    // The difference is needed on both sides of the product, so it is computed only once.
+    Vector delta = v1.minus(v2);
+    return Math.sqrt(delta.dot(Algebra.mult(inverseCovarianceMatrix, delta)));
+  }
+
+  /**
+   * Delegates to {@link #distance(Vector, Vector)}; {@code centroidLengthSquare} is not used.
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO
+  }
+
+  /**
+   * @param inverseCovarianceMatrix the inverse covariance matrix to use
+   * @throws IllegalArgumentException if the matrix is {@code null}
+   */
+  public void setInverseCovarianceMatrix(Matrix inverseCovarianceMatrix) {
+    Preconditions.checkArgument(inverseCovarianceMatrix != null, "inverseCovarianceMatrix not initialized");
+    this.inverseCovarianceMatrix = inverseCovarianceMatrix;
+  }
+
+
+  /**
+   * Computes the inverse covariance from the input covariance matrix given in input.
+   *
+   * @param m A covariance matrix.
+   * @throws CardinalityException if the matrix is not square
+   * @throws IllegalStateException if a diagonal element of the singular value matrix is not greater
+   *         than 0
+   */
+  public void setCovarianceMatrix(Matrix m) {
+    if (m.numRows() != m.numCols()) {
+      throw new CardinalityException(m.numRows(), m.numCols());
+    }
+    // See http://www.mlahanas.de/Math/svd.htm for details,
+    // which specifically details the case of covariance matrix inversion
+    // Complexity: O(min(nm2,mn2))
+    SingularValueDecomposition svd = new SingularValueDecomposition(m);
+    Matrix sInv = svd.getS();
+    // Invert the diagonal elements in place
+    for (int i = 0; i < sInv.numRows(); i++) {
+      double diagElem = sInv.get(i, i);
+      if (diagElem > 0.0) {
+        sInv.set(i, i, 1 / diagElem);
+      } else {
+        throw new IllegalStateException("Eigen Value equals to 0 found.");
+      }
+    }
+    // U is needed twice, so it is fetched from the decomposition only once.
+    Matrix u = svd.getU();
+    inverseCovarianceMatrix = u.times(sInv.times(u.transpose()));
+    Preconditions.checkArgument(inverseCovarianceMatrix != null, "inverseCovarianceMatrix not initialized");
+  }
+
+  public Matrix getInverseCovarianceMatrix() {
+    return inverseCovarianceMatrix;
+  }
+
+  /**
+   * @param meanVector the mean vector used by {@link #distance(Vector)}
+   * @throws IllegalArgumentException if the vector is {@code null}
+   */
+  public void setMeanVector(Vector meanVector) {
+    Preconditions.checkArgument(meanVector != null, "meanVector not initialized");
+    this.meanVector = meanVector;
+  }
+
+  public Vector getMeanVector() {
+    return meanVector;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java
new file mode 100644
index 0000000..5c32fcf
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.math.CardinalityException;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * A {@link DistanceMeasure} computing the "manhattan distance": the sum, over all coordinates, of the
+ * absolute difference between the two points.
+ */
+public class ManhattanDistanceMeasure implements DistanceMeasure {
+
+  /**
+   * Computes the manhattan distance between two points given as plain arrays.
+   *
+   * @param p1 the first point; its length decides how many coordinates are compared
+   * @param p2 the second point, read at every index of {@code p1}
+   * @return the sum of the absolute coordinate differences
+   */
+  public static double distance(double[] p1, double[] p2) {
+    double total = 0.0;
+    int index = 0;
+    for (double coordinate : p1) {
+      total += Math.abs(p2[index] - coordinate);
+      index++;
+    }
+    return total;
+  }
+
+  @Override
+  public void configure(Configuration job) {
+    // this measure has no configurable state
+  }
+
+  /** @return an empty collection, since this measure has no parameters */
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+    // this measure has no parameters to create
+  }
+
+  /**
+   * Computes the manhattan distance between two vectors of equal size.
+   *
+   * @throws CardinalityException if the two vectors differ in size
+   */
+  @Override
+  public double distance(Vector v1, Vector v2) {
+    int firstSize = v1.size();
+    int secondSize = v2.size();
+    if (firstSize == secondSize) {
+      return v1.aggregate(v2, Functions.PLUS, Functions.MINUS_ABS);
+    }
+    throw new CardinalityException(firstSize, secondSize);
+  }
+
+  /**
+   * Delegates to {@link #distance(Vector, Vector)}; {@code centroidLengthSquare} is not used.
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java
new file mode 100644
index 0000000..3a57f2f
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.parameters.DoubleParameter;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.Functions;
+
+/** 
+ * Implement Minkowski distance, a real-valued generalization of the 
+ * integral L(n) distances: Manhattan = L1, Euclidean = L2. 
+ * For high numbers of dimensions, very high exponents give more useful distances. 
+ * 
+ * Note: Math.pow is clever about integer-valued doubles.
+ **/
+public class MinkowskiDistanceMeasure implements DistanceMeasure {
+
+  private static final double EXPONENT = 3.0;
+
+  private List<Parameter<?>> parameters;
+  private double exponent = EXPONENT;
+  
+  public MinkowskiDistanceMeasure() {
+  }
+  
+  public MinkowskiDistanceMeasure(double exponent) {
+    this.exponent = exponent;
+  }
+
+  @Override
+  public void createParameters(String prefix, Configuration conf) {
+    parameters = Lists.newArrayList();
+    Parameter<?> param =
+        new DoubleParameter(prefix, "exponent", conf, EXPONENT, "Exponent for Fractional Lagrange distance");
+    parameters.add(param);
+  }
+
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return parameters;
+  }
+
+  @Override
+  public void configure(Configuration jobConf) {
+    if (parameters == null) {
+      ParameteredGeneralizations.configureParameters(this, jobConf);
+    }
+  }
+
+  public double getExponent() {
+    return exponent;
+  }
+
+  public void setExponent(double exponent) {
+    this.exponent = exponent;
+  }
+
+  /**
+   *  Math.pow is clever about integer-valued doubles
+   */
+  @Override
+  public double distance(Vector v1, Vector v2) {
+    return Math.pow(v1.aggregate(v2, Functions.PLUS, Functions.minusAbsPow(exponent)), 1.0 / exponent);
+  }
+
+  // TODO: how?
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO - can this use centroidLengthSquare somehow?
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java
new file mode 100644
index 0000000..66da121
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.math.Vector;
+
+/**
+ * Like {@link EuclideanDistanceMeasure} but it does not take the square root.
+ * <p/>
+ * Thus, it is not actually the Euclidean Distance, but it is saves on computation when you only need the
+ * distance for comparison and don't care about the actual value as a distance.
+ */
+public class SquaredEuclideanDistanceMeasure implements DistanceMeasure {
+  
+  @Override
+  public void configure(Configuration job) {
+  // nothing to do
+  }
+  
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return Collections.emptyList();
+  }
+  
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+  // nothing to do
+  }
+  
+  @Override
+  public double distance(Vector v1, Vector v2) {
+    return v2.getDistanceSquared(v1);
+  }
+  
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return centroidLengthSquare - 2 * v.dot(centroid) + v.getLengthSquared();
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java
new file mode 100644
index 0000000..cfeb119
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * Tanimoto coefficient implementation.
+ * 
+ * http://en.wikipedia.org/wiki/Jaccard_index
+ */
+public class TanimotoDistanceMeasure extends WeightedDistanceMeasure {
+  
+  /**
+   * Calculates the distance between two vectors.
+   * 
+   * The coefficient (a measure of similarity) is: T(a, b) = a.b / (|a|^2 + |b|^2 - a.b)
+   * 
+   * The distance d(a,b) = 1 - T(a,b)
+   * 
+   * @return 0 for perfect match, > 0 for greater distance
+   */
+  @Override
+  public double distance(Vector a, Vector b) {
+    double ab;
+    double denominator;
+    if (getWeights() != null) {
+      ab = a.times(b).aggregate(getWeights(), Functions.PLUS, Functions.MULT);
+      denominator = a.aggregate(getWeights(), Functions.PLUS, Functions.MULT_SQUARE_LEFT)
+          + b.aggregate(getWeights(), Functions.PLUS, Functions.MULT_SQUARE_LEFT)
+          - ab;
+    } else {
+      ab = b.dot(a); // b is SequentialAccess
+      denominator = a.getLengthSquared() + b.getLengthSquared() - ab;
+    }
+    
+    if (denominator < ab) { // correct for fp round-off: distance >= 0
+      denominator = ab;
+    }
+    if (denominator > 0) {
+      // denominator == 0 only when dot(a,a) == dot(b,b) == dot(a,b) == 0
+      return 1.0 - ab / denominator;
+    } else {
+      return 0.0;
+    }
+  }
+
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java
new file mode 100644
index 0000000..0c1d2cd
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.parameters.ClassParameter;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.common.parameters.PathParameter;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/** Abstract implementation of DistanceMeasure with support for weights. */
+public abstract class WeightedDistanceMeasure implements DistanceMeasure {
+  
+  private List<Parameter<?>> parameters;
+  private Parameter<Path> weightsFile;
+  private ClassParameter vectorClass;
+  private Vector weights;
+  
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+    parameters = Lists.newArrayList();
+    weightsFile = new PathParameter(prefix, "weightsFile", jobConf, null,
+        "Path on DFS to a file containing the weights.");
+    parameters.add(weightsFile);
+    vectorClass = new ClassParameter(prefix, "vectorClass", jobConf, DenseVector.class,
+        "Class<Vector> file specified in parameter weightsFile has been serialized with.");
+    parameters.add(vectorClass);
+  }
+  
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return parameters;
+  }
+  
+  @Override
+  public void configure(Configuration jobConf) {
+    if (parameters == null) {
+      ParameteredGeneralizations.configureParameters(this, jobConf);
+    }
+    try {
+      if (weightsFile.get() != null) {
+        FileSystem fs = FileSystem.get(weightsFile.get().toUri(), jobConf);
+        VectorWritable weights =
+            ClassUtils.instantiateAs((Class<? extends VectorWritable>) vectorClass.get(), VectorWritable.class);
+        if (!fs.exists(weightsFile.get())) {
+          throw new FileNotFoundException(weightsFile.get().toString());
+        }
+        DataInputStream in = fs.open(weightsFile.get());
+        try {
+          weights.readFields(in);
+        } finally {
+          Closeables.close(in, true);
+        }
+        this.weights = weights.get();
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+  
+  public Vector getWeights() {
+    return weights;
+  }
+  
+  public void setWeights(Vector weights) {
+    this.weights = weights;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java
new file mode 100644
index 0000000..c6889e2
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+
+/**
+ * This class implements a Euclidean distance metric: the square root of the sum of the squared
+ * differences between each coordinate, where every squared difference is optionally multiplied by a
+ * weight.
+ */
+public class WeightedEuclideanDistanceMeasure extends WeightedDistanceMeasure {
+
+  /**
+   * Computes the distance over the non-zero entries of {@code p2 - p1}. When no weights are set the
+   * squared differences are summed as they are; otherwise each one is multiplied by the weight found at
+   * the same index.
+   */
+  @Override
+  public double distance(Vector p1, Vector p2) {
+    Vector difference = p2.minus(p1);
+    Vector weightVector = getWeights();
+    boolean weighted = weightVector != null;
+    double sumOfSquares = 0;
+    for (Element entry : difference.nonZeroes()) {
+      double squared = entry.get() * entry.get();
+      if (weighted) {
+        sumOfSquares += squared * weightVector.get(entry.index());
+      } else {
+        sumOfSquares += squared;
+      }
+    }
+    return Math.sqrt(sumOfSquares);
+  }
+
+  /**
+   * Delegates to {@link #distance(Vector, Vector)}; {@code centroidLengthSquare} is not used.
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java
new file mode 100644
index 0000000..2c280e2
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+
+/**
+ * This class implements a "Manhattan distance" metric: the sum of the absolute values of the difference
+ * between each coordinate, where every difference is optionally multiplied by a weight.
+ */
+public class WeightedManhattanDistanceMeasure extends WeightedDistanceMeasure {
+
+  /**
+   * Computes the distance over the non-zero entries of {@code p2 - p1}. When no weights are set the
+   * absolute differences are summed as they are; otherwise each difference is multiplied by the weight
+   * found at the same index before its absolute value is taken.
+   */
+  @Override
+  public double distance(Vector p1, Vector p2) {
+    Vector difference = p2.minus(p1);
+    Vector weightVector = getWeights();
+    double total = 0;
+    for (Element entry : difference.nonZeroes()) {
+      double term = entry.get();
+      if (weightVector != null) {
+        term = term * weightVector.get(entry.index());
+      }
+      total += Math.abs(term);
+    }
+    return total;
+  }
+
+  /**
+   * Delegates to {@link #distance(Vector, Vector)}; {@code centroidLengthSquare} is not used.
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java
new file mode 100644
index 0000000..73cc821
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+
+/**
+ * An iterator that copies the values in an underlying iterator by finding an appropriate copy constructor.
+ */
+public final class CopyConstructorIterator<T> extends ForwardingIterator<T> {
+
+  private final Iterator<T> delegate;
+  private Constructor<T> constructor;
+
+  public CopyConstructorIterator(Iterator<? extends T> copyFrom) {
+    this.delegate = Iterators.transform(
+        copyFrom,
+        new Function<T,T>() {
+          @Override
+          public T apply(T from) {
+            if (constructor == null) {
+              Class<T> elementClass = (Class<T>) from.getClass();
+              try {
+                constructor = elementClass.getConstructor(elementClass);
+              } catch (NoSuchMethodException e) {
+                throw new IllegalStateException(e);
+              }
+            }
+            try {
+              return constructor.newInstance(from);
+            } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+              throw new IllegalStateException(e);
+            }
+          }
+        });
+  }
+
+  @Override
+  protected Iterator<T> delegate() {
+    return delegate;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java
new file mode 100644
index 0000000..658c1f1
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * Iterates over the integers from 0 through {@code to-1}.
+ */
+public final class CountingIterator extends AbstractIterator<Integer> {
+
+  private int count;
+  private final int to;
+
+  public CountingIterator(int to) {
+    this.to = to;
+  }
+
+  @Override
+  protected Integer computeNext() {
+    if (count < to) {
+      return count++;
+    } else {
+      return endOfData();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
new file mode 100644
index 0000000..cfc18d6
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+
+import com.google.common.base.Charsets;
+
+/**
+ * {@link Iterable} view of the lines of a text source. Lines are delimited the same way
+ * {@link java.io.BufferedReader} delimits them.
+ * <p/>
+ * The underlying {@link InputStream} is obtained once, at construction time, and every call to
+ * {@link #iterator()} builds a new {@link FileLineIterator} over that same stream.
+ */
+public final class FileLineIterable implements Iterable<String> {
+
+  private final InputStream is;
+  private final Charset encoding;
+  private final boolean skipFirstLine;
+  private final String origFilename;
+
+  /** Reads the given file as UTF-8 and does not skip the first line. */
+  public FileLineIterable(File file) throws IOException {
+    this(file, Charsets.UTF_8, false);
+  }
+
+  /** Reads the given file as UTF-8, optionally skipping its first line. */
+  public FileLineIterable(File file, boolean skipFirstLine) throws IOException {
+    this(file, Charsets.UTF_8, skipFirstLine);
+  }
+
+  /** Reads the given file with the given encoding, optionally skipping its first line. */
+  public FileLineIterable(File file, Charset encoding, boolean skipFirstLine) throws IOException {
+    this(FileLineIterator.getFileInputStream(file), encoding, skipFirstLine);
+  }
+
+  /** Reads the given stream as UTF-8 and does not skip the first line. */
+  public FileLineIterable(InputStream is) {
+    this(is, Charsets.UTF_8, false);
+  }
+
+  /** Reads the given stream as UTF-8, optionally skipping its first line. */
+  public FileLineIterable(InputStream is, boolean skipFirstLine) {
+    this(is, Charsets.UTF_8, skipFirstLine);
+  }
+
+  /** Reads the given stream with the given encoding; the original file name is recorded as empty. */
+  public FileLineIterable(InputStream is, Charset encoding, boolean skipFirstLine) {
+    this(is, encoding, skipFirstLine, "");
+  }
+
+  /**
+   * @param is stream to read lines from
+   * @param encoding character encoding of the stream
+   * @param skipFirstLine whether the first line is to be skipped
+   * @param filename original file name, handed to {@link FileLineIterator} together with the stream
+   */
+  public FileLineIterable(InputStream is, Charset encoding, boolean skipFirstLine, String filename) {
+    this.origFilename = filename;
+    this.skipFirstLine = skipFirstLine;
+    this.encoding = encoding;
+    this.is = is;
+  }
+
+  /**
+   * @return a new {@link FileLineIterator} over the wrapped stream
+   * @throws IllegalStateException if the iterator cannot be created because of an {@link IOException}
+   */
+  @Override
+  public Iterator<String> iterator() {
+    try {
+      return new FileLineIterator(is, encoding, skipFirstLine, origFilename);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
new file mode 100644
index 0000000..b7cc51e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipInputStream;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.mahout.cf.taste.impl.common.SkippingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Iterates over the lines of a text file. This assumes the text file's lines are delimited in a manner
+ * consistent with how {@link BufferedReader} defines lines.
+ * <p/>
+ * Input whose name ends in .gz is gunzipped, and input whose name ends in .zip is read from the first
+ * entry of the archive.
+ */
+public final class FileLineIterator extends AbstractIterator<String> implements SkippingIterator<String>, Closeable {
+
+  private final BufferedReader reader;
+
+  private static final Logger log = LoggerFactory.getLogger(FileLineIterator.class);
+
+  /**
+   * Creates a {@link FileLineIterator} over a given file, assuming a UTF-8 encoding.
+   *
+   * @throws java.io.FileNotFoundException if the file does not exist
+   * @throws IOException
+   *           if the file cannot be read
+   */
+  public FileLineIterator(File file) throws IOException {
+    this(file, Charsets.UTF_8, false);
+  }
+
+  /**
+   * Creates a {@link FileLineIterator} over a given file, assuming a UTF-8 encoding.
+   *
+   * @throws java.io.FileNotFoundException if the file does not exist
+   * @throws IOException                   if the file cannot be read
+   */
+  public FileLineIterator(File file, boolean skipFirstLine) throws IOException {
+    this(file, Charsets.UTF_8, skipFirstLine);
+  }
+
+  /**
+   * Creates a {@link FileLineIterator} over a given file, using the given encoding.
+   *
+   * @throws java.io.FileNotFoundException if the file does not exist
+   * @throws IOException                   if the file cannot be read
+   */
+  public FileLineIterator(File file, Charset encoding, boolean skipFirstLine) throws IOException {
+    this(getFileInputStream(file), encoding, skipFirstLine);
+  }
+
+  /** Creates a {@link FileLineIterator} over a stream of UTF-8 text that is read as-is. */
+  public FileLineIterator(InputStream is) throws IOException {
+    this(is, Charsets.UTF_8, false);
+  }
+
+  /** Creates a {@link FileLineIterator} over a stream of UTF-8 text, optionally skipping the first line. */
+  public FileLineIterator(InputStream is, boolean skipFirstLine) throws IOException {
+    this(is, Charsets.UTF_8, skipFirstLine);
+  }
+
+  /**
+   * Creates a {@link FileLineIterator} over a stream that is read as-is (no decompression is attempted).
+   *
+   * @throws IOException if the first line is to be skipped and cannot be read
+   */
+  public FileLineIterator(InputStream is, Charset encoding, boolean skipFirstLine) throws IOException {
+    // An empty name has no extension, so the stream is never wrapped in a decompressor.
+    this(is, encoding, skipFirstLine, "");
+  }
+
+  /**
+   * Creates a {@link FileLineIterator} over a stream, decompressing it when {@code filename} ends in
+   * .gz or .zip.
+   *
+   * @param is stream to read; it is not closed by this constructor if set-up fails
+   * @param encoding character encoding of the (decompressed) text
+   * @param skipFirstLine whether the first line is consumed and discarded immediately
+   * @param filename name used only to decide, from its extension, whether to decompress
+   * @throws IOException if the decompressor cannot be set up or the first line cannot be read
+   */
+  public FileLineIterator(InputStream is, Charset encoding, boolean skipFirstLine, String filename)
+    throws IOException {
+    reader = new BufferedReader(new InputStreamReader(decompressIfNeeded(is, filename), encoding));
+    if (skipFirstLine) {
+      reader.readLine();
+    }
+  }
+
+  /**
+   * Opens the given file, wrapping the stream in a decompressor when the file name ends in .gz or .zip.
+   *
+   * @throws java.io.FileNotFoundException if the file does not exist
+   * @throws IOException if the decompressor cannot be set up; the file stream is closed in that case
+   */
+  static InputStream getFileInputStream(File file) throws IOException {
+    InputStream is = new FileInputStream(file);
+    try {
+      return decompressIfNeeded(is, file.getName());
+    } catch (IOException ioe) {
+      // We opened the file ourselves, so do not leak the handle when wrapping fails.
+      Closeables.close(is, true);
+      throw ioe;
+    }
+  }
+
+  /** Wraps the stream in the decompressor matching the extension of {@code name}, if there is one. */
+  private static InputStream decompressIfNeeded(InputStream is, String name) throws IOException {
+    String extension = Files.getFileExtension(name);
+    if ("gz".equalsIgnoreCase(extension)) {
+      return new GZIPInputStream(is);
+    }
+    if ("zip".equalsIgnoreCase(extension)) {
+      ZipInputStream zipStream = new ZipInputStream(is);
+      // A ZipInputStream returns no data until it has been positioned on an entry; without this call the
+      // archive would look like an empty file. Only the first entry is read.
+      zipStream.getNextEntry();
+      return zipStream;
+    }
+    return is;
+  }
+
+  /**
+   * @return the next line, or the end-of-data marker once the reader is exhausted
+   * @throws IllegalStateException if reading fails; the reader is closed before the exception is thrown
+   */
+  @Override
+  protected String computeNext() {
+    String line;
+    try {
+      line = reader.readLine();
+    } catch (IOException ioe) {
+      try {
+        close();
+      } catch (IOException e) {
+        log.error(e.getMessage(), e);
+      }
+      throw new IllegalStateException(ioe);
+    }
+    return line == null ? endOfData() : line;
+  }
+
+
+  /**
+   * Reads and discards up to {@code n} lines, stopping early at end of input. If reading fails, the
+   * failure is logged and the iterator is closed.
+   */
+  @Override
+  public void skip(int n) {
+    try {
+      for (int i = 0; i < n; i++) {
+        if (reader.readLine() == null) {
+          break;
+        }
+      }
+    } catch (IOException ioe) {
+      // Record the failure rather than dropping it silently.
+      log.error("Failed to skip lines; closing iterator", ioe);
+      try {
+        close();
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
+  /** Marks the iteration as finished and closes the underlying reader, swallowing any close failure. */
+  @Override
+  public void close() throws IOException {
+    endOfData();
+    Closeables.close(reader, true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java
new file mode 100644
index 0000000..1905654
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Lists;
+import org.apache.mahout.common.RandomUtils;
+
+/**
+ * Draws a fixed-size sample from an {@link Iterator}. The source is consumed completely in the
+ * constructor; the sampled elements are then returned in no particular order.
+ */
+public final class FixedSizeSamplingIterator<T> extends ForwardingIterator<T> {
+
+  private final Iterator<T> delegate;
+
+  /**
+   * @param size maximum number of elements to keep
+   * @param source elements to sample from; fully drained by this constructor
+   */
+  public FixedSizeSamplingIterator(int size, Iterator<T> source) {
+    List<T> reservoir = Lists.newArrayListWithCapacity(size);
+    Random random = RandomUtils.getRandom();
+    // 'seen' counts the elements read so far, including the current one.
+    for (int seen = 1; source.hasNext(); seen++) {
+      T item = source.next();
+      if (reservoir.size() < size) {
+        // Still filling up: keep everything.
+        reservoir.add(item);
+        continue;
+      }
+      // Full: overwrite a random slot, but only when the drawn index falls inside the reservoir.
+      int slot = random.nextInt(seen);
+      if (slot < reservoir.size()) {
+        reservoir.set(slot, item);
+      }
+    }
+    delegate = reservoir.iterator();
+  }
+
+  @Override
+  protected Iterator<T> delegate() {
+    return delegate;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java
new file mode 100644
index 0000000..46ef411
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.util.Iterator;
+
+/**
+ * Wraps an {@link Iterable} so that its {@link Iterable#iterator()} returns only some subset of the elements
+ * the wrapped one would, as determined by a sampling rate parameter.
+ */
+public final class SamplingIterable<T> implements Iterable<T> {
+
+  private final Iterable<? extends T> delegate;
+  private final double samplingRate;
+
+  /**
+   * @param delegate iterable to sample from
+   * @param samplingRate rate handed to each {@link SamplingIterator} this iterable creates
+   */
+  public SamplingIterable(Iterable<? extends T> delegate, double samplingRate) {
+    this.samplingRate = samplingRate;
+    this.delegate = delegate;
+  }
+
+  /** @return a new {@link SamplingIterator} over a fresh iterator of the wrapped iterable */
+  @Override
+  public Iterator<T> iterator() {
+    Iterator<? extends T> source = delegate.iterator();
+    return new SamplingIterator<T>(source, samplingRate);
+  }
+
+  /**
+   * @return {@code delegate} itself when {@code samplingRate} is at least 1.0, otherwise a
+   *  {@link SamplingIterable} wrapping it
+   */
+  public static <T> Iterable<T> maybeWrapIterable(Iterable<T> delegate, double samplingRate) {
+    if (samplingRate >= 1.0) {
+      return delegate;
+    }
+    return new SamplingIterable<T>(delegate, samplingRate);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java
new file mode 100644
index 0000000..2ba46fd
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.util.Iterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
+import org.apache.commons.math3.distribution.PascalDistribution;
+import org.apache.mahout.cf.taste.impl.common.SkippingIterator;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.RandomWrapper;
+
+/**
+ * Wraps an {@link Iterator} and returns only some subset of the elements that it would, as determined by a
+ * sampling rate parameter. Before each returned element, a randomly drawn number of elements of the wrapped
+ * iterator is skipped.
+ */
+public final class SamplingIterator<T> extends AbstractIterator<T> {
+
+  private final PascalDistribution geometricDistribution;
+  private final Iterator<? extends T> delegate;
+
+  /** Samples {@code delegate} using the random source supplied by {@link RandomUtils#getRandom()}. */
+  public SamplingIterator(Iterator<? extends T> delegate, double samplingRate) {
+    this(RandomUtils.getRandom(), delegate, samplingRate);
+  }
+
+  /**
+   * @param random source of randomness for the skip lengths
+   * @param delegate iterator to sample from; must not be null
+   * @param samplingRate must satisfy {@code 0.0 < samplingRate <= 1.0}
+   */
+  public SamplingIterator(RandomWrapper random, Iterator<? extends T> delegate, double samplingRate) {
+    Preconditions.checkNotNull(delegate);
+    Preconditions.checkArgument(samplingRate > 0.0 && samplingRate <= 1.0,
+        "Must be: 0.0 < samplingRate <= 1.0. But samplingRate = " + samplingRate);
+    this.delegate = delegate;
+    // The geometric distribution is the negative binomial (Pascal) distribution with r=1.
+    this.geometricDistribution = new PascalDistribution(random.getRandomGenerator(), 1, samplingRate);
+  }
+
+  @Override
+  protected T computeNext() {
+    int toSkip = geometricDistribution.sample();
+    if (delegate instanceof SkippingIterator<?>) {
+      // Let the delegate skip in one call when it knows how to.
+      ((SkippingIterator<? extends T>) delegate).skip(toSkip);
+    } else {
+      int skipped = 0;
+      while (skipped < toSkip && delegate.hasNext()) {
+        delegate.next();
+        skipped++;
+      }
+    }
+    if (!delegate.hasNext()) {
+      return endOfData();
+    }
+    return delegate.next();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java
new file mode 100644
index 0000000..c4ddf7b
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+
+/**
+ * Draws a fixed-size sample from an {@link Iterator} and returns the sampled elements in the order in which
+ * the source produced them. This costs extra time and memory compared to
+ * {@link FixedSizeSamplingIterator}, because every kept element is tagged with its position and the sample
+ * is sorted at the end.
+ */
+public class StableFixedSizeSamplingIterator<T> extends ForwardingIterator<T> {
+
+  private final Iterator<T> delegate;
+
+  /**
+   * @param size maximum number of elements to keep
+   * @param source elements to sample from; fully drained by this constructor
+   */
+  public StableFixedSizeSamplingIterator(int size, Iterator<T> source) {
+    List<Pair<Integer,T>> reservoir = Lists.newArrayListWithCapacity(size);
+    Random random = RandomUtils.getRandom();
+    // 'seen' is the 1-based position of the current element in the source.
+    for (int seen = 1; source.hasNext(); seen++) {
+      T item = source.next();
+      if (reservoir.size() < size) {
+        reservoir.add(new Pair<>(seen, item));
+        continue;
+      }
+      int slot = random.nextInt(seen);
+      if (slot < reservoir.size()) {
+        reservoir.set(slot, new Pair<>(seen, item));
+      }
+    }
+
+    // Sort the position-tagged pairs, then expose only the elements.
+    Collections.sort(reservoir);
+    Function<Pair<Integer,T>,T> dropPosition = new Function<Pair<Integer,T>,T>() {
+      @Override
+      public T apply(Pair<Integer,T> from) {
+        return from.getSecond();
+      }
+    };
+    delegate = Iterators.transform(reservoir.iterator(), dropPosition);
+  }
+
+  @Override
+  protected Iterator<T> delegate() {
+    return delegate;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java
new file mode 100644
index 0000000..73b841e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import org.apache.mahout.common.Pair;
+
+public class StringRecordIterator extends ForwardingIterator<Pair<List<String>,Long>> {
+  
+  private static final Long ONE = 1L;
+
+  private final Pattern splitter;
+  private final Iterator<Pair<List<String>,Long>> delegate;
+
+  public StringRecordIterator(Iterable<String> stringIterator, String pattern) {
+    this.splitter = Pattern.compile(pattern);
+    delegate = Iterators.transform(
+        stringIterator.iterator(),
+        new Function<String,Pair<List<String>,Long>>() {
+          @Override
+          public Pair<List<String>,Long> apply(String from) {
+            String[] items = splitter.split(from);
+            return new Pair<>(Arrays.asList(items), ONE);
+          }
+        });
+  }
+
+  @Override
+  protected Iterator<Pair<List<String>,Long>> delegate() {
+    return delegate;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
new file mode 100644
index 0000000..19f78b5
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * Holder for a few frequently needed, shared {@link PathFilter} instances.
+ */
+public final class PathFilters {
+
+  /** Accepts names that start with "part-" unless they end with ".crc". */
+  private static final PathFilter PART_FILE_INSTANCE = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String fileName = path.getName();
+      if (!fileName.startsWith("part-")) {
+        return false;
+      }
+      return !fileName.endsWith(".crc");
+    }
+  };
+
+  /** Accepts names that start with "clusters-" and end with "-final". */
+  private static final PathFilter CLUSTER_FINAL = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String fileName = path.getName();
+      if (!fileName.startsWith("clusters-")) {
+        return false;
+      }
+      return fileName.endsWith("-final");
+    }
+  };
+
+  /** Rejects names that end with ".crc" or start with "." or "_"; accepts everything else. */
+  private static final PathFilter LOGS_CRC_INSTANCE = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String fileName = path.getName();
+      return !fileName.endsWith(".crc") && !fileName.startsWith(".") && !fileName.startsWith("_");
+    }
+  };
+
+  private PathFilters() {
+  }
+
+  /**
+   * @return {@link PathFilter} that accepts paths whose file name starts with "part-", except those
+   * ending with ".crc"
+   */
+  public static PathFilter partFilter() {
+    return PART_FILE_INSTANCE;
+  }
+
+  /**
+   * @return {@link PathFilter} that accepts paths whose file name starts with "clusters-" and ends with
+   * "-final"
+   */
+  public static PathFilter finalPartFilter() {
+    return CLUSTER_FINAL;
+  }
+
+  /**
+   * @return {@link PathFilter} that rejects paths whose file name starts with "_" (e.g. Cloudera
+   * _SUCCESS files or Hadoop _logs), or "." (e.g. local hidden files), or ends with ".crc"
+   */
+  public static PathFilter logsCRCFilter() {
+    return LOGS_CRC_INSTANCE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
new file mode 100644
index 0000000..7ea713e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+/**
+ * Used by {@link SequenceFileDirIterable} and the like to select whether the input path specifies a
+ * directory to list, or a glob pattern.
+ */
+public enum PathType {
+  /** The input path is a glob pattern. */
+  GLOB,
+  /** The input path is a directory whose contents are listed. */
+  LIST,
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
new file mode 100644
index 0000000..ca4d6b8
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileDirIterator}: it stores the iterator's construction
+ * arguments and builds a new {@link SequenceFileDirIterator} on every call to {@link #iterator()}.</p>
+ */
+public final class SequenceFileDirIterable<K extends Writable,V extends Writable> implements Iterable<Pair<K,V>> {
+
+  private final Path path;
+  private final PathType pathType;
+  private final PathFilter filter;
+  private final Comparator<FileStatus> ordering;
+  private final boolean reuseKeyValueInstances;
+  private final Configuration conf;
+
+  /** Uses no filter and no ordering, and does not reuse key/value instances. */
+  public SequenceFileDirIterable(Path path, PathType pathType, Configuration conf) {
+    this(path, pathType, null, null, false, conf);
+  }
+
+  /** Uses the given filter and no ordering, and does not reuse key/value instances. */
+  public SequenceFileDirIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
+    this(path, pathType, filter, null, false, conf);
+  }
+
+  /**
+   * @param path file to iterate over
+   * @param pathType whether or not to treat path as a directory ({@link PathType#LIST}) or
+   *  glob pattern ({@link PathType#GLOB})
+   * @param filter if not null, specifies sequence files to be ignored by the iteration
+   * @param ordering if not null, specifies the order in which to iterate over matching sequence files
+   * @param reuseKeyValueInstances if true, reuses instances of the value object instead of creating a new
+   *  one for each read from the file
+   * @param conf Hadoop configuration handed to each {@link SequenceFileDirIterator}
+   */
+  public SequenceFileDirIterable(Path path,
+                                 PathType pathType,
+                                 PathFilter filter,
+                                 Comparator<FileStatus> ordering,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.ordering = ordering;
+    this.filter = filter;
+    this.pathType = pathType;
+    this.path = path;
+  }
+
+  /**
+   * @return a new {@link SequenceFileDirIterator} built from the stored arguments
+   * @throws IllegalStateException carrying the path as its message if the iterator cannot be created
+   *  because of an {@link IOException}
+   */
+  @Override
+  public Iterator<Pair<K,V>> iterator() {
+    try {
+      return new SequenceFileDirIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
+    } catch (IOException ioe) {
+      String failedPath = path.toString();
+      throw new IllegalStateException(failedPath, ioe);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
new file mode 100644
index 0000000..cf6a871
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.Pair;
+
+/**
+ * Like {@link SequenceFileIterator}, but iterates not just over one sequence file, but many. The input path
+ * may be specified as a directory of files to read, or as a glob pattern. The set of files may be optionally
+ * restricted with a {@link PathFilter}.
+ */
+public final class SequenceFileDirIterator<K extends Writable,V extends Writable>
+    extends ForwardingIterator<Pair<K,V>> implements Closeable {
+
+  private static final FileStatus[] NO_STATUSES = new FileStatus[0];
+
+  /** Concatenation of the per-file iterators; built lazily, so a file is opened only when reached. */
+  private Iterator<Pair<K,V>> delegate;
+
+  /** Per-file iterators opened so far, in opening order, retained so that {@link #close()} can close them. */
+  private final List<SequenceFileIterator<K,V>> iterators = Lists.newArrayList();
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters.
+   *
+   * @param path files to iterate over, in iteration order; an empty array yields an empty iterator
+   * @param reuseKeyValueInstances passed to each {@link SequenceFileIterator} that gets created
+   * @param conf Hadoop configuration used to resolve file systems and to open the files
+   * @throws IOException if the status of one of the files cannot be obtained
+   */
+  public SequenceFileDirIterator(Path[] path,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) throws IOException {
+
+    // we assume all files should exist, otherwise we will bail out.
+    FileStatus[] statuses = new FileStatus[path.length];
+    for (int i = 0; i < statuses.length; i++) {
+      // Resolve the file system per path rather than from path[0] only: this tolerates an empty
+      // array and paths that do not all live on the same file system.
+      statuses[i] = path[i].getFileSystem(conf).getFileStatus(path[i]);
+    }
+    init(statuses, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
+   * (depending on pathType parameter).
+   *
+   * @param path directory or glob pattern, as selected by {@code pathType}
+   * @param pathType how {@code path} is interpreted
+   * @param filter if not null, restricts the set of files
+   * @param ordering if not null, the order in which files are visited
+   * @param reuseKeyValueInstances passed to each {@link SequenceFileIterator} that gets created
+   * @param conf Hadoop configuration used to list and open the files
+   * @throws IOException if the list of files cannot be obtained
+   */
+  public SequenceFileDirIterator(Path path,
+                                 PathType pathType,
+                                 PathFilter filter,
+                                 Comparator<FileStatus> ordering,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) throws IOException {
+
+    FileStatus[] statuses = HadoopUtil.getFileStatus(path, pathType, filter, ordering, conf);
+    init(statuses, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Builds {@link #delegate} as the lazy concatenation of one {@link SequenceFileIterator} per file.
+   * No file is opened here; each one is opened (and remembered in {@link #iterators}) when the
+   * iteration first reaches it.
+   */
+  private void init(FileStatus[] statuses,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) {
+
+    /*
+     * prevent NPEs. Unfortunately, Hadoop would return null for list if nothing
+     * was qualified. In this case, which is a corner case, we should assume an
+     * empty iterator, not an NPE.
+     */
+    if (statuses == null) {
+      statuses = NO_STATUSES;
+    }
+
+    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+    Iterator<Iterator<Pair<K, V>>> fsIterators =
+      Iterators.transform(fileStatusIterator,
+        new Function<FileStatus, Iterator<Pair<K, V>>>() {
+          @Override
+          public Iterator<Pair<K, V>> apply(FileStatus from) {
+            try {
+              SequenceFileIterator<K, V> iterator = new SequenceFileIterator<>(from.getPath(),
+                  reuseKeyValueInstances, conf);
+              // remember it so that close() can release it even if it is never exhausted
+              iterators.add(iterator);
+              return iterator;
+            } catch (IOException ioe) {
+              throw new IllegalStateException(from.getPath().toString(), ioe);
+            }
+          }
+        });
+
+    // The list of opened iterators is still empty at this point, so ordering it for close() is
+    // deferred to close() itself.
+    delegate = Iterators.concat(fsIterators);
+  }
+
+  @Override
+  protected Iterator<Pair<K,V>> delegate() {
+    return delegate;
+  }
+
+  /**
+   * Ends the iteration and closes every per-file iterator opened so far, most recently opened first.
+   */
+  @Override
+  public void close() throws IOException {
+    // Detach the lazy delegate first; otherwise a later hasNext()/next() would open the next file
+    // and that handle would never be closed.
+    delegate = Collections.emptyIterator();
+    Collections.reverse(iterators); // close in reverse order of opening
+    IOUtils.close(iterators);
+    iterators.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
new file mode 100644
index 0000000..1cb4ebc
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileDirValueIterator}.</p>
+ */
+public final class SequenceFileDirValueIterable<V extends Writable> implements Iterable<V> {
+
+  // What to read.
+  private final Path path;
+  private final PathType pathType;
+  private final PathFilter filter;
+  private final Comparator<FileStatus> ordering;
+  // How to read it.
+  private final boolean reuseKeyValueInstances;
+  private final Configuration conf;
+
+  /**
+   * Iterates over everything matched by {@code path}: no filter, no ordering, no instance reuse.
+   */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, Configuration conf) {
+    this(path, pathType, null, null, false, conf);
+  }
+
+  /**
+   * Iterates over the files matched by {@code path} and {@code filter}: no ordering, no instance reuse.
+   */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
+    this(path, pathType, filter, null, false, conf);
+  }
+
+  /**
+   * Stores the settings handed to each {@link SequenceFileDirValueIterator} created by {@link #iterator()}.
+   *
+   * @param path location to iterate over, interpreted according to {@code pathType}
+   * @param pathType whether {@code path} names a directory ({@link PathType#LIST}) or is a
+   *  glob pattern ({@link PathType#GLOB})
+   * @param filter may be null; otherwise restricts the set of sequence files that are iterated over
+   * @param ordering may be null; otherwise the order in which matching sequence files are visited
+   * @param reuseKeyValueInstances if true, the value object is reused between reads instead of a new
+   *  one being created for each read from the file
+   * @param conf Hadoop configuration passed on to the iterators that are created
+   */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, PathFilter filter,
+                                      Comparator<FileStatus> ordering, boolean reuseKeyValueInstances,
+                                      Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.ordering = ordering;
+    this.filter = filter;
+    this.pathType = pathType;
+    this.path = path;
+  }
+
+  /**
+   * Builds a new {@link SequenceFileDirValueIterator} from the stored settings.
+   *
+   * @throws IllegalStateException if creating the iterator fails with an {@link IOException}; the
+   *  message is the path and the cause is the original exception
+   */
+  @Override
+  public Iterator<V> iterator() {
+    final Iterator<V> valueIterator;
+    try {
+      valueIterator =
+          new SequenceFileDirValueIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(path.toString(), ioe);
+    }
+    return valueIterator;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
new file mode 100644
index 0000000..908c8bb
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
+
+/**
+ * Like {@link SequenceFileValueIterator}, but iterates not just over one
+ * sequence file, but many. The input path may be specified as a directory of
+ * files to read, or as a glob pattern. The set of files may be optionally
+ * restricted with a {@link PathFilter}.
+ */
+public final class SequenceFileDirValueIterator<V extends Writable> extends
+    ForwardingIterator<V> implements Closeable {
+
+  private static final FileStatus[] NO_STATUSES = new FileStatus[0];
+
+  /** Concatenation of the per-file iterators; built lazily, so a file is opened only when reached. */
+  private Iterator<V> delegate;
+
+  /** Per-file iterators opened so far, in opening order, retained so that {@link #close()} can close them. */
+  private final List<SequenceFileValueIterator<V>> iterators = Lists.newArrayList();
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
+   * (depending on pathType parameter).
+   *
+   * @param path directory or glob pattern, as selected by {@code pathType}
+   * @param pathType {@link PathType#GLOB} to glob {@code path}; anything else lists it
+   * @param filter if not null, passed to the listing call to restrict the set of files
+   * @param ordering if not null, used to sort the files before iterating
+   * @param reuseKeyValueInstances passed to each {@link SequenceFileValueIterator} that gets created
+   * @param conf Hadoop configuration used to list and open the files
+   * @throws IOException if the file system cannot be obtained or the listing fails
+   */
+  public SequenceFileDirValueIterator(Path path,
+                                      PathType pathType,
+                                      PathFilter filter,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) throws IOException {
+    FileStatus[] statuses;
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    if (filter == null) {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
+    } else {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : fs.listStatus(path, filter);
+    }
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters.
+   *
+   * @param path files to iterate over; an empty array yields an empty iterator
+   * @param ordering if not null, used to sort the files before iterating
+   * @param reuseKeyValueInstances passed to each {@link SequenceFileValueIterator} that gets created
+   * @param conf Hadoop configuration used to resolve file systems and to open the files
+   * @throws IOException if the status of one of the files cannot be obtained
+   */
+  public SequenceFileDirValueIterator(Path[] path,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) throws IOException {
+
+    /*
+     * we assume all files should exist, otherwise we will bail out.
+     */
+    FileStatus[] statuses = new FileStatus[path.length];
+    for (int i = 0; i < statuses.length; i++) {
+      // Resolve the file system per path rather than from path[0] only: this tolerates an empty
+      // array and paths that do not all live on the same file system.
+      statuses[i] = path[i].getFileSystem(conf).getFileStatus(path[i]);
+    }
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Builds {@link #delegate} as the lazy concatenation of one {@link SequenceFileValueIterator} per
+   * file. No file is opened here; each one is opened (and remembered in {@link #iterators}) when the
+   * iteration first reaches it, so there is nothing to clean up if this method fails.
+   */
+  private void init(FileStatus[] statuses,
+                    Comparator<FileStatus> ordering,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) {
+
+    /*
+     * prevent NPEs. Unfortunately, Hadoop would return null for list if nothing
+     * was qualified. In this case, which is a corner case, we should assume an
+     * empty iterator, not an NPE.
+     */
+    if (statuses == null) {
+      statuses = NO_STATUSES;
+    }
+
+    if (ordering != null) {
+      Arrays.sort(statuses, ordering);
+    }
+    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+    Iterator<Iterator<V>> fsIterators =
+      Iterators.transform(fileStatusIterator,
+        new Function<FileStatus, Iterator<V>>() {
+          @Override
+          public Iterator<V> apply(FileStatus from) {
+            try {
+              SequenceFileValueIterator<V> iterator = new SequenceFileValueIterator<>(from.getPath(),
+                  reuseKeyValueInstances, conf);
+              // remember it so that close() can release it even if it is never exhausted
+              iterators.add(iterator);
+              return iterator;
+            } catch (IOException ioe) {
+              throw new IllegalStateException(from.getPath().toString(), ioe);
+            }
+          }
+        });
+
+    // The list of opened iterators is still empty at this point, so ordering it for close() is
+    // deferred to close() itself.
+    delegate = Iterators.concat(fsIterators);
+  }
+
+  @Override
+  protected Iterator<V> delegate() {
+    return delegate;
+  }
+
+  /**
+   * Ends the iteration and closes every per-file iterator opened so far, most recently opened first.
+   */
+  @Override
+  public void close() throws IOException {
+    // Detach the lazy delegate first; otherwise a later hasNext()/next() would open the next file
+    // and that handle would never be closed.
+    delegate = Collections.emptyIterator();
+    Collections.reverse(iterators); // close in reverse order of opening
+    IOUtils.close(iterators);
+    iterators.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
new file mode 100644
index 0000000..f17c2a1
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileIterator}.</p>
+ */
+public final class SequenceFileIterable<K extends Writable,V extends Writable> implements Iterable<Pair<K,V>> {
+
+  private final Configuration conf;
+  private final Path path;
+  private final boolean reuseKeyValueInstances;
+
+  /**
+   * Shorthand for {@link #SequenceFileIterable(Path, boolean, Configuration)} with
+   * {@code reuseKeyValueInstances} set to false.
+   *
+   * @param path file to iterate over
+   * @param conf Hadoop configuration passed on to the iterators that are created
+   */
+  public SequenceFileIterable(Path path, Configuration conf) {
+    this(path, false, conf);
+  }
+
+  /**
+   * Stores the settings handed to each {@link SequenceFileIterator} created by {@link #iterator()}.
+   *
+   * @param path file to iterate over
+   * @param reuseKeyValueInstances if true, the key and value objects are reused between reads instead
+   *  of new ones being created for each read from the file
+   * @param conf Hadoop configuration passed on to the iterators that are created
+   */
+  public SequenceFileIterable(Path path, boolean reuseKeyValueInstances, Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.path = path;
+  }
+
+  /**
+   * Builds a new {@link SequenceFileIterator} over the stored path.
+   *
+   * @throws IllegalStateException if creating the iterator fails with an {@link IOException}; the
+   *  message is the path and the cause is the original exception
+   */
+  @Override
+  public Iterator<Pair<K, V>> iterator() {
+    final Iterator<Pair<K, V>> fileIterator;
+    try {
+      fileIterator = new SequenceFileIterator<>(path, reuseKeyValueInstances, conf);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(path.toString(), ioe);
+    }
+    return fileIterator;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
new file mode 100644
index 0000000..bc5c549
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>{@link java.util.Iterator} over a {@link SequenceFile}'s keys and values, as a {@link Pair}
+ * containing key and value.</p>
+ */
+public final class SequenceFileIterator<K extends Writable,V extends Writable>
+  extends AbstractIterator<Pair<K,V>> implements Closeable {
+
+  private static final Logger log = LoggerFactory.getLogger(SequenceFileIterator.class);
+
+  private final SequenceFile.Reader reader;
+  private final Configuration conf;
+  private final Class<K> keyClass;
+  private final Class<V> valueClass;
+  /** True when the file's value class is {@link NullWritable}; only keys are read in that case. */
+  private final boolean noValue;
+  private K key;
+  private V value;
+  private final boolean reuseKeyValueInstances;
+
+  /**
+   * Opens a reader on the sequence file at {@code path}.
+   *
+   * @param path file to iterate over
+   * @param reuseKeyValueInstances if true, the same key and value objects are filled again on every
+   *  read instead of new ones being created for each record
+   * @param conf Hadoop configuration used to resolve the file system, open the file and instantiate
+   *  keys and values
+   * @throws IOException if the file system cannot be obtained or the file cannot be opened
+   */
+  public SequenceFileIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    path = path.makeQualified(fs);
+    reader = new SequenceFile.Reader(fs, path, conf);
+    this.conf = conf;
+    // The reader cannot supply K and V statically; the type parameters are the caller's assertion
+    // about what the file contains, hence the unchecked casts.
+    @SuppressWarnings("unchecked")
+    Class<K> fileKeyClass = (Class<K>) reader.getKeyClass();
+    @SuppressWarnings("unchecked")
+    Class<V> fileValueClass = (Class<V>) reader.getValueClass();
+    keyClass = fileKeyClass;
+    valueClass = fileValueClass;
+    noValue = NullWritable.class.equals(valueClass);
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+  }
+
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  /**
+   * Drops the current key and value, closes the reader and marks the iteration as finished.
+   */
+  @Override
+  public void close() throws IOException {
+    key = null;
+    value = null;
+    // 'true': an IOException raised while closing the reader is swallowed by Closeables
+    Closeables.close(reader, true);
+
+    endOfData();
+  }
+
+  /**
+   * Reads the next record. When the file is exhausted the iterator closes itself.
+   *
+   * @return the next key/value pair (value is null for {@link NullWritable}-valued files), or the
+   *  end-of-data marker once nothing is left
+   * @throws IllegalStateException if reading fails with an {@link IOException}; the iterator is
+   *  closed before the exception is thrown
+   */
+  @Override
+  protected Pair<K,V> computeNext() {
+    // Allocate new holders unless reuse was requested and holders already exist. The key is what
+    // gets tested, because value is never assigned for NullWritable-valued files; testing value
+    // there would allocate a new key for every record despite the reuse request.
+    if (!reuseKeyValueInstances || key == null) {
+      key = ReflectionUtils.newInstance(keyClass, conf);
+      if (!noValue) {
+        value = ReflectionUtils.newInstance(valueClass, conf);
+      }
+    }
+    try {
+      boolean available = noValue ? reader.next(key) : reader.next(key, value);
+      if (!available) {
+        // close() calls endOfData(), which is what tells AbstractIterator to stop
+        close();
+        return null;
+      }
+      return new Pair<>(key, value);
+    } catch (IOException ioe) {
+      try {
+        close();
+      } catch (IOException e) {
+        log.error(e.getMessage(), e);
+      }
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+}