You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:54 UTC
[23/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy
into mahout-hdfs and mahout-mr, closes apache/mahout#86
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java
new file mode 100644
index 0000000..a8fa091
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.parameters.ClassParameter;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.common.parameters.PathParameter;
+import org.apache.mahout.math.Algebra;
+import org.apache.mahout.math.CardinalityException;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixWritable;
+import org.apache.mahout.math.SingularValueDecomposition;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Mahalanobis distance: {@code sqrt((x - y)^T * S^-1 * (x - y))}, where {@code S^-1} is the inverse
+ * covariance matrix held by this measure.
+ * See http://en.wikipedia.org/wiki/Mahalanobis_distance for details.
+ *
+ * <p>The inverse covariance matrix and the mean vector can either be loaded from files named by the
+ * "inverseCovarianceFile" / "meanVectorFile" parameters during {@link #configure(Configuration)}, or be
+ * supplied directly through the setters.</p>
+ */
+public class MahalanobisDistanceMeasure implements DistanceMeasure {
+
+  private Matrix inverseCovarianceMatrix;
+  private Vector meanVector;
+
+  private ClassParameter vectorClass;
+  private ClassParameter matrixClass;
+  private List<Parameter<?>> parameters;
+  private Parameter<Path> inverseCovarianceFile;
+  private Parameter<Path> meanVectorFile;
+
+  /**
+   * Creates the parameters if that has not happened yet, then loads the inverse covariance matrix and
+   * the mean vector from their configured files. A file whose path parameter is {@code null} is skipped.
+   *
+   * @param jobConf configuration used to resolve the parameters and the file systems
+   * @throws IllegalStateException if a configured file is missing or cannot be read
+   */
+  @Override
+  public void configure(Configuration jobConf) {
+    if (parameters == null) {
+      ParameteredGeneralizations.configureParameters(this, jobConf);
+    }
+    try {
+      Path inverseCovariancePath = inverseCovarianceFile.get();
+      if (inverseCovariancePath != null) {
+        FileSystem fs = FileSystem.get(inverseCovariancePath.toUri(), jobConf);
+        MatrixWritable matrixWritable =
+            ClassUtils.instantiateAs((Class<? extends MatrixWritable>) matrixClass.get(), MatrixWritable.class);
+        if (!fs.exists(inverseCovariancePath)) {
+          throw new FileNotFoundException(inverseCovariancePath.toString());
+        }
+        DataInputStream in = fs.open(inverseCovariancePath);
+        try {
+          matrixWritable.readFields(in);
+        } finally {
+          // a failure while closing a stream we only read from is deliberately swallowed
+          Closeables.close(in, true);
+        }
+        this.inverseCovarianceMatrix = matrixWritable.get();
+        Preconditions.checkArgument(this.inverseCovarianceMatrix != null, "inverseCovarianceMatrix not initialized");
+      }
+
+      Path meanVectorPath = meanVectorFile.get();
+      if (meanVectorPath != null) {
+        FileSystem fs = FileSystem.get(meanVectorPath.toUri(), jobConf);
+        VectorWritable vectorWritable =
+            ClassUtils.instantiateAs((Class<? extends VectorWritable>) vectorClass.get(), VectorWritable.class);
+        if (!fs.exists(meanVectorPath)) {
+          throw new FileNotFoundException(meanVectorPath.toString());
+        }
+        DataInputStream in = fs.open(meanVectorPath);
+        try {
+          vectorWritable.readFields(in);
+        } finally {
+          Closeables.close(in, true);
+        }
+        this.meanVector = vectorWritable.get();
+        Preconditions.checkArgument(this.meanVector != null, "meanVector not initialized");
+      }
+
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * @return the parameters created by {@link #createParameters(String, Configuration)}, or {@code null}
+   *         if they have not been created yet
+   */
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return parameters;
+  }
+
+  /**
+   * Declares the four parameters of this measure: the two file paths and the classes used to
+   * deserialize their contents.
+   *
+   * @param prefix  prefix handed to every created parameter
+   * @param jobConf configuration handed to every created parameter
+   */
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+    parameters = Lists.newArrayList();
+    inverseCovarianceFile = new PathParameter(prefix, "inverseCovarianceFile", jobConf, null,
+        "Path on DFS to a file containing the inverse covariance matrix.");
+    parameters.add(inverseCovarianceFile);
+
+    // NOTE(review): the name is spelled "maxtrixClass" (sic). It is left untouched because it is
+    // presumably part of the configuration key that existing jobs use - confirm before renaming.
+    matrixClass = new ClassParameter(prefix, "maxtrixClass", jobConf, DenseMatrix.class,
+        "Class<Matix> file specified in parameter inverseCovarianceFile has been serialized with.");
+    parameters.add(matrixClass);
+
+    meanVectorFile = new PathParameter(prefix, "meanVectorFile", jobConf, null,
+        "Path on DFS to a file containing the mean Vector.");
+    parameters.add(meanVectorFile);
+
+    vectorClass = new ClassParameter(prefix, "vectorClass", jobConf, DenseVector.class,
+        "Class file specified in parameter meanVectorFile has been serialized with.");
+    parameters.add(vectorClass);
+  }
+
+  /**
+   * Computes the distance between {@code v} and the mean vector of this measure.
+   *
+   * @param v The vector to compute the distance to
+   * @return Mahalanobis distance of a multivariate vector
+   */
+  public double distance(Vector v) {
+    // the difference is needed on both sides of the quadratic form; compute it only once
+    Vector difference = v.minus(meanVector);
+    return Math.sqrt(difference.dot(Algebra.mult(inverseCovarianceMatrix, difference)));
+  }
+
+  /**
+   * Computes the Mahalanobis distance between two vectors using the inverse covariance matrix of this
+   * measure.
+   *
+   * @param v1 first vector
+   * @param v2 second vector
+   * @return {@code sqrt((v1 - v2)^T * S^-1 * (v1 - v2))}
+   * @throws CardinalityException if the two vectors differ in size
+   */
+  @Override
+  public double distance(Vector v1, Vector v2) {
+    if (v1.size() != v2.size()) {
+      throw new CardinalityException(v1.size(), v2.size());
+    }
+    Vector difference = v1.minus(v2);
+    return Math.sqrt(difference.dot(Algebra.mult(inverseCovarianceMatrix, difference)));
+  }
+
+  /**
+   * Same as {@link #distance(Vector, Vector)}; {@code centroidLengthSquare} is currently not used.
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO
+  }
+
+  /**
+   * @param inverseCovarianceMatrix the inverse covariance matrix to use; must not be {@code null}
+   * @throws IllegalArgumentException if the matrix is {@code null}
+   */
+  public void setInverseCovarianceMatrix(Matrix inverseCovarianceMatrix) {
+    Preconditions.checkArgument(inverseCovarianceMatrix != null, "inverseCovarianceMatrix not initialized");
+    this.inverseCovarianceMatrix = inverseCovarianceMatrix;
+  }
+
+
+  /**
+   * Computes the inverse covariance from the input covariance matrix given in input.
+   *
+   * @param m A covariance matrix.
+   * @throws CardinalityException  if {@code m} is not square
+   * @throws IllegalStateException if a diagonal entry of the S factor of the decomposition is not
+   *                               strictly positive, i.e. the matrix cannot be inverted this way
+   */
+  public void setCovarianceMatrix(Matrix m) {
+    if (m.numRows() != m.numCols()) {
+      throw new CardinalityException(m.numRows(), m.numCols());
+    }
+    // See http://www.mlahanas.de/Math/svd.htm for details,
+    // which specifically details the case of covariance matrix inversion
+    // Complexity: O(min(n*m^2, m*n^2))
+    SingularValueDecomposition svd = new SingularValueDecomposition(m);
+    Matrix sInv = svd.getS();
+    // Invert the diagonal elements in place
+    for (int i = 0; i < sInv.numRows(); i++) {
+      double diagElem = sInv.get(i, i);
+      if (diagElem > 0.0) {
+        sInv.set(i, i, 1 / diagElem);
+      } else {
+        throw new IllegalStateException("Eigen Value equals to 0 found.");
+      }
+    }
+    // fetch U once instead of asking the decomposition for it twice
+    Matrix u = svd.getU();
+    inverseCovarianceMatrix = u.times(sInv.times(u.transpose()));
+    Preconditions.checkArgument(inverseCovarianceMatrix != null, "inverseCovarianceMatrix not initialized");
+  }
+
+  /** @return the inverse covariance matrix currently in use, possibly {@code null} if none was set */
+  public Matrix getInverseCovarianceMatrix() {
+    return inverseCovarianceMatrix;
+  }
+
+  /**
+   * @param meanVector the mean vector used by {@link #distance(Vector)}; must not be {@code null}
+   * @throws IllegalArgumentException if the vector is {@code null}
+   */
+  public void setMeanVector(Vector meanVector) {
+    Preconditions.checkArgument(meanVector != null, "meanVector not initialized");
+    this.meanVector = meanVector;
+  }
+
+  /** @return the mean vector currently in use, possibly {@code null} if none was set */
+  public Vector getMeanVector() {
+    return meanVector;
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java
new file mode 100644
index 0000000..5c32fcf
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/ManhattanDistanceMeasure.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.math.CardinalityException;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * This class implements a "manhattan distance" metric by summing the absolute values of the difference
+ * between each coordinate
+ */
+public class ManhattanDistanceMeasure implements DistanceMeasure {
+
+  /**
+   * Computes the Manhattan (L1) distance between two points given as coordinate arrays.
+   *
+   * @param p1 coordinates of the first point
+   * @param p2 coordinates of the second point
+   * @return the sum of {@code |p2[i] - p1[i]|} over all coordinates; {@code 0.0} for empty arrays
+   * @throws CardinalityException if the arrays differ in length
+   */
+  public static double distance(double[] p1, double[] p2) {
+    // Reject mismatched inputs up front, like the Vector overload does, instead of silently ignoring
+    // the tail of a longer p2 or failing with an ArrayIndexOutOfBoundsException on a shorter one.
+    if (p1.length != p2.length) {
+      throw new CardinalityException(p1.length, p2.length);
+    }
+    double result = 0.0;
+    for (int i = 0; i < p1.length; i++) {
+      result += Math.abs(p2[i] - p1[i]);
+    }
+    return result;
+  }
+
+  @Override
+  public void configure(Configuration job) {
+    // nothing to do
+  }
+
+  /** @return an empty collection, since this measure has no parameters */
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+    // nothing to do
+  }
+
+  /**
+   * Computes the Manhattan (L1) distance between two vectors.
+   *
+   * @throws CardinalityException if the vectors differ in size
+   */
+  @Override
+  public double distance(Vector v1, Vector v2) {
+    if (v1.size() != v2.size()) {
+      throw new CardinalityException(v1.size(), v2.size());
+    }
+    return v1.aggregate(v2, Functions.PLUS, Functions.MINUS_ABS);
+  }
+
+  /**
+   * Same as {@link #distance(Vector, Vector)}; {@code centroidLengthSquare} is currently not used.
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java
new file mode 100644
index 0000000..3a57f2f
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.parameters.DoubleParameter;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * Implement Minkowski distance, a real-valued generalization of the
+ * integral L(n) distances: Manhattan = L1, Euclidean = L2.
+ * For high numbers of dimensions, very high exponents give more useful distances.
+ *
+ * Note: Math.pow is clever about integer-valued doubles.
+ **/
+public class MinkowskiDistanceMeasure implements DistanceMeasure {
+
+ private static final double EXPONENT = 3.0;
+
+ private List<Parameter<?>> parameters;
+ private double exponent = EXPONENT;
+
+ public MinkowskiDistanceMeasure() {
+ }
+
+ public MinkowskiDistanceMeasure(double exponent) {
+ this.exponent = exponent;
+ }
+
+ @Override
+ public void createParameters(String prefix, Configuration conf) {
+ parameters = Lists.newArrayList();
+ Parameter<?> param =
+ new DoubleParameter(prefix, "exponent", conf, EXPONENT, "Exponent for Fractional Lagrange distance");
+ parameters.add(param);
+ }
+
+ @Override
+ public Collection<Parameter<?>> getParameters() {
+ return parameters;
+ }
+
+ @Override
+ public void configure(Configuration jobConf) {
+ if (parameters == null) {
+ ParameteredGeneralizations.configureParameters(this, jobConf);
+ }
+ }
+
+ public double getExponent() {
+ return exponent;
+ }
+
+ public void setExponent(double exponent) {
+ this.exponent = exponent;
+ }
+
+ /**
+ * Math.pow is clever about integer-valued doubles
+ */
+ @Override
+ public double distance(Vector v1, Vector v2) {
+ return Math.pow(v1.aggregate(v2, Functions.PLUS, Functions.minusAbsPow(exponent)), 1.0 / exponent);
+ }
+
+ // TODO: how?
+ @Override
+ public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+ return distance(centroid, v); // TODO - can this use centroidLengthSquare somehow?
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java
new file mode 100644
index 0000000..66da121
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/SquaredEuclideanDistanceMeasure.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.math.Vector;
+
+/**
+ * Like {@link EuclideanDistanceMeasure} but it does not take the square root.
+ * <p/>
+ * Thus, it is not actually the Euclidean Distance, but it is saves on computation when you only need the
+ * distance for comparison and don't care about the actual value as a distance.
+ */
+public class SquaredEuclideanDistanceMeasure implements DistanceMeasure {
+
+ @Override
+ public void configure(Configuration job) {
+ // nothing to do
+ }
+
+ @Override
+ public Collection<Parameter<?>> getParameters() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void createParameters(String prefix, Configuration jobConf) {
+ // nothing to do
+ }
+
+ @Override
+ public double distance(Vector v1, Vector v2) {
+ return v2.getDistanceSquared(v1);
+ }
+
+ @Override
+ public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+ return centroidLengthSquare - 2 * v.dot(centroid) + v.getLengthSquared();
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java
new file mode 100644
index 0000000..cfeb119
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/TanimotoDistanceMeasure.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * Tanimoto coefficient implementation.
+ *
+ * http://en.wikipedia.org/wiki/Jaccard_index
+ */
+public class TanimotoDistanceMeasure extends WeightedDistanceMeasure {
+
+ /**
+ * Calculates the distance between two vectors.
+ *
+ * The coefficient (a measure of similarity) is: T(a, b) = a.b / (|a|^2 + |b|^2 - a.b)
+ *
+ * The distance d(a,b) = 1 - T(a,b)
+ *
+ * @return 0 for perfect match, > 0 for greater distance
+ */
+ @Override
+ public double distance(Vector a, Vector b) {
+ double ab;
+ double denominator;
+ if (getWeights() != null) {
+ ab = a.times(b).aggregate(getWeights(), Functions.PLUS, Functions.MULT);
+ denominator = a.aggregate(getWeights(), Functions.PLUS, Functions.MULT_SQUARE_LEFT)
+ + b.aggregate(getWeights(), Functions.PLUS, Functions.MULT_SQUARE_LEFT)
+ - ab;
+ } else {
+ ab = b.dot(a); // b is SequentialAccess
+ denominator = a.getLengthSquared() + b.getLengthSquared() - ab;
+ }
+
+ if (denominator < ab) { // correct for fp round-off: distance >= 0
+ denominator = ab;
+ }
+ if (denominator > 0) {
+ // denominator == 0 only when dot(a,a) == dot(b,b) == dot(a,b) == 0
+ return 1.0 - ab / denominator;
+ } else {
+ return 0.0;
+ }
+ }
+
+ @Override
+ public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+ return distance(centroid, v); // TODO
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java
new file mode 100644
index 0000000..0c1d2cd
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.parameters.ClassParameter;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.common.parameters.PathParameter;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/** Base class for {@link DistanceMeasure} implementations that can apply a weight per coordinate. */
+public abstract class WeightedDistanceMeasure implements DistanceMeasure {
+
+  private List<Parameter<?>> parameters;
+  private Parameter<Path> weightsFile;
+  private ClassParameter vectorClass;
+  private Vector weights;
+
+  /**
+   * Declares the "weightsFile" and "vectorClass" parameters.
+   */
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+    weightsFile = new PathParameter(prefix, "weightsFile", jobConf, null,
+        "Path on DFS to a file containing the weights.");
+    vectorClass = new ClassParameter(prefix, "vectorClass", jobConf, DenseVector.class,
+        "Class<Vector> file specified in parameter weightsFile has been serialized with.");
+    parameters = Lists.newArrayList();
+    parameters.add(weightsFile);
+    parameters.add(vectorClass);
+  }
+
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return parameters;
+  }
+
+  /**
+   * Creates the parameters if that has not happened yet and, when a weights file is configured,
+   * reads the weight vector from it.
+   *
+   * @throws IllegalStateException if the weights file is missing or cannot be read
+   */
+  @Override
+  public void configure(Configuration jobConf) {
+    if (parameters == null) {
+      ParameteredGeneralizations.configureParameters(this, jobConf);
+    }
+    try {
+      Path weightsPath = weightsFile.get();
+      if (weightsPath == null) {
+        // no weights configured; keep whatever was set before
+        return;
+      }
+      FileSystem fileSystem = FileSystem.get(weightsPath.toUri(), jobConf);
+      VectorWritable writable =
+          ClassUtils.instantiateAs((Class<? extends VectorWritable>) vectorClass.get(), VectorWritable.class);
+      if (!fileSystem.exists(weightsPath)) {
+        throw new FileNotFoundException(weightsPath.toString());
+      }
+      DataInputStream input = fileSystem.open(weightsPath);
+      try {
+        writable.readFields(input);
+      } finally {
+        // a failure while closing a stream we only read from is deliberately swallowed
+        Closeables.close(input, true);
+      }
+      this.weights = writable.get();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /** @return the weight vector, or {@code null} when no weights have been set or loaded */
+  public Vector getWeights() {
+    return weights;
+  }
+
+  public void setWeights(Vector weights) {
+    this.weights = weights;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java
new file mode 100644
index 0000000..c6889e2
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+
+/**
+ * Euclidean distance metric: the square root of the sum of the squared coordinate differences, where
+ * each squared difference is multiplied by the weight of its coordinate when weights are set.
+ */
+public class WeightedEuclideanDistanceMeasure extends WeightedDistanceMeasure {
+
+  /**
+   * Only the non-zero entries of {@code p2 - p1} are visited, since zero differences contribute nothing.
+   */
+  @Override
+  public double distance(Vector p1, Vector p2) {
+    Vector weights = getWeights();
+    Vector difference = p2.minus(p1);
+    double sum = 0;
+    for (Element entry : difference.nonZeroes()) {
+      double squared = entry.get() * entry.get();
+      sum += weights == null ? squared : squared * weights.get(entry.index());
+    }
+    return Math.sqrt(sum);
+  }
+
+  /**
+   * Same as {@link #distance(Vector, Vector)}; {@code centroidLengthSquare} is currently not used.
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java
new file mode 100644
index 0000000..2c280e2
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedManhattanDistanceMeasure.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+
+/**
+ * "Manhattan distance" metric: the sum of the absolute coordinate differences, where each difference
+ * is multiplied by the weight of its coordinate when weights are set.
+ */
+public class WeightedManhattanDistanceMeasure extends WeightedDistanceMeasure {
+
+  /**
+   * Only the non-zero entries of {@code p2 - p1} are visited, since zero differences contribute nothing.
+   */
+  @Override
+  public double distance(Vector p1, Vector p2) {
+    Vector difference = p2.minus(p1);
+    Vector weights = getWeights();
+    double sum = 0;
+    for (Element entry : difference.nonZeroes()) {
+      double term = weights == null ? entry.get() : entry.get() * weights.get(entry.index());
+      sum += Math.abs(term);
+    }
+    return sum;
+  }
+
+  /**
+   * Same as {@link #distance(Vector, Vector)}; {@code centroidLengthSquare} is currently not used.
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java
new file mode 100644
index 0000000..73cc821
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/CopyConstructorIterator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+
+/**
+ * Wraps an iterator and hands out a copy of every element instead of the element itself. Copies are
+ * made through the public one-argument constructor of the element's class that takes that same class.
+ */
+public final class CopyConstructorIterator<T> extends ForwardingIterator<T> {
+
+  private final Iterator<T> delegate;
+  /** Looked up from the class of the first element seen and then reused for all later elements. */
+  private Constructor<T> constructor;
+
+  public CopyConstructorIterator(Iterator<? extends T> copyFrom) {
+    this.delegate = Iterators.transform(
+        copyFrom,
+        new Function<T,T>() {
+          @Override
+          public T apply(T from) {
+            return copyOf(from);
+          }
+        });
+  }
+
+  /** Copies one element, resolving the copy constructor on first use. */
+  private T copyOf(T original) {
+    if (constructor == null) {
+      constructor = findCopyConstructor(original);
+    }
+    try {
+      return constructor.newInstance(original);
+    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /** Finds the public constructor of the sample's runtime class that accepts that same class. */
+  private static <E> Constructor<E> findCopyConstructor(E sample) {
+    Class<E> sampleClass = (Class<E>) sample.getClass();
+    try {
+      return sampleClass.getConstructor(sampleClass);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  protected Iterator<T> delegate() {
+    return delegate;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java
new file mode 100644
index 0000000..658c1f1
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/CountingIterator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * Iterates over the integers from 0 through {@code to-1}.
+ */
+public final class CountingIterator extends AbstractIterator<Integer> {
+
+ private int count;
+ private final int to;
+
+ public CountingIterator(int to) {
+ this.to = to;
+ }
+
+ @Override
+ protected Integer computeNext() {
+ if (count < to) {
+ return count++;
+ } else {
+ return endOfData();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
new file mode 100644
index 0000000..cfc18d6
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Iterable representing the lines of a text file. It can produce an {@link Iterator} over those lines. This
+ * assumes the text file's lines are delimited in a manner consistent with how {@link java.io.BufferedReader}
+ * defines lines.
+ *
+ * <p>Every call to {@link #iterator()} builds a new {@link FileLineIterator} around the same
+ * {@link InputStream} instance that was supplied to (or opened by) the constructor.</p>
+ */
+public final class FileLineIterable implements Iterable<String> {
+
+  private final InputStream is;
+  private final Charset encoding;
+  private final boolean skipFirstLine;
+  private final String origFilename;
+
+  /** Creates a {@link FileLineIterable} over a given file, assuming a UTF-8 encoding. */
+  public FileLineIterable(File file) throws IOException {
+    this(file, false);
+  }
+
+  /**
+   * Creates a {@link FileLineIterable} over a given file, assuming a UTF-8 encoding.
+   *
+   * @param skipFirstLine passed on to each {@link FileLineIterator} that is created
+   */
+  public FileLineIterable(File file, boolean skipFirstLine) throws IOException {
+    this(file, Charsets.UTF_8, skipFirstLine);
+  }
+
+  /** Creates a {@link FileLineIterable} over a given file, using the given encoding. */
+  public FileLineIterable(File file, Charset encoding, boolean skipFirstLine) throws IOException {
+    this(FileLineIterator.getFileInputStream(file), encoding, skipFirstLine);
+  }
+
+  /** Creates a {@link FileLineIterable} over a stream, assuming a UTF-8 encoding. */
+  public FileLineIterable(InputStream is) {
+    this(is, false);
+  }
+
+  /** Creates a {@link FileLineIterable} over a stream, assuming a UTF-8 encoding. */
+  public FileLineIterable(InputStream is, boolean skipFirstLine) {
+    this(is, Charsets.UTF_8, skipFirstLine);
+  }
+
+  /** Creates a {@link FileLineIterable} over a stream, using the given encoding and an empty file name. */
+  public FileLineIterable(InputStream is, Charset encoding, boolean skipFirstLine) {
+    this(is, encoding, skipFirstLine, "");
+  }
+
+  /**
+   * Creates a {@link FileLineIterable} over a stream, using the given encoding.
+   *
+   * @param filename name handed to {@link FileLineIterator} together with the stream
+   */
+  public FileLineIterable(InputStream is, Charset encoding, boolean skipFirstLine, String filename) {
+    this.is = is;
+    this.encoding = encoding;
+    this.skipFirstLine = skipFirstLine;
+    this.origFilename = filename;
+  }
+
+  /**
+   * @return a new {@link FileLineIterator} over the wrapped stream
+   * @throws IllegalStateException if the iterator cannot be created because of an {@link IOException}
+   */
+  @Override
+  public Iterator<String> iterator() {
+    Iterator<String> lines;
+    try {
+      lines = new FileLineIterator(is, encoding, skipFirstLine, origFilename);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+    return lines;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
new file mode 100644
index 0000000..b7cc51e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipInputStream;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.mahout.cf.taste.impl.common.SkippingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Iterates over the lines of a text file. This assumes the text file's lines are delimited in a manner
+ * consistent with how {@link BufferedReader} defines lines.
+ * <p/>
+ * This class will uncompress files that end in .zip or .gz accordingly, too.
+ */
+public final class FileLineIterator extends AbstractIterator<String> implements SkippingIterator<String>, Closeable {
+
+ private final BufferedReader reader;
+
+ private static final Logger log = LoggerFactory.getLogger(FileLineIterator.class);
+
+ /**
+ * Creates a {@link FileLineIterator} over a given file, assuming a UTF-8 encoding.
+ *
+ * @throws java.io.FileNotFoundException if the file does not exist
+ * @throws IOException
+ * if the file cannot be read
+ */
+ public FileLineIterator(File file) throws IOException {
+ this(file, Charsets.UTF_8, false);
+ }
+
+ /**
+ * Creates a {@link FileLineIterator} over a given file, assuming a UTF-8 encoding.
+ *
+ * @throws java.io.FileNotFoundException if the file does not exist
+ * @throws IOException if the file cannot be read
+ */
+ public FileLineIterator(File file, boolean skipFirstLine) throws IOException {
+ this(file, Charsets.UTF_8, skipFirstLine);
+ }
+
+ /**
+ * Creates a {@link FileLineIterator} over a given file, using the given encoding.
+ *
+ * @throws java.io.FileNotFoundException if the file does not exist
+ * @throws IOException if the file cannot be read
+ */
+ public FileLineIterator(File file, Charset encoding, boolean skipFirstLine) throws IOException {
+ this(getFileInputStream(file), encoding, skipFirstLine);
+ }
+
+ public FileLineIterator(InputStream is) throws IOException {
+ this(is, Charsets.UTF_8, false);
+ }
+
+ public FileLineIterator(InputStream is, boolean skipFirstLine) throws IOException {
+ this(is, Charsets.UTF_8, skipFirstLine);
+ }
+
+ public FileLineIterator(InputStream is, Charset encoding, boolean skipFirstLine) throws IOException {
+ reader = new BufferedReader(new InputStreamReader(is, encoding));
+ if (skipFirstLine) {
+ reader.readLine();
+ }
+ }
+
+ public FileLineIterator(InputStream is, Charset encoding, boolean skipFirstLine, String filename)
+ throws IOException {
+ InputStream compressedInputStream;
+
+ if ("gz".equalsIgnoreCase(Files.getFileExtension(filename.toLowerCase()))) {
+ compressedInputStream = new GZIPInputStream(is);
+ } else if ("zip".equalsIgnoreCase(Files.getFileExtension(filename.toLowerCase()))) {
+ compressedInputStream = new ZipInputStream(is);
+ } else {
+ compressedInputStream = is;
+ }
+
+ reader = new BufferedReader(new InputStreamReader(compressedInputStream, encoding));
+ if (skipFirstLine) {
+ reader.readLine();
+ }
+ }
+
+ static InputStream getFileInputStream(File file) throws IOException {
+ InputStream is = new FileInputStream(file);
+ String name = file.getName();
+ if ("gz".equalsIgnoreCase(Files.getFileExtension(name.toLowerCase()))) {
+ return new GZIPInputStream(is);
+ } else if ("zip".equalsIgnoreCase(Files.getFileExtension(name.toLowerCase()))) {
+ return new ZipInputStream(is);
+ } else {
+ return is;
+ }
+ }
+
+ @Override
+ protected String computeNext() {
+ String line;
+ try {
+ line = reader.readLine();
+ } catch (IOException ioe) {
+ try {
+ close();
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ throw new IllegalStateException(ioe);
+ }
+ return line == null ? endOfData() : line;
+ }
+
+
+ @Override
+ public void skip(int n) {
+ try {
+ for (int i = 0; i < n; i++) {
+ if (reader.readLine() == null) {
+ break;
+ }
+ }
+ } catch (IOException ioe) {
+ try {
+ close();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ endOfData();
+ Closeables.close(reader, true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java
new file mode 100644
index 0000000..1905654
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/FixedSizeSamplingIterator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Lists;
+import org.apache.mahout.common.RandomUtils;
+
+/**
+ * Sample a fixed number of elements from an Iterator. The results can appear in any order.
+ *
+ * <p>The source iterator is consumed completely inside the constructor; this iterator then walks the
+ * retained elements.</p>
+ */
+public final class FixedSizeSamplingIterator<T> extends ForwardingIterator<T> {
+
+  private final Iterator<T> delegate;
+
+  /**
+   * @param size maximum number of elements to retain
+   * @param source elements to sample from; fully drained by this constructor
+   */
+  public FixedSizeSamplingIterator(int size, Iterator<T> source) {
+    List<T> kept = Lists.newArrayListWithCapacity(size);
+    Random random = RandomUtils.getRandom();
+    int seen = 0;
+    while (source.hasNext()) {
+      T candidate = source.next();
+      seen++;
+      if (kept.size() < size) {
+        // Still filling up: every element is retained.
+        kept.add(candidate);
+        continue;
+      }
+      // Full: the candidate overwrites a retained element only if the drawn index falls inside the buffer.
+      int slot = random.nextInt(seen);
+      if (slot < kept.size()) {
+        kept.set(slot, candidate);
+      }
+    }
+    delegate = kept.iterator();
+  }
+
+  @Override
+  protected Iterator<T> delegate() {
+    return delegate;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java
new file mode 100644
index 0000000..46ef411
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterable.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.util.Iterator;
+
+/**
+ * Wraps an {@link Iterable} whose {@link Iterable#iterator()} returns only some subset of the elements that
+ * it would, as determined by a sampling rate parameter.
+ */
+public final class SamplingIterable<T> implements Iterable<T> {
+
+  private final Iterable<? extends T> delegate;
+  private final double samplingRate;
+
+  /**
+   * @param delegate iterable to sample from
+   * @param samplingRate rate handed to every {@link SamplingIterator} created by {@link #iterator()}
+   */
+  public SamplingIterable(Iterable<? extends T> delegate, double samplingRate) {
+    this.delegate = delegate;
+    this.samplingRate = samplingRate;
+  }
+
+  /** @return a new {@link SamplingIterator} over a fresh iterator of the wrapped iterable */
+  @Override
+  public Iterator<T> iterator() {
+    Iterator<? extends T> source = delegate.iterator();
+    return new SamplingIterator<T>(source, samplingRate);
+  }
+
+  /**
+   * @return {@code delegate} itself when {@code samplingRate} is at least 1.0, otherwise a
+   *         {@link SamplingIterable} wrapping it
+   */
+  public static <T> Iterable<T> maybeWrapIterable(Iterable<T> delegate, double samplingRate) {
+    if (samplingRate >= 1.0) {
+      return delegate;
+    }
+    return new SamplingIterable<T>(delegate, samplingRate);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java
new file mode 100644
index 0000000..2ba46fd
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/SamplingIterator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.util.Iterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
+import org.apache.commons.math3.distribution.PascalDistribution;
+import org.apache.mahout.cf.taste.impl.common.SkippingIterator;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.RandomWrapper;
+
+/**
+ * Wraps an {@link Iterator} and returns only some subset of the elements that it would, as determined by a
+ * sampling rate parameter.
+ *
+ * <p>Before each returned element, a number of elements to skip is drawn from a Pascal distribution with
+ * r=1 and success probability equal to the sampling rate.</p>
+ */
+public final class SamplingIterator<T> extends AbstractIterator<T> {
+
+  private final PascalDistribution geometricDistribution;
+  private final Iterator<? extends T> delegate;
+
+  /** Creates a sampling iterator using the generator returned by {@link RandomUtils#getRandom()}. */
+  public SamplingIterator(Iterator<? extends T> delegate, double samplingRate) {
+    this(RandomUtils.getRandom(), delegate, samplingRate);
+  }
+
+  /**
+   * @param random source of randomness for the skip distribution
+   * @param delegate iterator to sample from; must not be null
+   * @param samplingRate must satisfy {@code 0.0 < samplingRate <= 1.0}
+   * @throws NullPointerException if {@code delegate} is null
+   * @throws IllegalArgumentException if {@code samplingRate} is outside the allowed range
+   */
+  public SamplingIterator(RandomWrapper random, Iterator<? extends T> delegate, double samplingRate) {
+    Preconditions.checkNotNull(delegate);
+    Preconditions.checkArgument(samplingRate > 0.0 && samplingRate <= 1.0,
+        "Must be: 0.0 < samplingRate <= 1.0. But samplingRate = " + samplingRate);
+    // Geometric distribution is special case of negative binomial (aka Pascal) with r=1:
+    geometricDistribution = new PascalDistribution(random.getRandomGenerator(), 1, samplingRate);
+    this.delegate = delegate;
+  }
+
+  @Override
+  protected T computeNext() {
+    int toSkip = geometricDistribution.sample();
+    if (delegate instanceof SkippingIterator<?>) {
+      // Let the delegate skip in one call when it knows how to.
+      ((SkippingIterator<? extends T>) delegate).skip(toSkip);
+    } else {
+      int skipped = 0;
+      while (skipped < toSkip && delegate.hasNext()) {
+        delegate.next();
+        skipped++;
+      }
+    }
+    if (!delegate.hasNext()) {
+      return endOfData();
+    }
+    return delegate.next();
+  }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java
new file mode 100644
index 0000000..c4ddf7b
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/StableFixedSizeSamplingIterator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+
+/**
+ * Sample a fixed number of elements from an Iterator. The results will appear in the original order at some
+ * cost in time and memory relative to a {@link FixedSizeSamplingIterator}.
+ *
+ * <p>Each retained element is stored together with its 1-based position in the source; the retained
+ * pairs are sorted before being exposed. The source is consumed completely inside the constructor.</p>
+ */
+public class StableFixedSizeSamplingIterator<T> extends ForwardingIterator<T> {
+
+  private final Iterator<T> delegate;
+
+  /**
+   * @param size maximum number of elements to retain
+   * @param source elements to sample from; fully drained by this constructor
+   */
+  public StableFixedSizeSamplingIterator(int size, Iterator<T> source) {
+    List<Pair<Integer,T>> kept = Lists.newArrayListWithCapacity(size);
+    Random random = RandomUtils.getRandom();
+    int seen = 0;
+    while (source.hasNext()) {
+      T candidate = source.next();
+      seen++;
+      if (kept.size() < size) {
+        // Still filling up: every element is retained, tagged with its position.
+        kept.add(new Pair<>(seen, candidate));
+        continue;
+      }
+      // Full: the candidate overwrites a retained pair only if the drawn index falls inside the buffer.
+      int slot = random.nextInt(seen);
+      if (slot < kept.size()) {
+        kept.set(slot, new Pair<>(seen, candidate));
+      }
+    }
+
+    Collections.sort(kept);
+    Function<Pair<Integer,T>,T> dropPosition = new Function<Pair<Integer,T>,T>() {
+      @Override
+      public T apply(Pair<Integer,T> tagged) {
+        return tagged.getSecond();
+      }
+    };
+    delegate = Iterators.transform(kept.iterator(), dropPosition);
+  }
+
+  @Override
+  protected Iterator<T> delegate() {
+    return delegate;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java
new file mode 100644
index 0000000..73b841e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/StringRecordIterator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import org.apache.mahout.common.Pair;
+
+public class StringRecordIterator extends ForwardingIterator<Pair<List<String>,Long>> {
+
+ private static final Long ONE = 1L;
+
+ private final Pattern splitter;
+ private final Iterator<Pair<List<String>,Long>> delegate;
+
+ public StringRecordIterator(Iterable<String> stringIterator, String pattern) {
+ this.splitter = Pattern.compile(pattern);
+ delegate = Iterators.transform(
+ stringIterator.iterator(),
+ new Function<String,Pair<List<String>,Long>>() {
+ @Override
+ public Pair<List<String>,Long> apply(String from) {
+ String[] items = splitter.split(from);
+ return new Pair<>(Arrays.asList(items), ONE);
+ }
+ });
+ }
+
+ @Override
+ protected Iterator<Pair<List<String>,Long>> delegate() {
+ return delegate;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
new file mode 100644
index 0000000..19f78b5
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * Supplies some useful and repeatedly-used instances of {@link PathFilter}.
+ */
+public final class PathFilters {
+
+  /** Accepts names beginning with "part-" unless they end with ".crc". */
+  private static final PathFilter PART_FILE_INSTANCE = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String fileName = path.getName();
+      if (!fileName.startsWith("part-")) {
+        return false;
+      }
+      return !fileName.endsWith(".crc");
+    }
+  };
+
+  /**
+   * Pathfilter to read the final clustering file.
+   */
+  private static final PathFilter CLUSTER_FINAL = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String fileName = path.getName();
+      if (!fileName.startsWith("clusters-")) {
+        return false;
+      }
+      return fileName.endsWith("-final");
+    }
+  };
+
+  /** Rejects names ending with ".crc" or beginning with "." or "_". */
+  private static final PathFilter LOGS_CRC_INSTANCE = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String fileName = path.getName();
+      return !fileName.endsWith(".crc") && !fileName.startsWith(".") && !fileName.startsWith("_");
+    }
+  };
+
+  private PathFilters() {
+  }
+
+  /**
+   * @return {@link PathFilter} that accepts paths whose file name starts with "part-". Excludes
+   *  ".crc" files.
+   */
+  public static PathFilter partFilter() {
+    return PART_FILE_INSTANCE;
+  }
+
+  /**
+   * @return {@link PathFilter} that accepts paths whose file name starts with "clusters-" and ends
+   *  with "-final".
+   */
+  public static PathFilter finalPartFilter() {
+    return CLUSTER_FINAL;
+  }
+
+  /**
+   * @return {@link PathFilter} that rejects paths whose file name starts with "_" (e.g. Cloudera
+   *  _SUCCESS files or Hadoop _logs), or "." (e.g. local hidden files), or ends with ".crc"
+   */
+  public static PathFilter logsCRCFilter() {
+    return LOGS_CRC_INSTANCE;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
new file mode 100644
index 0000000..7ea713e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+/**
+ * Used by {@link SequenceFileDirIterable} and the like to select whether the input path specifies a
+ * directory to list, or a glob pattern.
+ */
+public enum PathType {
+ /** The input path is a glob pattern. */
+ GLOB,
+ /** The input path is a directory whose contents are listed. */
+ LIST,
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
new file mode 100644
index 0000000..ca4d6b8
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileDirIterator}.</p>
+ *
+ * <p>Stores the arguments it was built with and hands them to a new {@link SequenceFileDirIterator} on
+ * every call to {@link #iterator()}.</p>
+ */
+public final class SequenceFileDirIterable<K extends Writable,V extends Writable> implements Iterable<Pair<K,V>> {
+
+  private final Path path;
+  private final PathType pathType;
+  private final PathFilter filter;
+  private final Comparator<FileStatus> ordering;
+  private final boolean reuseKeyValueInstances;
+  private final Configuration conf;
+
+  /** Equivalent to the full constructor with no filter, no ordering and no instance reuse. */
+  public SequenceFileDirIterable(Path path, PathType pathType, Configuration conf) {
+    this(path, pathType, null, null, false, conf);
+  }
+
+  /** Equivalent to the full constructor with no ordering and no instance reuse. */
+  public SequenceFileDirIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
+    this(path, pathType, filter, null, false, conf);
+  }
+
+  /**
+   * @param path file to iterate over
+   * @param pathType whether or not to treat path as a directory ({@link PathType#LIST}) or
+   *  glob pattern ({@link PathType#GLOB})
+   * @param filter optional filter handed to {@link SequenceFileDirIterator}; may be null
+   * @param ordering if not null, specifies the order in which to iterate over matching sequence files
+   * @param reuseKeyValueInstances if true, reuses instances of the value object instead of creating a new
+   *  one for each read from the file
+   * @param conf configuration handed to {@link SequenceFileDirIterator}
+   */
+  public SequenceFileDirIterable(Path path,
+                                 PathType pathType,
+                                 PathFilter filter,
+                                 Comparator<FileStatus> ordering,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.ordering = ordering;
+    this.filter = filter;
+    this.pathType = pathType;
+    this.path = path;
+  }
+
+  /**
+   * @return a new {@link SequenceFileDirIterator} configured with this object's settings
+   * @throws IllegalStateException wrapping the {@link IOException} if the iterator cannot be created;
+   *  the message is the path
+   */
+  @Override
+  public Iterator<Pair<K,V>> iterator() {
+    Iterator<Pair<K,V>> result;
+    try {
+      result = new SequenceFileDirIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(path.toString(), ioe);
+    }
+    return result;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
new file mode 100644
index 0000000..cf6a871
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.Pair;
+
+/**
+ * Like {@link SequenceFileIterator}, but iterates not just over one sequence file, but many. The input path
+ * may be specified as a directory of files to read, or as a glob pattern. The set of files may be optionally
+ * restricted with a {@link PathFilter}.
+ *
+ * <p>The per-file iterators are created lazily, as iteration reaches each file, and every one that has been
+ * created is recorded so that {@link #close()} can release it.</p>
+ */
+public final class SequenceFileDirIterator<K extends Writable,V extends Writable>
+    extends ForwardingIterator<Pair<K,V>> implements Closeable {
+
+  private static final FileStatus[] NO_STATUSES = new FileStatus[0];
+
+  /** Concatenation of the per-file iterators; assigned once by {@code init}. */
+  private Iterator<Pair<K,V>> delegate;
+  /** Per-file iterators created so far, in creation order. */
+  private final List<SequenceFileIterator<K,V>> iterators;
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters.
+   *
+   * @param path files to iterate over; statuses are looked up on the file system of the first path.
+   *  An empty array yields an empty iterator.
+   * @param reuseKeyValueInstances passed on to each per-file {@link SequenceFileIterator}
+   * @param conf configuration used to obtain the file system and to read the files
+   * @throws IOException if the file system or the status of one of the paths cannot be obtained
+   */
+  public SequenceFileDirIterator(Path[] path,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) throws IOException {
+
+    iterators = Lists.newArrayList();
+    FileStatus[] statuses = new FileStatus[path.length];
+    // With no paths there is no first element to resolve a file system from; fall through with an
+    // empty status array instead of failing on path[0].
+    if (path.length > 0) {
+      // we assume all files should exist, otherwise we will bail out.
+      FileSystem fs = FileSystem.get(path[0].toUri(), conf);
+      for (int i = 0; i < statuses.length; i++) {
+        statuses[i] = fs.getFileStatus(path[i]);
+      }
+    }
+    init(statuses, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
+   * (depending on pathType parameter).
+   *
+   * @throws IOException if the list of files cannot be obtained
+   */
+  public SequenceFileDirIterator(Path path,
+                                 PathType pathType,
+                                 PathFilter filter,
+                                 Comparator<FileStatus> ordering,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) throws IOException {
+
+    FileStatus[] statuses = HadoopUtil.getFileStatus(path, pathType, filter, ordering, conf);
+    iterators = Lists.newArrayList();
+    init(statuses, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Builds the delegate as the concatenation of one {@link SequenceFileIterator} per file status.
+   * The per-file iterators are created on demand by the transform function and recorded in
+   * {@code iterators} at that moment.
+   */
+  private void init(FileStatus[] statuses,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) {
+
+    /*
+     * prevent NPEs. Unfortunately, Hadoop would return null for list if nothing
+     * was qualified. In this case, which is a corner case, we should assume an
+     * empty iterator, not an NPE.
+     */
+    if (statuses == null) {
+      statuses = NO_STATUSES;
+    }
+
+    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+    Iterator<Iterator<Pair<K, V>>> fsIterators =
+      Iterators.transform(fileStatusIterator,
+        new Function<FileStatus, Iterator<Pair<K, V>>>() {
+          @Override
+          public Iterator<Pair<K, V>> apply(FileStatus from) {
+            try {
+              SequenceFileIterator<K, V> iterator = new SequenceFileIterator<>(from.getPath(),
+                  reuseKeyValueInstances, conf);
+              iterators.add(iterator);
+              return iterator;
+            } catch (IOException ioe) {
+              throw new IllegalStateException(from.getPath().toString(), ioe);
+            }
+          }
+        });
+
+    // No reversal here: the transform is lazy, so the list is still empty at this point.
+    delegate = Iterators.concat(fsIterators);
+  }
+
+  @Override
+  protected Iterator<Pair<K,V>> delegate() {
+    return delegate;
+  }
+
+  /**
+   * Closes every per-file iterator created so far, most recently created first, and then forgets them.
+   */
+  @Override
+  public void close() throws IOException {
+    // Reverse at close time, when the list actually holds the created iterators.
+    Collections.reverse(iterators);
+    IOUtils.close(iterators);
+    iterators.clear();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
new file mode 100644
index 0000000..1cb4ebc
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileDirValueIterator}: each call to
+ * {@link #iterator()} builds a new iterator from the arguments captured at construction.</p>
+ */
+public final class SequenceFileDirValueIterable<V extends Writable> implements Iterable<V> {
+
+  private final Path path;
+  private final PathType pathType;
+  private final PathFilter filter;
+  private final Comparator<FileStatus> ordering;
+  private final boolean reuseKeyValueInstances;
+  private final Configuration conf;
+
+  /**
+   * Convenience constructor: no {@link PathFilter}, no file ordering, and instances are not reused.
+   */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, Configuration conf) {
+    this(path, pathType, null, null, false, conf);
+  }
+
+  /**
+   * Convenience constructor: no file ordering, and instances are not reused.
+   */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
+    this(path,
+         pathType,
+         filter,
+         null,   // ordering
+         false,  // reuseKeyValueInstances
+         conf);
+  }
+
+  /**
+   * @param path directory or glob pattern to iterate over, interpreted according to {@code pathType}
+   * @param pathType whether to treat {@code path} as a directory ({@link PathType#LIST}) or as a
+   *  glob pattern ({@link PathType#GLOB})
+   * @param filter if not null, the filter applied when selecting sequence files
+   * @param ordering if not null, specifies the order in which to iterate over matching sequence files
+   * @param reuseKeyValueInstances if true, reuses instances of the value object instead of creating a new
+   *  one for each read from the file
+   * @param conf configuration passed on to the iterator
+   */
+  public SequenceFileDirValueIterable(Path path,
+                                      PathType pathType,
+                                      PathFilter filter,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) {
+    // Plain field capture; the assignments are independent of one another.
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.ordering = ordering;
+    this.filter = filter;
+    this.pathType = pathType;
+    this.path = path;
+  }
+
+  /**
+   * @throws IllegalStateException if the iterator cannot be created because of an {@link IOException};
+   *  the message is the path and the cause is the original exception
+   */
+  @Override
+  public Iterator<V> iterator() {
+    Iterator<V> valueIterator;
+    try {
+      valueIterator =
+          new SequenceFileDirValueIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
+    } catch (IOException e) {
+      throw new IllegalStateException(path.toString(), e);
+    }
+    return valueIterator;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
new file mode 100644
index 0000000..908c8bb
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
+
+/**
+ * Like {@link SequenceFileValueIterator}, but iterates not just over one
+ * sequence file, but many. The input path may be specified as a directory of
+ * files to read, or as a glob pattern. The set of files may be optionally
+ * restricted with a {@link PathFilter}.
+ *
+ * <p>The per-file iterators are created lazily, as iteration reaches each file,
+ * and every one that has been created is recorded so that {@link #close()} can
+ * release it.</p>
+ */
+public final class SequenceFileDirValueIterator<V extends Writable> extends
+    ForwardingIterator<V> implements Closeable {
+
+  private static final FileStatus[] NO_STATUSES = new FileStatus[0];
+
+  /** Concatenation of the per-file iterators; assigned once by {@code init}. */
+  private Iterator<V> delegate;
+  /** Per-file iterators created so far, in creation order. */
+  private final List<SequenceFileValueIterator<V>> iterators;
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
+   * (depending on pathType parameter).
+   *
+   * @throws IOException if the file system or the list of files cannot be obtained
+   */
+  public SequenceFileDirValueIterator(Path path,
+                                      PathType pathType,
+                                      PathFilter filter,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) throws IOException {
+    FileStatus[] statuses;
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    if (filter == null) {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
+    } else {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : fs.listStatus(path, filter);
+    }
+    iterators = Lists.newArrayList();
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters.
+   *
+   * @param path files to iterate over; statuses are looked up on the file system of the first path.
+   *  An empty array yields an empty iterator.
+   * @param ordering if not null, the file statuses are sorted with it before iteration
+   * @param reuseKeyValueInstances passed on to each per-file {@link SequenceFileValueIterator}
+   * @param conf configuration used to obtain the file system and to read the files
+   * @throws IOException if the file system or the status of one of the paths cannot be obtained
+   */
+  public SequenceFileDirValueIterator(Path[] path,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) throws IOException {
+
+    iterators = Lists.newArrayList();
+    FileStatus[] statuses = new FileStatus[path.length];
+    // With no paths there is no first element to resolve a file system from; fall through with an
+    // empty status array instead of failing on path[0].
+    if (path.length > 0) {
+      /*
+       * we assume all files should exist, otherwise we will bail out.
+       */
+      FileSystem fs = FileSystem.get(path[0].toUri(), conf);
+      for (int i = 0; i < statuses.length; i++) {
+        statuses[i] = fs.getFileStatus(path[i]);
+      }
+    }
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Sorts the statuses if an ordering is given and builds the delegate as the concatenation of one
+   * {@link SequenceFileValueIterator} per file status. The per-file iterators are created on demand
+   * by the transform function and recorded in {@code iterators} at that moment.
+   */
+  private void init(FileStatus[] statuses,
+                    Comparator<FileStatus> ordering,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) throws IOException {
+
+    /*
+     * prevent NPEs. Unfortunately, Hadoop would return null for list if nothing
+     * was qualified. In this case, which is a corner case, we should assume an
+     * empty iterator, not an NPE.
+     */
+    if (statuses == null) {
+      statuses = NO_STATUSES;
+    }
+
+    if (ordering != null) {
+      Arrays.sort(statuses, ordering);
+    }
+    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+    boolean initialized = false;
+    try {
+
+      Iterator<Iterator<V>> fsIterators =
+        Iterators.transform(fileStatusIterator,
+          new Function<FileStatus, Iterator<V>>() {
+            @Override
+            public Iterator<V> apply(FileStatus from) {
+              try {
+                SequenceFileValueIterator<V> iterator = new SequenceFileValueIterator<>(from.getPath(),
+                    reuseKeyValueInstances, conf);
+                iterators.add(iterator);
+                return iterator;
+              } catch (IOException ioe) {
+                throw new IllegalStateException(from.getPath().toString(), ioe);
+              }
+            }
+          });
+
+      // No reversal here: the transform is lazy, so the list is still empty at this point.
+      delegate = Iterators.concat(fsIterators);
+      initialized = true;
+
+    } finally {
+      /*
+       * prevent file handle leaks in case initialization fails: the constructor
+       * will fail and close() will never be called, so anything recorded so far
+       * is released here. On success the iterators must stay open for the caller,
+       * hence the guard.
+       */
+      if (!initialized) {
+        IOUtils.close(iterators);
+      }
+    }
+  }
+
+  @Override
+  protected Iterator<V> delegate() {
+    return delegate;
+  }
+
+  /**
+   * Closes every per-file iterator created so far, most recently created first, and then forgets them.
+   */
+  @Override
+  public void close() throws IOException {
+    // Reverse at close time, when the list actually holds the created iterators.
+    Collections.reverse(iterators);
+    IOUtils.close(iterators);
+    iterators.clear();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
new file mode 100644
index 0000000..f17c2a1
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileIterator}.</p>
+ */
+public final class SequenceFileIterable<K extends Writable,V extends Writable> implements Iterable<Pair<K,V>> {
+
+ private final Path path;
+ private final boolean reuseKeyValueInstances;
+ private final Configuration conf;
+
+ /**
+ * Like {@link #SequenceFileIterable(Path, boolean, Configuration)} but key and value instances are not reused
+ * by default.
+ *
+ * @param path file to iterate over
+ */
+ public SequenceFileIterable(Path path, Configuration conf) {
+ this(path, false, conf);
+ }
+
+ /**
+ * @param path file to iterate over
+ * @param reuseKeyValueInstances if true, reuses instances of the key and value object instead of creating a new
+ * one for each read from the file
+ */
+ public SequenceFileIterable(Path path, boolean reuseKeyValueInstances, Configuration conf) {
+ this.path = path;
+ this.reuseKeyValueInstances = reuseKeyValueInstances;
+ this.conf = conf;
+ }
+
+ @Override
+ public Iterator<Pair<K, V>> iterator() {
+ try {
+ return new SequenceFileIterator<>(path, reuseKeyValueInstances, conf);
+ } catch (IOException ioe) {
+ throw new IllegalStateException(path.toString(), ioe);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
new file mode 100644
index 0000000..bc5c549
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>{@link java.util.Iterator} over a {@link SequenceFile}'s keys and values, as a {@link Pair}
+ * containing key and value.</p>
+ *
+ * <p>When the file's value class is {@link NullWritable}, only keys are read and the value of every
+ * returned {@link Pair} is null. The underlying reader is closed when the end of the file is reached,
+ * when a read fails, or when {@link #close()} is called.</p>
+ */
+public final class SequenceFileIterator<K extends Writable,V extends Writable>
+    extends AbstractIterator<Pair<K,V>> implements Closeable {
+
+  private static final Logger log = LoggerFactory.getLogger(SequenceFileIterator.class);
+
+  private final SequenceFile.Reader reader;
+  private final Configuration conf;
+  private final Class<K> keyClass;
+  private final Class<V> valueClass;
+  /** True when the file's value class is {@link NullWritable}, i.e. only keys are read. */
+  private final boolean noValue;
+  private K key;
+  private V value;
+  private final boolean reuseKeyValueInstances;
+
+  /**
+   * @param path sequence file to iterate over
+   * @param reuseKeyValueInstances if true, reuses instances of the key and value object instead of
+   *  creating a new one for each read from the file
+   * @param conf configuration used to resolve the file system, open the reader and instantiate
+   *  keys and values
+   * @throws IOException if path can't be read, or its key or value class can't be instantiated
+   */
+  // The reader exposes its key/value classes without type parameters; K and V are asserted by the caller.
+  @SuppressWarnings("unchecked")
+  public SequenceFileIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
+    key = null;
+    value = null;
+    FileSystem fs = path.getFileSystem(conf);
+    path = path.makeQualified(fs);
+    reader = new SequenceFile.Reader(fs, path, conf);
+    this.conf = conf;
+    keyClass = (Class<K>) reader.getKeyClass();
+    valueClass = (Class<V>) reader.getValueClass();
+    noValue = NullWritable.class.equals(valueClass);
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+  }
+
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  /**
+   * Drops the current key and value, closes the reader (swallowing any {@link IOException} from it)
+   * and marks the iteration as finished.
+   */
+  @Override
+  public void close() throws IOException {
+    key = null;
+    value = null;
+    Closeables.close(reader, true);
+
+    endOfData();
+  }
+
+  /**
+   * Reads the next record from the file.
+   *
+   * @return the next key/value pair, or null (after ending the iteration) when no record is left
+   * @throws IllegalStateException if reading fails with an {@link IOException}; the iterator is
+   *  closed before the exception is thrown
+   */
+  @Override
+  protected Pair<K,V> computeNext() {
+    // Test the key rather than the value: for key-only files the value is never instantiated, so a
+    // value check would allocate a new key for every record even when reuse was requested.
+    if (!reuseKeyValueInstances || key == null) {
+      key = ReflectionUtils.newInstance(keyClass, conf);
+      if (!noValue) {
+        value = ReflectionUtils.newInstance(valueClass, conf);
+      }
+    }
+    try {
+      boolean available;
+      if (noValue) {
+        available = reader.next(key);
+      } else {
+        available = reader.next(key, value);
+      }
+      if (!available) {
+        close();
+        return null;
+      }
+      return new Pair<>(key, value);
+    } catch (IOException ioe) {
+      try {
+        close();
+      } catch (IOException e) {
+        // The read failure is the one reported to the caller; a failure while closing is only logged.
+        log.error(e.getMessage(), e);
+      }
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+}