Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:52 UTC

[21/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy into mahout-hdfs and mahout-mr, closes apache/mahout#86

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java b/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java
new file mode 100644
index 0000000..f9ca52e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public final class MatrixUtils {
+
+  private MatrixUtils() {
+  }
+
+  public static void write(Path outputDir, Configuration conf, VectorIterable matrix)
+    throws IOException {
+    FileSystem fs = outputDir.getFileSystem(conf);
+    fs.delete(outputDir, true);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outputDir,
+        IntWritable.class, VectorWritable.class);
+    try {
+      IntWritable topic = new IntWritable();
+      VectorWritable vector = new VectorWritable();
+      for (MatrixSlice slice : matrix) {
+        topic.set(slice.index());
+        vector.set(slice.vector());
+        writer.append(topic, vector);
+      }
+    } finally {
+      // close the writer even if an append fails, so the file handle is not leaked
+      writer.close();
+    }
+  }
+
+  public static Matrix read(Configuration conf, Path... modelPaths) throws IOException {
+    int numRows = -1;
+    int numCols = -1;
+    boolean sparse = false;
+    List<Pair<Integer, Vector>> rows = Lists.newArrayList();
+    for (Path modelPath : modelPaths) {
+      for (Pair<IntWritable, VectorWritable> row
+          : new SequenceFileIterable<IntWritable, VectorWritable>(modelPath, true, conf)) {
+        rows.add(Pair.of(row.getFirst().get(), row.getSecond().get()));
+        numRows = Math.max(numRows, row.getFirst().get());
+        sparse = !row.getSecond().get().isDense();
+        if (numCols < 0) {
+          numCols = row.getSecond().get().size();
+        }
+      }
+    }
+    if (rows.isEmpty()) {
+      throw new IOException(Arrays.toString(modelPaths) + " contain no vectors");
+    }
+    numRows++;
+    Vector[] arrayOfRows = new Vector[numRows];
+    for (Pair<Integer, Vector> pair : rows) {
+      arrayOfRows[pair.getFirst()] = pair.getSecond();
+    }
+    Matrix matrix;
+    if (sparse) {
+      matrix = new SparseRowMatrix(numRows, numCols, arrayOfRows);
+    } else {
+      matrix = new DenseMatrix(numRows, numCols);
+      for (int i = 0; i < numRows; i++) {
+        matrix.assignRow(i, arrayOfRows[i]);
+      }
+    }
+    return matrix;
+  }
+
+  public static OpenObjectIntHashMap<String> readDictionary(Configuration conf, Path... dictPath) {
+    OpenObjectIntHashMap<String> dictionary = new OpenObjectIntHashMap<>();
+    for (Path dictionaryFile : dictPath) {
+      for (Pair<Writable, IntWritable> record
+              : new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) {
+        dictionary.put(record.getFirst().toString(), record.getSecond().get());
+      }
+    }
+    return dictionary;
+  }
+
+  public static String[] invertDictionary(OpenObjectIntHashMap<String> termIdMap) {
+    int maxTermId = -1;
+    for (String term : termIdMap.keys()) {
+      maxTermId = Math.max(maxTermId, termIdMap.get(term));
+    }
+    maxTermId++;
+    String[] dictionary = new String[maxTermId];
+    for (String term : termIdMap.keys()) {
+      dictionary[termIdMap.get(term)] = term;
+    }
+    return dictionary;
+  }
+
+}
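
For illustration, a minimal round-trip sketch of the helpers above, assuming a local Hadoop
Configuration and a hypothetical scratch path (the path and class name below are not part of
the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.math.DenseMatrix;
    import org.apache.mahout.math.Matrix;
    import org.apache.mahout.math.MatrixUtils;

    public class MatrixUtilsRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path dir = new Path("/tmp/matrix-utils-demo");   // hypothetical scratch location
        Matrix original = new DenseMatrix(new double[][] {{1, 2}, {3, 4}});
        // write() keys each row vector by its row index (IntWritable -> VectorWritable)
        MatrixUtils.write(dir, conf, original);
        // read() sizes the result from the largest row key seen, plus one
        Matrix copy = MatrixUtils.read(conf, dir);
        System.out.println(copy.rowSize() + " x " + copy.columnSize());  // 2 x 2
      }
    }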

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java b/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java
new file mode 100644
index 0000000..0c45c9a
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Writable to handle serialization of a vector and a variable list of
+ * associated label indexes.
+ */
+public final class MultiLabelVectorWritable implements Writable {
+
+  private final VectorWritable vectorWritable = new VectorWritable();
+  private int[] labels;
+
+  public MultiLabelVectorWritable() {
+  }
+
+  public MultiLabelVectorWritable(Vector vector, int[] labels) {
+    this.vectorWritable.set(vector);
+    this.labels = labels;
+  }
+
+  public Vector getVector() {
+    return vectorWritable.get();
+  }
+
+  public void setVector(Vector vector) {
+    vectorWritable.set(vector);
+  }
+
+  public void setLabels(int[] labels) {
+    this.labels = labels;
+  }
+
+  public int[] getLabels() {
+    return labels;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    vectorWritable.readFields(in);
+    int labelSize = in.readInt();
+    labels = new int[labelSize];
+    for (int i = 0; i < labelSize; i++) {
+      labels[i] = in.readInt();
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    vectorWritable.write(out);
+    out.writeInt(labels.length);
+    for (int label : labels) {
+      out.writeInt(label);
+    }
+  }
+
+  public static MultiLabelVectorWritable read(DataInput in) throws IOException {
+    MultiLabelVectorWritable writable = new MultiLabelVectorWritable();
+    writable.readFields(in);
+    return writable;
+  }
+
+  public static void write(DataOutput out, SequentialAccessSparseVector ssv, int[] labels) throws IOException {
+    new MultiLabelVectorWritable(ssv, labels).write(out);
+  }
+
+}
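
A minimal serialization round trip for this Writable, sketched with plain in-memory streams
rather than Hadoop I/O (the class name here is hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.Arrays;

    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.MultiLabelVectorWritable;

    public class MultiLabelRoundTrip {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        MultiLabelVectorWritable original = new MultiLabelVectorWritable(
            new DenseVector(new double[] {0.5, 1.5}), new int[] {3, 7});
        // on the wire: the vector first, then the label count, then each label
        original.write(new DataOutputStream(bytes));
        MultiLabelVectorWritable copy = MultiLabelVectorWritable.read(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(Arrays.toString(copy.getLabels()));  // [3, 7]
      }
    }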

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java b/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
new file mode 100644
index 0000000..1a6ff16
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator;
+import org.apache.mahout.math.CardinalityException;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * DistributedRowMatrix is a FileSystem-backed VectorIterable in which the vectors live in a
+ * SequenceFile<WritableComparable,VectorWritable>, and distributed operations are executed as M/R passes on
+ * Hadoop.  The usage is as follows: <p>
+ * <p>
+ * <pre>
+ *   // the path must already contain a previously created SequenceFile!
+ *   DistributedRowMatrix m = new DistributedRowMatrix("path/to/vector/sequenceFile", "tmp/path", 10000000, 250000);
+ *   m.setConf(new Configuration());
+ *   // now if we want to multiply a vector by this matrix, its dimension must equal the row dimension of this
+ *   // matrix.  If we want to timesSquared() a vector by this matrix, its dimension must equal the column dimension
+ *   // of the matrix.
+ *   Vector v = new DenseVector(250000);
+ *   // now the following operation will be done as an M/R pass on Hadoop.
+ *   Vector w = m.timesSquared(v);
+ * </pre>
+ *
+ */
+public class DistributedRowMatrix implements VectorIterable, Configurable {
+  public static final String KEEP_TEMP_FILES = "DistributedMatrix.keep.temp.files";
+
+  private static final Logger log = LoggerFactory.getLogger(DistributedRowMatrix.class);
+
+  private final Path inputPath;
+  private final Path outputTmpPath;
+  private Configuration conf;
+  private Path rowPath;
+  private Path outputTmpBasePath;
+  private final int numRows;
+  private final int numCols;
+  private boolean keepTempFiles;
+
+  public DistributedRowMatrix(Path inputPath,
+                              Path outputTmpPath,
+                              int numRows,
+                              int numCols) {
+    this(inputPath, outputTmpPath, numRows, numCols, false);
+  }
+
+  public DistributedRowMatrix(Path inputPath,
+                              Path outputTmpPath,
+                              int numRows,
+                              int numCols,
+                              boolean keepTempFiles) {
+    this.inputPath = inputPath;
+    this.outputTmpPath = outputTmpPath;
+    this.numRows = numRows;
+    this.numCols = numCols;
+    this.keepTempFiles = keepTempFiles;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    try {
+      FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+      rowPath = fs.makeQualified(inputPath);
+      outputTmpBasePath = fs.makeQualified(outputTmpPath);
+      keepTempFiles = conf.getBoolean(KEEP_TEMP_FILES, false);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+  public Path getRowPath() {
+    return rowPath;
+  }
+
+  public Path getOutputTempPath() {
+    return outputTmpBasePath;
+  }
+
+  public void setOutputTempPathString(String outPathString) {
+    try {
+      outputTmpBasePath = FileSystem.get(conf).makeQualified(new Path(outPathString));
+    } catch (IOException ioe) {
+      log.warn("Unable to set outputBasePath to {}, leaving as {}",
+          outPathString, outputTmpBasePath);
+    }
+  }
+
+  @Override
+  public Iterator<MatrixSlice> iterateAll() {
+    try {
+      Path pathPattern = rowPath;
+      if (FileSystem.get(conf).getFileStatus(rowPath).isDir()) {
+        pathPattern = new Path(rowPath, "*");
+      }
+      return Iterators.transform(
+          new SequenceFileDirIterator<IntWritable,VectorWritable>(pathPattern,
+                                                                  PathType.GLOB,
+                                                                  PathFilters.logsCRCFilter(),
+                                                                  null,
+                                                                  true,
+                                                                  conf),
+          new Function<Pair<IntWritable,VectorWritable>,MatrixSlice>() {
+            @Override
+            public MatrixSlice apply(Pair<IntWritable, VectorWritable> from) {
+              return new MatrixSlice(from.getSecond().get(), from.getFirst().get());
+            }
+          });
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+  @Override
+  public int numSlices() {
+    return numRows();
+  }
+
+  @Override
+  public int numRows() {
+    return numRows;
+  }
+
+  @Override
+  public int numCols() {
+    return numCols;
+  }
+
+
+  /**
+   * This implements matrix this.transpose().times(other)
+   * @param other   a DistributedRowMatrix
+   * @return    a DistributedRowMatrix containing the product
+   */
+  public DistributedRowMatrix times(DistributedRowMatrix other) throws IOException {
+    return times(other, new Path(outputTmpBasePath.getParent(), "productWith-" + (System.nanoTime() & 0xFF)));
+  }
+
+  /**
+   * This implements matrix this.transpose().times(other)
+   * @param other   a DistributedRowMatrix
+   * @param outPath path to write result to
+   * @return    a DistributedRowMatrix containing the product
+   */
+  public DistributedRowMatrix times(DistributedRowMatrix other, Path outPath) throws IOException {
+    if (numRows != other.numRows()) {
+      throw new CardinalityException(numRows, other.numRows());
+    }
+
+    Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+    Configuration conf =
+        MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf,
+                                                            rowPath,
+                                                            other.rowPath,
+                                                            outPath,
+                                                            other.numCols);
+    JobClient.runJob(new JobConf(conf));
+    DistributedRowMatrix out = new DistributedRowMatrix(outPath, outputTmpPath, numCols, other.numCols());
+    out.setConf(conf);
+    return out;
+  }
+
+  public Vector columnMeans() throws IOException {
+    return columnMeans("SequentialAccessSparseVector");
+  }
+
+  /**
+   * Returns the column-wise mean of a DistributedRowMatrix
+   *
+   * @param vectorClass
+   *          desired class for the column-wise mean vector e.g.
+   *          RandomAccessSparseVector, DenseVector
+   * @return Vector containing the column-wise mean of this
+   */
+  public Vector columnMeans(String vectorClass) throws IOException {
+    Path outputVectorTmpPath =
+        new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+    Configuration initialConf =
+        getConf() == null ? new Configuration() : getConf();
+    String vectorClassFull = "org.apache.mahout.math." + vectorClass;
+    Vector mean = MatrixColumnMeansJob.run(initialConf, rowPath, outputVectorTmpPath, vectorClassFull);
+    if (!keepTempFiles) {
+      FileSystem fs = outputVectorTmpPath.getFileSystem(conf);
+      fs.delete(outputVectorTmpPath, true);
+    }
+    return mean;
+  }
+
+  public DistributedRowMatrix transpose() throws IOException {
+    Path outputPath = new Path(rowPath.getParent(), "transpose-" + (System.nanoTime() & 0xFF));
+    Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+    Job transposeJob = TransposeJob.buildTransposeJob(initialConf, rowPath, outputPath, numRows);
+
+    try {
+      transposeJob.waitForCompletion(true);
+    } catch (Exception e) {
+      throw new IllegalStateException("transposition failed", e);
+    }
+
+    DistributedRowMatrix m = new DistributedRowMatrix(outputPath, outputTmpPath, numCols, numRows);
+    m.setConf(this.conf);
+    return m;
+  }
+
+  @Override
+  public Vector times(Vector v) {
+    try {
+      Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+      Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+
+      Job job = TimesSquaredJob.createTimesJob(initialConf, v, numRows, rowPath, outputVectorTmpPath);
+
+      try {
+        job.waitForCompletion(true);
+      } catch (Exception e) {
+        throw new IllegalStateException("times failed", e);
+      }
+
+      Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(outputVectorTmpPath, conf);
+      if (!keepTempFiles) {
+        FileSystem fs = outputVectorTmpPath.getFileSystem(conf);
+        fs.delete(outputVectorTmpPath, true);
+      }
+      return result;
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+  @Override
+  public Vector timesSquared(Vector v) {
+    try {
+      Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+      Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+
+      Job job = TimesSquaredJob.createTimesSquaredJob(initialConf, v, rowPath, outputVectorTmpPath);
+
+      try {
+        job.waitForCompletion(true);
+      } catch (Exception e) {
+        throw new IllegalStateException("timesSquared failed", e);
+      }
+
+      Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(outputVectorTmpPath, conf);
+      if (!keepTempFiles) {
+        FileSystem fs = outputVectorTmpPath.getFileSystem(conf);
+        fs.delete(outputVectorTmpPath, true);
+      }
+      return result;
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+  @Override
+  public Iterator<MatrixSlice> iterator() {
+    return iterateAll();
+  }
+
+  public static class MatrixEntryWritable implements WritableComparable<MatrixEntryWritable> {
+    private int row;
+    private int col;
+    private double val;
+
+    public int getRow() {
+      return row;
+    }
+
+    public void setRow(int row) {
+      this.row = row;
+    }
+
+    public int getCol() {
+      return col;
+    }
+
+    public void setCol(int col) {
+      this.col = col;
+    }
+
+    public double getVal() {
+      return val;
+    }
+
+    public void setVal(double val) {
+      this.val = val;
+    }
+
+    @Override
+    public int compareTo(MatrixEntryWritable o) {
+      if (row > o.row) {
+        return 1;
+      } else if (row < o.row) {
+        return -1;
+      } else {
+        if (col > o.col) {
+          return 1;
+        } else if (col < o.col) {
+          return -1;
+        } else {
+          return 0;
+        }
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof MatrixEntryWritable)) {
+        return false;
+      }
+      MatrixEntryWritable other = (MatrixEntryWritable) o;
+      return row == other.row && col == other.col;
+    }
+
+    @Override
+    public int hashCode() {
+      return row + 31 * col;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(row);
+      out.writeInt(col);
+      out.writeDouble(val);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      row = in.readInt();
+      col = in.readInt();
+      val = in.readDouble();
+    }
+
+    @Override
+    public String toString() {
+      return "(" + row + ',' + col + "):" + val;
+    }
+  }
+}
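
To make the times() contract above concrete: both operands must share the same row count,
because the job computes this.transpose().times(other). A sketch under those assumptions
(hypothetical paths; assumes a configured Hadoop environment and existing
IntWritable/VectorWritable SequenceFiles):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.math.hadoop.DistributedRowMatrix;

    public class TransposeTimesExample {
      public static void main(String[] args) throws Exception {
        // hypothetical inputs: A is 1000 x 50, B is 1000 x 20
        DistributedRowMatrix a = new DistributedRowMatrix(new Path("a/rows"), new Path("tmp"), 1000, 50);
        DistributedRowMatrix b = new DistributedRowMatrix(new Path("b/rows"), new Path("tmp"), 1000, 20);
        a.setConf(new Configuration());
        b.setConf(new Configuration());
        // runs an M/R join over co-indexed rows; the result is A' * B, a 50 x 20 matrix
        DistributedRowMatrix product = a.times(b);
        System.out.println(product.numRows() + " x " + product.numCols());
      }
    }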

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
new file mode 100644
index 0000000..b4f459a
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import com.google.common.io.Closeables;
+
+/**
+ * MatrixColumnMeansJob is a job for calculating the column-wise mean of a
+ * DistributedRowMatrix. This job can be accessed using
+ * DistributedRowMatrix.columnMeans()
+ */
+public final class MatrixColumnMeansJob {
+
+  public static final String VECTOR_CLASS =
+    "DistributedRowMatrix.columnMeans.vector.class";
+
+  private MatrixColumnMeansJob() {
+  }
+
+  public static Vector run(Configuration conf,
+                           Path inputPath,
+                           Path outputVectorTmpPath) throws IOException {
+    return run(conf, inputPath, outputVectorTmpPath, null);
+  }
+
+  /**
+   * Job for calculating column-wise mean of a DistributedRowMatrix
+   *
+   * @param initialConf base Hadoop configuration used to create the job
+   * @param inputPath
+   *          path to DistributedRowMatrix input
+   * @param outputVectorTmpPath
+   *          path for temporary files created during job
+   * @param vectorClass
+   *          String of desired class for returned vector e.g. DenseVector,
+   *          RandomAccessSparseVector (may be null for {@link DenseVector} )
+   * @return Vector containing column-wise mean of DistributedRowMatrix
+   */
+  public static Vector run(Configuration initialConf,
+                           Path inputPath,
+                           Path outputVectorTmpPath,
+                           String vectorClass) throws IOException {
+
+    try {
+      String vectorClassName =
+          vectorClass == null ? DenseVector.class.getName() : vectorClass;
+      initialConf.set(VECTOR_CLASS, vectorClassName);
+
+      Job job = new Job(initialConf, "MatrixColumnMeansJob");
+      job.setJarByClass(MatrixColumnMeansJob.class);
+
+      outputVectorTmpPath.getFileSystem(job.getConfiguration())
+                         .delete(outputVectorTmpPath, true);
+      job.setNumReduceTasks(1);
+      FileInputFormat.addInputPath(job, inputPath);
+      FileOutputFormat.setOutputPath(job, outputVectorTmpPath);
+      job.setInputFormatClass(SequenceFileInputFormat.class);
+      job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+      job.setMapperClass(MatrixColumnMeansMapper.class);
+      job.setReducerClass(MatrixColumnMeansReducer.class);
+      job.setMapOutputKeyClass(NullWritable.class);
+      job.setMapOutputValueClass(VectorWritable.class);
+      job.setOutputKeyClass(IntWritable.class);
+      job.setOutputValueClass(VectorWritable.class);
+      job.submit();
+      job.waitForCompletion(true);
+
+      Path tmpFile = new Path(outputVectorTmpPath, "part-r-00000");
+      SequenceFileValueIterator<VectorWritable> iterator =
+        new SequenceFileValueIterator<>(tmpFile, true, initialConf);
+      try {
+        if (iterator.hasNext()) {
+          return iterator.next().get();
+        } else {
+          return (Vector) Class.forName(vectorClassName).getConstructor(int.class)
+                               .newInstance(0);
+        }
+      } finally {
+        Closeables.close(iterator, true);
+      }
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (Throwable thr) {
+      throw new IOException(thr);
+    }
+  }
+
+  /**
+   * Mapper for calculation of column-wise mean.
+   */
+  public static class MatrixColumnMeansMapper extends
+      Mapper<Writable, VectorWritable, NullWritable, VectorWritable> {
+
+    private Vector runningSum;
+    private String vectorClass;
+
+    @Override
+    public void setup(Context context) {
+      vectorClass = context.getConfiguration().get(VECTOR_CLASS);
+    }
+
+    /**
+     * The mapper computes a running sum of the vectors the task has seen.
+     * Element 0 of the running sum vector contains a count of the number of
+     * vectors that have been seen. The remaining elements contain the
+     * column-wise running sum. Nothing is written at this stage
+     */
+    @Override
+    public void map(Writable r, VectorWritable v, Context context)
+      throws IOException {
+      if (runningSum == null) {
+        /*
+         * If this is the first vector the mapper has seen, instantiate a new
+         * vector using the parameter VECTOR_CLASS
+         */
+        runningSum = ClassUtils.instantiateAs(vectorClass,
+                                              Vector.class,
+                                              new Class<?>[] { int.class },
+                                              new Object[] { v.get().size() + 1 });
+        runningSum.set(0, 1);
+        runningSum.viewPart(1, v.get().size()).assign(v.get());
+      } else {
+        runningSum.set(0, runningSum.get(0) + 1);
+        runningSum.viewPart(1, v.get().size()).assign(v.get(), Functions.PLUS);
+      }
+    }
+
+    /**
+     * The column-wise sum is written at the cleanup stage. A single reducer is
+     * forced so null can be used for the key
+     */
+    @Override
+    public void cleanup(Context context) throws InterruptedException,
+      IOException {
+      if (runningSum != null) {
+        context.write(NullWritable.get(), new VectorWritable(runningSum));
+      }
+    }
+
+  }
+
+  /**
+   * The reducer adds the partial column-wise sums from each of the mappers to
+   * compute the total column-wise sum. The total sum is then divided by the
+   * total count of vectors to determine the column-wise mean.
+   */
+  public static class MatrixColumnMeansReducer extends
+      Reducer<NullWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    private static final IntWritable ONE = new IntWritable(1);
+
+    private String vectorClass;
+    private Vector outputVector;
+    private final VectorWritable outputVectorWritable = new VectorWritable();
+
+    @Override
+    public void setup(Context context) {
+      vectorClass = context.getConfiguration().get(VECTOR_CLASS);
+    }
+
+    @Override
+    public void reduce(NullWritable n,
+                       Iterable<VectorWritable> vectors,
+                       Context context) throws IOException, InterruptedException {
+
+      /*
+       * Add together partial column-wise sums from mappers
+       */
+      for (VectorWritable v : vectors) {
+        if (outputVector == null) {
+          outputVector = v.get();
+        } else {
+          outputVector.assign(v.get(), Functions.PLUS);
+        }
+      }
+
+      /*
+       * Divide total column-wise sum by count of vectors, which corresponds to
+       * the number of rows in the DistributedRowMatrix
+       */
+      if (outputVector != null) {
+        outputVectorWritable.set(outputVector.viewPart(1,
+                                                       outputVector.size() - 1)
+                                             .divide(outputVector.get(0)));
+        context.write(ONE, outputVectorWritable);
+      } else {
+        Vector emptyVector = ClassUtils.instantiateAs(vectorClass,
+                                                      Vector.class,
+                                                      new Class<?>[] { int.class },
+                                                      new Object[] { 0 });
+        context.write(ONE, new VectorWritable(emptyVector));
+      }
+    }
+  }
+
+}
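
The count-in-element-0 encoding shared by the mapper and reducer can be checked in memory;
this sketch (class name hypothetical) replays the same viewPart arithmetic on two rows:

    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.function.Functions;

    public class ColumnMeansEncoding {
      public static void main(String[] args) {
        Vector row1 = new DenseVector(new double[] {2, 4});
        Vector row2 = new DenseVector(new double[] {4, 8});
        // mapper side: element 0 counts rows, elements 1..n hold the running column sums
        Vector runningSum = new DenseVector(row1.size() + 1);
        runningSum.set(0, 1);
        runningSum.viewPart(1, row1.size()).assign(row1);
        runningSum.set(0, runningSum.get(0) + 1);
        runningSum.viewPart(1, row2.size()).assign(row2, Functions.PLUS);
        // reducer side: divide the summed tail by the count stored in element 0
        Vector mean = runningSum.viewPart(1, row1.size()).divide(runningSum.get(0));
        System.out.println(mean);  // column means {3.0, 6.0}
      }
    }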

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
new file mode 100644
index 0000000..48eda08
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.join.CompositeInputFormat;
+import org.apache.hadoop.mapred.join.TupleWritable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This still uses the old MR API and, like everything MapReduce-based in Mahout, is now part of 'mahout-mr'.
+ * There is no plan to convert the old MR API used here to the new MR API.
+ * This will be replaced by the new Spark-based linear algebra bindings.
+ */
+
+public class MatrixMultiplicationJob extends AbstractJob {
+
+  private static final String OUT_CARD = "output.vector.cardinality";
+
+  public static Configuration createMatrixMultiplyJobConf(Path aPath, 
+                                                          Path bPath, 
+                                                          Path outPath, 
+                                                          int outCardinality) {
+    return createMatrixMultiplyJobConf(new Configuration(), aPath, bPath, outPath, outCardinality);
+  }
+  
+  public static Configuration createMatrixMultiplyJobConf(Configuration initialConf, 
+                                                          Path aPath, 
+                                                          Path bPath, 
+                                                          Path outPath, 
+                                                          int outCardinality) {
+    JobConf conf = new JobConf(initialConf, MatrixMultiplicationJob.class);
+    conf.setInputFormat(CompositeInputFormat.class);
+    conf.set("mapred.join.expr", CompositeInputFormat.compose(
+          "inner", SequenceFileInputFormat.class, aPath, bPath));
+    conf.setInt(OUT_CARD, outCardinality);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(conf, outPath);
+    conf.setMapperClass(MatrixMultiplyMapper.class);
+    conf.setCombinerClass(MatrixMultiplicationReducer.class);
+    conf.setReducerClass(MatrixMultiplicationReducer.class);
+    conf.setMapOutputKeyClass(IntWritable.class);
+    conf.setMapOutputValueClass(VectorWritable.class);
+    conf.setOutputKeyClass(IntWritable.class);
+    conf.setOutputValueClass(VectorWritable.class);
+    return conf;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new MatrixMultiplicationJob(), args);
+  }
+
+  @Override
+  public int run(String[] strings) throws Exception {
+    addOption("numRowsA", "nra", "Number of rows of the first input matrix", true);
+    addOption("numColsA", "nca", "Number of columns of the first input matrix", true);
+    addOption("numRowsB", "nrb", "Number of rows of the second input matrix", true);
+
+    addOption("numColsB", "ncb", "Number of columns of the second input matrix", true);
+    addOption("inputPathA", "ia", "Path to the first input matrix", true);
+    addOption("inputPathB", "ib", "Path to the second input matrix", true);
+
+    addOption("outputPath", "op", "Path to the output matrix", false);
+
+    Map<String, List<String>> argMap = parseArguments(strings);
+    if (argMap == null) {
+      return -1;
+    }
+
+    DistributedRowMatrix a = new DistributedRowMatrix(new Path(getOption("inputPathA")),
+                                                      new Path(getOption("tempDir")),
+                                                      Integer.parseInt(getOption("numRowsA")),
+                                                      Integer.parseInt(getOption("numColsA")));
+    DistributedRowMatrix b = new DistributedRowMatrix(new Path(getOption("inputPathB")),
+                                                      new Path(getOption("tempDir")),
+                                                      Integer.parseInt(getOption("numRowsB")),
+                                                      Integer.parseInt(getOption("numColsB")));
+
+    a.setConf(new Configuration(getConf()));
+    b.setConf(new Configuration(getConf()));
+
+    if (hasOption("outputPath")) {
+      a.times(b, new Path(getOption("outputPath")));
+    } else {
+      a.times(b);
+    }
+
+    return 0;
+  }
+
+  public static class MatrixMultiplyMapper extends MapReduceBase
+      implements Mapper<IntWritable,TupleWritable,IntWritable,VectorWritable> {
+
+    private int outCardinality;
+    private final IntWritable row = new IntWritable();
+
+    @Override
+    public void configure(JobConf conf) {
+      outCardinality = conf.getInt(OUT_CARD, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void map(IntWritable index,
+                    TupleWritable v,
+                    OutputCollector<IntWritable,VectorWritable> out,
+                    Reporter reporter) throws IOException {
+      boolean firstIsOutFrag =  ((VectorWritable)v.get(0)).get().size() == outCardinality;
+      Vector outFrag = firstIsOutFrag ? ((VectorWritable)v.get(0)).get() : ((VectorWritable)v.get(1)).get();
+      Vector multiplier = firstIsOutFrag ? ((VectorWritable)v.get(1)).get() : ((VectorWritable)v.get(0)).get();
+
+      VectorWritable outVector = new VectorWritable();
+      for (Vector.Element e : multiplier.nonZeroes()) {
+        row.set(e.index());
+        outVector.set(outFrag.times(e.get()));
+        out.collect(row, outVector);
+      }
+    }
+  }
+
+  public static class MatrixMultiplicationReducer extends MapReduceBase
+      implements Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    @Override
+    public void reduce(IntWritable rowNum,
+                       Iterator<VectorWritable> it,
+                       OutputCollector<IntWritable,VectorWritable> out,
+                       Reporter reporter) throws IOException {
+      if (!it.hasNext()) {
+        return;
+      }
+      Vector accumulator = new RandomAccessSparseVector(it.next().get());
+      while (it.hasNext()) {
+        Vector row = it.next().get();
+        accumulator.assign(row, Functions.PLUS);
+      }
+      out.collect(rowNum, new VectorWritable(new SequentialAccessSparseVector(accumulator)));
+    }
+  }
+
+}
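
The mapper/combiner/reducer above compute A' * B as a sum of outer products of co-indexed
rows: for row i, each nonzero element e of A's row scales B's row and is emitted under key
e.index(). An in-memory sketch of that accumulation (class name hypothetical; the join side
selection by outCardinality is elided):

    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;

    public class OuterProductSketch {
      public static void main(String[] args) {
        // co-indexed rows of A (2x2) and B (2x2), as the composite-join mapper sees them
        Vector[] aRows = {new DenseVector(new double[] {1, 2}), new DenseVector(new double[] {3, 4})};
        Vector[] bRows = {new DenseVector(new double[] {5, 6}), new DenseVector(new double[] {7, 8})};
        Vector[] product = new Vector[2];   // rows of A' * B, keyed like the reducer output
        for (int i = 0; i < aRows.length; i++) {
          for (Vector.Element e : aRows[i].nonZeroes()) {
            // the mapper emits (e.index(), bRow.times(e.get())); the reducer sums the partials
            Vector partial = bRows[i].times(e.get());
            product[e.index()] = product[e.index()] == null ? partial : product[e.index()].plus(partial);
          }
        }
        System.out.println(product[0]);  // {26, 30}
        System.out.println(product[1]);  // {38, 44}
      }
    }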

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
new file mode 100644
index 0000000..e234eb9
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.net.URI;
+
+public final class TimesSquaredJob {
+
+  public static final String INPUT_VECTOR = "DistributedMatrix.times.inputVector";
+  public static final String IS_SPARSE_OUTPUT = "DistributedMatrix.times.outputVector.sparse";
+  public static final String OUTPUT_VECTOR_DIMENSION = "DistributedMatrix.times.output.dimension";
+
+  public static final String OUTPUT_VECTOR_FILENAME = "DistributedMatrix.times.outputVector";
+
+  private TimesSquaredJob() { }
+
+  public static Job createTimesSquaredJob(Vector v, Path matrixInputPath, Path outputVectorPath)
+    throws IOException {
+    return createTimesSquaredJob(new Configuration(), v, matrixInputPath, outputVectorPath);
+  }
+  
+  public static Job createTimesSquaredJob(Configuration initialConf, Vector v, Path matrixInputPath,
+                                          Path outputVectorPath) throws IOException {
+
+    return createTimesSquaredJob(initialConf, v, matrixInputPath, outputVectorPath, TimesSquaredMapper.class,
+                                 VectorSummingReducer.class);
+  }
+
+  public static Job createTimesJob(Vector v, int outDim, Path matrixInputPath, Path outputVectorPath)
+    throws IOException {
+
+    return createTimesJob(new Configuration(), v, outDim, matrixInputPath, outputVectorPath);
+  }
+    
+  public static Job createTimesJob(Configuration initialConf, Vector v, int outDim, Path matrixInputPath,
+                                   Path outputVectorPath) throws IOException {
+
+    return createTimesSquaredJob(initialConf, v, outDim, matrixInputPath, outputVectorPath, TimesMapper.class,
+                                 VectorSummingReducer.class);
+  }
+
+  public static Job createTimesSquaredJob(Vector v, Path matrixInputPath, Path outputVectorPathBase,
+      Class<? extends TimesSquaredMapper> mapClass, Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+    return createTimesSquaredJob(new Configuration(), v, matrixInputPath, outputVectorPathBase, mapClass, redClass);
+  }
+  
+  public static Job createTimesSquaredJob(Configuration initialConf, Vector v, Path matrixInputPath,
+      Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass,
+      Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+    return createTimesSquaredJob(initialConf, v, v.size(), matrixInputPath, outputVectorPathBase, mapClass, redClass);
+  }
+
+  public static Job createTimesSquaredJob(Vector v, int outputVectorDim, Path matrixInputPath,
+      Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass,
+      Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+    return createTimesSquaredJob(new Configuration(), v, outputVectorDim, matrixInputPath, outputVectorPathBase,
+        mapClass, redClass);
+  }
+  
+  public static Job createTimesSquaredJob(Configuration initialConf, Vector v, int outputVectorDim,
+      Path matrixInputPath, Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass,
+      Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+    FileSystem fs = FileSystem.get(matrixInputPath.toUri(), initialConf);
+    matrixInputPath = fs.makeQualified(matrixInputPath);
+    outputVectorPathBase = fs.makeQualified(outputVectorPathBase);
+
+    long now = System.nanoTime();
+    Path inputVectorPath = new Path(outputVectorPathBase, INPUT_VECTOR + '/' + now);
+
+
+    SequenceFile.Writer inputVectorPathWriter = null;
+
+    try {
+      inputVectorPathWriter = new SequenceFile.Writer(fs, initialConf, inputVectorPath, NullWritable.class,
+                                                      VectorWritable.class);
+      inputVectorPathWriter.append(NullWritable.get(), new VectorWritable(v));
+    } finally {
+      Closeables.close(inputVectorPathWriter, false);
+    }
+
+    URI ivpURI = inputVectorPath.toUri();
+    DistributedCache.setCacheFiles(new URI[] { ivpURI }, initialConf);
+
+    Job job = HadoopUtil.prepareJob(matrixInputPath, new Path(outputVectorPathBase, OUTPUT_VECTOR_FILENAME),
+        SequenceFileInputFormat.class, mapClass, NullWritable.class, VectorWritable.class, redClass,
+        NullWritable.class, VectorWritable.class, SequenceFileOutputFormat.class, initialConf);
+    job.setCombinerClass(redClass);
+    job.setJobName("TimesSquaredJob: " + matrixInputPath);
+
+    Configuration conf = job.getConfiguration();
+    conf.set(INPUT_VECTOR, ivpURI.toString());
+    conf.setBoolean(IS_SPARSE_OUTPUT, !v.isDense());
+    conf.setInt(OUTPUT_VECTOR_DIMENSION, outputVectorDim);
+
+    return job;
+  }
+
+  public static Vector retrieveTimesSquaredOutputVector(Path outputVectorTmpPath, Configuration conf)
+    throws IOException {
+    Path outputFile = new Path(outputVectorTmpPath, OUTPUT_VECTOR_FILENAME + "/part-r-00000");
+    SequenceFileValueIterator<VectorWritable> iterator =
+        new SequenceFileValueIterator<>(outputFile, true, conf);
+    try {
+      return iterator.next().get();
+    } finally {
+      Closeables.close(iterator, true);
+    }
+  }
+
+  public static class TimesSquaredMapper<T extends WritableComparable>
+      extends Mapper<T,VectorWritable, NullWritable,VectorWritable> {
+
+    private Vector outputVector;
+    private Vector inputVector;
+
+    Vector getOutputVector() {
+      return outputVector;
+    }
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      try {
+        Configuration conf = ctx.getConfiguration();
+        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+        Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
+            "missing paths from the DistributedCache");
+
+        Path inputVectorPath = HadoopUtil.getSingleCachedFile(conf);
+
+        SequenceFileValueIterator<VectorWritable> iterator =
+            new SequenceFileValueIterator<>(inputVectorPath, true, conf);
+        try {
+          inputVector = iterator.next().get();
+        } finally {
+          Closeables.close(iterator, true);
+        }
+
+        int outDim = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE);
+        outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false)
+            ? new RandomAccessSparseVector(outDim, 10)
+            : new DenseVector(outDim);
+      } catch (IOException ioe) {
+        throw new IllegalStateException(ioe);
+      }
+    }
+
+    @Override
+    protected void map(T key, VectorWritable v, Context context) throws IOException, InterruptedException {
+
+      double d = scale(v);
+      if (d == 1.0) {
+        outputVector.assign(v.get(), Functions.PLUS);
+      } else if (d != 0.0) {
+        outputVector.assign(v.get(), Functions.plusMult(d));
+      }
+    }
+
+    protected double scale(VectorWritable v) {
+      return v.get().dot(inputVector);
+    }
+
+    @Override
+    protected void cleanup(Context ctx) throws IOException, InterruptedException {
+      ctx.write(NullWritable.get(), new VectorWritable(outputVector));
+    }
+
+  }
+
+  public static class TimesMapper extends TimesSquaredMapper<IntWritable> {
+
+
+    @Override
+    protected void map(IntWritable rowNum, VectorWritable v, Context context) throws IOException, InterruptedException {
+      double d = scale(v);
+      if (d != 0.0) {
+        getOutputVector().setQuick(rowNum.get(), d);
+      }
+    }
+  }
+
+  public static class VectorSummingReducer extends Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> {
+
+    private Vector outputVector;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      Configuration conf = ctx.getConfiguration();
+      int outputDimension = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE);
+      outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false)
+                   ? new RandomAccessSparseVector(outputDimension, 10)
+                   : new DenseVector(outputDimension);
+    }
+
+    @Override
+    protected void reduce(NullWritable key, Iterable<VectorWritable> vectors, Context ctx)
+      throws IOException, InterruptedException {
+
+      for (VectorWritable v : vectors) {
+        if (v != null) {
+          outputVector.assign(v.get(), Functions.PLUS);
+        }
+      }
+      ctx.write(NullWritable.get(), new VectorWritable(outputVector));
+    }
+  }
+
+}
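
Per the mapper logic, timesSquared computes w = A'(Av) in a single pass: each row r
contributes (r . v) * r to the output. A small in-memory check of that identity (class name
hypothetical):

    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.function.Functions;

    public class TimesSquaredSketch {
      public static void main(String[] args) {
        Vector[] rows = {new DenseVector(new double[] {1, 0}), new DenseVector(new double[] {1, 1})};
        Vector v = new DenseVector(new double[] {2, 3});
        Vector out = new DenseVector(v.size());
        for (Vector row : rows) {
          double d = row.dot(v);                     // scale(v) in TimesSquaredMapper
          if (d != 0.0) {
            out.assign(row, Functions.plusMult(d));  // accumulate d * row
          }
        }
        System.out.println(out);  // {7, 5} == A' * (A * v)
      }
    }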

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
new file mode 100644
index 0000000..60066c6
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.mapreduce.MergeVectorsCombiner;
+import org.apache.mahout.common.mapreduce.MergeVectorsReducer;
+import org.apache.mahout.common.mapreduce.TransposeMapper;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/** Transpose a matrix */
+public class TransposeJob extends AbstractJob {
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new TransposeJob(), args);
+  }
+
+  @Override
+  public int run(String[] strings) throws Exception {
+    addInputOption();
+    addOption("numRows", "nr", "Number of rows of the input matrix");
+    addOption("numCols", "nc", "Number of columns of the input matrix");
+    Map<String, List<String>> parsedArgs = parseArguments(strings);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    int numRows = Integer.parseInt(getOption("numRows"));
+    int numCols = Integer.parseInt(getOption("numCols"));
+
+    DistributedRowMatrix matrix = new DistributedRowMatrix(getInputPath(), getTempPath(), numRows, numCols);
+    matrix.setConf(new Configuration(getConf()));
+    matrix.transpose();
+
+    return 0;
+  }
+
+  public static Job buildTransposeJob(Path matrixInputPath, Path matrixOutputPath, int numInputRows)
+    throws IOException {
+    return buildTransposeJob(new Configuration(), matrixInputPath, matrixOutputPath, numInputRows);
+  }
+
+  public static Job buildTransposeJob(Configuration initialConf, Path matrixInputPath, Path matrixOutputPath,
+      int numInputRows) throws IOException {
+
+    Job job = HadoopUtil.prepareJob(matrixInputPath, matrixOutputPath, SequenceFileInputFormat.class,
+        TransposeMapper.class, IntWritable.class, VectorWritable.class, MergeVectorsReducer.class, IntWritable.class,
+        VectorWritable.class, SequenceFileOutputFormat.class, initialConf);
+    job.setCombinerClass(MergeVectorsCombiner.class);
+    job.getConfiguration().setInt(TransposeMapper.NEW_NUM_COLS_PARAM, numInputRows);
+
+    job.setJobName("TransposeJob: " + matrixInputPath);
+
+    return job;
+  }
+
+
+}
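
TransposeMapper and MergeVectorsReducer live in the common mapreduce package and are not part
of this diff; as a rough mental model, each cell (row, col, value) is re-emitted keyed by its
column, and partial vectors sharing a key are merged. A hedged in-memory sketch of that
scheme (class name hypothetical):

    import org.apache.mahout.math.RandomAccessSparseVector;
    import org.apache.mahout.math.Vector;

    public class TransposeSketch {
      public static void main(String[] args) {
        Vector[] rows = {new RandomAccessSparseVector(2), new RandomAccessSparseVector(2)};
        rows[0].set(0, 1); rows[0].set(1, 2);
        rows[1].set(0, 3); rows[1].set(1, 4);
        int numInputRows = rows.length;   // becomes the column count of the transpose
        Vector[] transposed = new Vector[2];
        for (int row = 0; row < rows.length; row++) {
          for (Vector.Element e : rows[row].nonZeroes()) {
            // "emit" (e.index(), {row: e.get()}); the merge step unions these partial vectors
            if (transposed[e.index()] == null) {
              transposed[e.index()] = new RandomAccessSparseVector(numInputRows);
            }
            transposed[e.index()].set(row, e.get());
          }
        }
        System.out.println(transposed[0]);  // {0:1.0, 1:3.0}
        System.out.println(transposed[1]);  // {0:2.0, 1:4.0}
      }
    }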

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
new file mode 100644
index 0000000..89dddcc
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.lanczos.LanczosSolver;
+import org.apache.mahout.math.decomposer.lanczos.LanczosState;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * See the SSVD code for a better option than using this:
+ *
+ * http://mahout.apache.org/users/dim-reduction/ssvd.html
+ * @see org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver
+ */
+public class DistributedLanczosSolver extends LanczosSolver implements Tool {
+
+  public static final String RAW_EIGENVECTORS = "rawEigenvectors";
+
+  private static final Logger log = LoggerFactory.getLogger(DistributedLanczosSolver.class);
+
+  private Configuration conf;
+
+  private Map<String, List<String>> parsedArgs;
+
+  /**
+   * For the distributed case, the best guess at a useful initial state for Lanczos is a vector that is
+   * uniform over all input dimensions, L_2 normalized.
+   */
+  public static Vector getInitialVector(VectorIterable corpus) {
+    Vector initialVector = new DenseVector(corpus.numCols());
+    initialVector.assign(1.0 / Math.sqrt(corpus.numCols()));
+    return initialVector;
+  }
+
+  public LanczosState runJob(Configuration originalConfig,
+                             LanczosState state,
+                             int desiredRank,
+                             boolean isSymmetric,
+                             String outputEigenVectorPathString) throws IOException {
+    ((Configurable) state.getCorpus()).setConf(new Configuration(originalConfig));
+    setConf(originalConfig);
+    solve(state, desiredRank, isSymmetric);
+    serializeOutput(state, new Path(outputEigenVectorPathString));
+    return state;
+  }
+
+  /**
+   * Convenience overload for invoking the solver programmatically; builds the DistributedRowMatrix
+   * and LanczosState from the given paths and dimensions.
+   */
+  public LanczosState runJob(Configuration originalConfig,
+                             Path inputPath,
+                             Path outputTmpPath,
+                             int numRows,
+                             int numCols,
+                             boolean isSymmetric,
+                             int desiredRank,
+                             String outputEigenVectorPathString) throws IOException {
+    DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, outputTmpPath, numRows, numCols);
+    matrix.setConf(new Configuration(originalConfig));
+    LanczosState state = new LanczosState(matrix, desiredRank, getInitialVector(matrix));
+    return runJob(originalConfig, state, desiredRank, isSymmetric, outputEigenVectorPathString);
+  }
+
+  @Override
+  public int run(String[] strings) throws Exception {
+    Path inputPath = new Path(AbstractJob.getOption(parsedArgs, "--input"));
+    Path outputPath = new Path(AbstractJob.getOption(parsedArgs, "--output"));
+    Path outputTmpPath = new Path(AbstractJob.getOption(parsedArgs, "--tempDir"));
+    Path workingDirPath = AbstractJob.getOption(parsedArgs, "--workingDir") != null
+                        ? new Path(AbstractJob.getOption(parsedArgs, "--workingDir")) : null;
+    int numRows = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--numRows"));
+    int numCols = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--numCols"));
+    boolean isSymmetric = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--symmetric"));
+    int desiredRank = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--rank"));
+
+    boolean cleansvd = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--cleansvd"));
+    if (cleansvd) {
+      double maxError = Double.parseDouble(AbstractJob.getOption(parsedArgs, "--maxError"));
+      double minEigenvalue = Double.parseDouble(AbstractJob.getOption(parsedArgs, "--minEigenvalue"));
+      boolean inMemory = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--inMemory"));
+      return run(inputPath,
+                 outputPath,
+                 outputTmpPath,
+                 workingDirPath,
+                 numRows,
+                 numCols,
+                 isSymmetric,
+                 desiredRank,
+                 maxError,
+                 minEigenvalue,
+                 inMemory);
+    }
+    return run(inputPath, outputPath, outputTmpPath, workingDirPath, numRows, numCols, isSymmetric, desiredRank);
+  }
+
+  /**
+   * Run the solver to produce raw eigenvectors, then run the EigenVerificationJob to clean them
+   * 
+   * @param inputPath the Path to the input corpus
+   * @param outputPath the Path to the output
+   * @param outputTmpPath a Path to a temporary working directory
+   * @param workingDirPath a Path to a working directory for Lanczos basis vectors, or null
+   * @param numRows the number of rows in the input matrix
+   * @param numCols the number of columns in the input matrix
+   * @param isSymmetric true if the input matrix is symmetric
+   * @param desiredRank the desired rank (number of eigenvectors) to produce
+   * @param maxError the maximum allowable error
+   * @param minEigenvalue the minimum usable eigenvalue
+   * @param inMemory true if the verification can be done in memory
+   * @return an int indicating success (0) or otherwise
+   */
+  public int run(Path inputPath,
+                 Path outputPath,
+                 Path outputTmpPath,
+                 Path workingDirPath,
+                 int numRows,
+                 int numCols,
+                 boolean isSymmetric,
+                 int desiredRank,
+                 double maxError,
+                 double minEigenvalue,
+                 boolean inMemory) throws Exception {
+    int result = run(inputPath, outputPath, outputTmpPath, workingDirPath, numRows, numCols,
+        isSymmetric, desiredRank);
+    if (result != 0) {
+      return result;
+    }
+    Path rawEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS);
+    return new EigenVerificationJob().run(inputPath,
+                                          rawEigenVectorPath,
+                                          outputPath,
+                                          outputTmpPath,
+                                          maxError,
+                                          minEigenvalue,
+                                          inMemory,
+                                          getConf() != null ? new Configuration(getConf()) : new Configuration());
+  }
+
+  /**
+   * Run the solver to produce the raw eigenvectors
+   * 
+   * @param inputPath the Path to the input corpus
+   * @param outputPath the Path to the output
+   * @param outputTmpPath a Path to a temporary working directory
+   * @param workingDirPath a Path to a working directory for Lanczos basis vectors, or null
+   * @param numRows the number of rows in the input matrix
+   * @param numCols the number of columns in the input matrix
+   * @param isSymmetric true if the input matrix is symmetric
+   * @param desiredRank the desired rank (number of eigenvectors) to produce
+   * @return an int indicating success (0) or otherwise
+   */
+  public int run(Path inputPath,
+                 Path outputPath,
+                 Path outputTmpPath,
+                 Path workingDirPath,
+                 int numRows,
+                 int numCols,
+                 boolean isSymmetric,
+                 int desiredRank) throws Exception {
+    DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, outputTmpPath, numRows, numCols);
+    matrix.setConf(new Configuration(getConf() != null ? getConf() : new Configuration()));
+
+    LanczosState state;
+    if (workingDirPath == null) {
+      state = new LanczosState(matrix, desiredRank, getInitialVector(matrix));
+    } else {
+      HdfsBackedLanczosState hState =
+          new HdfsBackedLanczosState(matrix, desiredRank, getInitialVector(matrix), workingDirPath);
+      hState.setConf(matrix.getConf());
+      state = hState;
+    }
+    solve(state, desiredRank, isSymmetric);
+
+    Path outputEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS);
+    serializeOutput(state, outputEigenVectorPath);
+    return 0;
+  }
+
+  /**
+   * @param state The final LanczosState to be serialized
+   * @param outputPath The path (relative to the current Configuration's FileSystem) to save the output to.
+   */
+  public void serializeOutput(LanczosState state, Path outputPath) throws IOException {
+    int numEigenVectors = state.getIterationNumber();
+    log.info("Persisting {} eigenVectors and eigenValues to: {}", numEigenVectors, outputPath); 
+    Configuration conf = getConf() != null ? getConf() : new Configuration();
+    FileSystem fs = FileSystem.get(outputPath.toUri(), conf);
+    SequenceFile.Writer seqWriter =
+        new SequenceFile.Writer(fs, conf, outputPath, IntWritable.class, VectorWritable.class);
+    try {
+      IntWritable iw = new IntWritable();
+      for (int i = 0; i < numEigenVectors; i++) {
+        // Persist eigenvectors sorted by eigenvalue in descending order
+        NamedVector v = new NamedVector(state.getRightSingularVector(numEigenVectors - 1 - i),
+            "eigenVector" + i + ", eigenvalue = " + state.getSingularValue(numEigenVectors - 1 - i));
+        Writable vw = new VectorWritable(v);
+        iw.set(i);
+        seqWriter.append(iw, vw);
+      }
+    } finally {
+      Closeables.close(seqWriter, false);
+    }
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public DistributedLanczosSolverJob job() {
+    return new DistributedLanczosSolverJob();
+  }
+
+  /**
+   * Inner subclass of AbstractJob that gives us access to AbstractJob's command-line handling while the
+   * outer class still subclasses LanczosSolver.
+   */
+  public class DistributedLanczosSolverJob extends AbstractJob {
+    @Override
+    public void setConf(Configuration conf) {
+      DistributedLanczosSolver.this.setConf(conf);
+    }
+
+    @Override
+    public Configuration getConf() {
+      return DistributedLanczosSolver.this.getConf();
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      addInputOption();
+      addOutputOption();
+      addOption("numRows", "nr", "Number of rows of the input matrix");
+      addOption("numCols", "nc", "Number of columns of the input matrix");
+      addOption("rank", "r", "Desired decomposition rank (note: only roughly 1/4 to 1/3 "
+          + "of these will have the top portion of the spectrum)");
+      addOption("symmetric", "sym", "Is the input matrix square and symmetric?");
+      addOption("workingDir", "wd", "Working directory path to store Lanczos basis vectors "
+                                    + "(to be used on restarts, and to avoid too much RAM usage)");
+      // options required to run cleansvd job
+      addOption("cleansvd", "cl", "Run the EigenVerificationJob to clean the eigenvectors after SVD", false);
+      addOption("maxError", "err", "Maximum acceptable error", "0.05");
+      addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0");
+      addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false");
+
+      DistributedLanczosSolver.this.parsedArgs = parseArguments(args);
+      if (DistributedLanczosSolver.this.parsedArgs == null) {
+        return -1;
+      } else {
+        return DistributedLanczosSolver.this.run(args);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new DistributedLanczosSolver().job(), args);
+  }
+}
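
For reference, a minimal sketch of the programmatic entry point above (runJob), with hypothetical paths and dimensions; the input is a distributed row matrix stored as SequenceFile<IntWritable,VectorWritable>.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.math.decomposer.lanczos.LanczosState;
import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;

public class LanczosExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path("/tmp/corpus");     // hypothetical DRM input
    Path tmp = new Path("/tmp/lanczos-tmp");  // hypothetical temp dir
    int numRows = 10000;                      // hypothetical dimensions
    int numCols = 500;
    int desiredRank = 50;

    // runs Lanczos and serializes the raw eigenvectors to the given path
    LanczosState state = new DistributedLanczosSolver().runJob(
        conf, input, tmp, numRows, numCols, true /* isSymmetric */, desiredRank,
        "/tmp/rawEigenvectors");              // hypothetical output path
  }
}

As the class javadoc notes, SSVDSolver is usually the better choice for new work; this sketch only exercises the legacy solver.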

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
new file mode 100644
index 0000000..d2f0c8c
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+
+import java.util.regex.Pattern;
+
+/**
+ * TODO this is a horrible hack.  Make a proper writable subclass also.
+ */
+public class EigenVector extends NamedVector {
+
+  private static final Pattern EQUAL_PATTERN = Pattern.compile(" = ");
+  private static final Pattern PIPE_PATTERN = Pattern.compile("\\|");
+
+  public EigenVector(Vector v, double eigenValue, double cosAngleError, int order) {
+    super(v instanceof DenseVector ? (DenseVector) v : new DenseVector(v),
+        "e|" + order + "| = |" + eigenValue + "|, err = " + cosAngleError);
+  }
+
+  public double getEigenValue() {
+    return getEigenValue(getName());
+  }
+
+  public double getCosAngleError() {
+    return getCosAngleError(getName());
+  }
+
+  public int getIndex() {
+    return getIndex(getName());
+  }
+
+  public static double getEigenValue(CharSequence name) {
+    return parseMetaData(name)[1];
+  }
+
+  public static double getCosAngleError(CharSequence name) {
+    return parseMetaData(name)[2];
+  }
+
+  public static int getIndex(CharSequence name) {
+    return (int)parseMetaData(name)[0];
+  }
+
+  public static double[] parseMetaData(CharSequence name) {
+    double[] m = new double[3];
+    String[] s = EQUAL_PATTERN.split(name);
+    m[0] = Double.parseDouble(PIPE_PATTERN.split(s[0])[1]);
+    m[1] = Double.parseDouble(PIPE_PATTERN.split(s[1])[1]);
+    m[2] = Double.parseDouble(s[2].substring(1));
+    return m;
+  }
+
+  protected double[] parseMetaData() {
+    return parseMetaData(getName());
+  }
+
+}
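
A quick round-trip illustration of the name encoding above (values hypothetical). Note that the s[2].substring(1) call in parseMetaData only works when the error renders with a single leading digit, which is part of why the TODO calls this a hack.

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.hadoop.decomposer.EigenVector;

public class EigenVectorNameExample {
  public static void main(String[] args) {
    // hypothetical: the 3rd eigenvector, eigenvalue 0.95, cosine-angle error 0.001
    EigenVector ev = new EigenVector(new DenseVector(new double[] {0.6, 0.8}), 0.95, 0.001, 3);
    System.out.println(ev.getName());           // e|3| = |0.95|, err = 0.001
    System.out.println(ev.getIndex());          // 3
    System.out.println(ev.getEigenValue());     // 0.95
    System.out.println(ev.getCosAngleError());  // 0.001
  }
}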

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
new file mode 100644
index 0000000..a7eaaed
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.SparseRowMatrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.EigenStatus;
+import org.apache.mahout.math.decomposer.SimpleEigenVerifier;
+import org.apache.mahout.math.decomposer.SingularVectorVerifier;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Class that takes the output of an eigendecomposition (specified as a Path location) and verifies its
+ * correctness as follows: if you have a vector e and a matrix m, let e' = m.timesSquared(e); the error w.r.t.
+ * eigenvector-ness is the cosine of the angle between e and e':
+ * </p>
+ *
+ * <pre>
+ *   error(e,e') = e.dot(e') / (e.norm(2)*e'.norm(2))
+ * </pre>
+ * <p>
+ * A set of eigenvectors should also all be very close to orthogonal, so this job computes all inner products between
+ * eigenvectors, and checks that this is close to the identity matrix.
+ * </p>
+ * <p>
+ * Parameters used in the cleanup (other than the input/output path options) include --minEigenvalue, which specifies
+ * the value below which eigenvector/eigenvalue pairs will be discarded, and --maxError, which specifies the maximum
+ * error (as defined above) to be tolerated in an eigenvector.
+ * </p>
+ * <p>
+ * If all the eigenvectors can fit in memory, --inMemory allows for a speedier completion of this task by doing so.
+ * </p>
+ */
+public class EigenVerificationJob extends AbstractJob {
+
+  public static final String CLEAN_EIGENVECTORS = "cleanEigenvectors";
+
+  private static final Logger log = LoggerFactory.getLogger(EigenVerificationJob.class);
+
+  private SingularVectorVerifier eigenVerifier;
+
+  private VectorIterable eigensToVerify;
+
+  private VectorIterable corpus;
+
+  private double maxError;
+
+  private double minEigenValue;
+
+  // private boolean loadEigensInMemory;
+
+  private Path tmpOut;
+
+  private Path outPath;
+
+  private int maxEigensToKeep;
+
+  private Path cleanedEigensPath;
+
+  public void setEigensToVerify(VectorIterable eigens) {
+    eigensToVerify = eigens;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Map<String,List<String>> argMap = handleArgs(args);
+    if (argMap == null) {
+      return -1;
+    }
+    if (argMap.isEmpty()) {
+      return 0;
+    }
+    // parse out the arguments
+    runJob(getConf(), new Path(getOption("eigenInput")), new Path(getOption("corpusInput")), getOutputPath(),
+        Boolean.parseBoolean(getOption("inMemory")), Double.parseDouble(getOption("maxError")),
+        // Double.parseDouble(getOption("minEigenvalue")),
+        Integer.parseInt(getOption("maxEigens")));
+    return 0;
+  }
+
+  /**
+   * Run the job with the given arguments
+   *
+   * @param corpusInput
+   *          the corpus input Path
+   * @param eigenInput
+   *          the eigenvector input Path
+   * @param output
+   *          the output Path
+   * @param tempOut
+   *          temporary output Path
+   * @param maxError
+   *          a double representing the maximum error
+   * @param minEigenValue
+   *          a double representing the minimum eigenvalue
+   * @param inMemory
+   *          a boolean requesting in-memory preparation
+   * @param conf
+   *          the Configuration to use, or null if a default is ok (saves referencing Configuration in calling classes
+   *          unless needed)
+   */
+  public int run(Path corpusInput, Path eigenInput, Path output, Path tempOut, double maxError, double minEigenValue,
+      boolean inMemory, Configuration conf) throws IOException {
+    this.outPath = output;
+    this.tmpOut = tempOut;
+    this.maxError = maxError;
+    this.minEigenValue = minEigenValue;
+
+    if (eigenInput != null && eigensToVerify == null) {
+      prepareEigens(conf, eigenInput, inMemory);
+    }
+    DistributedRowMatrix c = new DistributedRowMatrix(corpusInput, tempOut, 1, 1);
+    c.setConf(conf);
+    corpus = c;
+
+    // set up eigenverifier and orthoverifier TODO: allow multithreaded execution
+
+    eigenVerifier = new SimpleEigenVerifier();
+
+    // we don't currently verify orthonormality here.
+    // VectorIterable pairwiseInnerProducts = computePairwiseInnerProducts();
+
+    Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
+
+    List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
+
+    saveCleanEigens(conf != null ? conf : new Configuration(), prunedEigenMeta);
+    return 0;
+  }
+
+  private Map<String,List<String>> handleArgs(String[] args) throws IOException {
+    addOutputOption();
+    addOption("eigenInput", "ei",
+        "The Path for purported eigenVector input files (SequenceFile<WritableComparable,VectorWritable>).", null);
+    addOption("corpusInput", "ci", "The Path for corpus input files (SequenceFile<WritableComparable,VectorWritable>).");
+    addOption(DefaultOptionCreator.outputOption().create());
+    addOption(DefaultOptionCreator.helpOption());
+    addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false");
+    addOption("maxError", "err", "Maximum acceptable error", "0.05");
+    addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0");
+    addOption("maxEigens", "max", "Maximum number of eigenvectors to keep (0 means all)", "0");
+
+    return parseArguments(args);
+  }
+
+  private void saveCleanEigens(Configuration conf, Collection<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta)
+      throws IOException {
+    Path path = new Path(outPath, CLEAN_EIGENVECTORS);
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
+    try {
+      IntWritable iw = new IntWritable();
+      int numEigensWritten = 0;
+      int index = 0;
+      for (Map.Entry<MatrixSlice,EigenStatus> pruneSlice : prunedEigenMeta) {
+        MatrixSlice s = pruneSlice.getKey();
+        EigenStatus meta = pruneSlice.getValue();
+        EigenVector ev = new EigenVector(s.vector(), meta.getEigenValue(), Math.abs(1 - meta.getCosAngle()), s.index());
+        // log.info("appending {} to {}", ev, path);
+        Writable vw = new VectorWritable(ev);
+        iw.set(index++);
+        seqWriter.append(iw, vw);
+
+        // increment the number of eigenvectors written and see if we've
+        // reached our specified limit, or if we wish to write all eigenvectors
+        // (the latter is the default: maxEigensToKeep of 0 never equals numEigensWritten, which is always > 0)
+        numEigensWritten++;
+        if (numEigensWritten == maxEigensToKeep) {
+          log.info("{} of the {} total eigens have been written", maxEigensToKeep, prunedEigenMeta.size());
+          break;
+        }
+      }
+    } finally {
+      Closeables.close(seqWriter, false);
+    }
+    cleanedEigensPath = path;
+  }
+
+  private List<Map.Entry<MatrixSlice,EigenStatus>> pruneEigens(Map<MatrixSlice,EigenStatus> eigenMetaData) {
+    List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = Lists.newArrayList();
+
+    for (Map.Entry<MatrixSlice,EigenStatus> entry : eigenMetaData.entrySet()) {
+      if (Math.abs(1 - entry.getValue().getCosAngle()) < maxError && entry.getValue().getEigenValue() > minEigenValue) {
+        prunedEigenMeta.add(entry);
+      }
+    }
+
+    Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice,EigenStatus>>() {
+      @Override
+      public int compare(Map.Entry<MatrixSlice,EigenStatus> e1, Map.Entry<MatrixSlice,EigenStatus> e2) {
+        // sort eigens by eigenvalue in descending order
+        Double eg1 = e1.getValue().getEigenValue();
+        Double eg2 = e2.getValue().getEigenValue();
+        return eg2.compareTo(eg1);
+      }
+    });
+
+    // greedily re-order the eigens: at each step select the candidate most orthogonal to those already chosen
+    List<Map.Entry<MatrixSlice,EigenStatus>> selectedEigenMeta = Lists.newArrayList();
+    if (prunedEigenMeta.isEmpty()) {
+      return selectedEigenMeta;
+    }
+    Map.Entry<MatrixSlice,EigenStatus> e1 = prunedEigenMeta.remove(0);
+    selectedEigenMeta.add(e1);
+    int selectedEigenMetaLength = selectedEigenMeta.size();
+    int prunedEigenMetaLength = prunedEigenMeta.size();
+
+    while (prunedEigenMetaLength > 0) {
+      double sum = Double.MAX_VALUE;
+      int index = 0;
+      for (int i = 0; i < prunedEigenMetaLength; i++) {
+        Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.get(i);
+        double tmp = 0;
+        for (int j = 0; j < selectedEigenMetaLength; j++) {
+          Map.Entry<MatrixSlice,EigenStatus> ee = selectedEigenMeta.get(j);
+          tmp += ee.getKey().vector().times(e.getKey().vector()).norm(2);
+        }
+        if (tmp < sum) {
+          sum = tmp;
+          index = i;
+        }
+      }
+      Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.remove(index);
+      selectedEigenMeta.add(e);
+      selectedEigenMetaLength++;
+      prunedEigenMetaLength--;
+    }
+
+    return selectedEigenMeta;
+  }
+
+  private Map<MatrixSlice,EigenStatus> verifyEigens() {
+    Map<MatrixSlice,EigenStatus> eigenMetaData = Maps.newHashMap();
+
+    for (MatrixSlice slice : eigensToVerify) {
+      EigenStatus status = eigenVerifier.verify(corpus, slice.vector());
+      eigenMetaData.put(slice, status);
+    }
+    return eigenMetaData;
+  }
+
+  private void prepareEigens(Configuration conf, Path eigenInput, boolean inMemory) {
+    DistributedRowMatrix eigens = new DistributedRowMatrix(eigenInput, tmpOut, 1, 1);
+    eigens.setConf(conf);
+    if (inMemory) {
+      List<Vector> eigenVectors = Lists.newArrayList();
+      for (MatrixSlice slice : eigens) {
+        eigenVectors.add(slice.vector());
+      }
+      eigensToVerify = new SparseRowMatrix(eigenVectors.size(), eigenVectors.get(0).size(),
+          eigenVectors.toArray(new Vector[eigenVectors.size()]), true, true);
+
+    } else {
+      eigensToVerify = eigens;
+    }
+  }
+
+  public Path getCleanedEigensPath() {
+    return cleanedEigensPath;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new EigenVerificationJob(), args);
+  }
+
+  /**
+   * Programmatic invocation of run()
+   *
+   * @param eigenInput
+   *          Output of LanczosSolver
+   * @param corpusInput
+   *          Input of LanczosSolver
+   */
+  public void runJob(Configuration conf, Path eigenInput, Path corpusInput, Path output, boolean inMemory,
+      double maxError, int maxEigens) throws IOException {
+    // no need to handle command line arguments
+    outPath = output;
+    tmpOut = new Path(outPath, "tmp");
+    maxEigensToKeep = maxEigens;
+    this.maxError = maxError;
+    if (eigenInput != null && eigensToVerify == null) {
+      prepareEigens(new Configuration(conf), eigenInput, inMemory);
+    }
+
+    DistributedRowMatrix c = new DistributedRowMatrix(corpusInput, tmpOut, 1, 1);
+    c.setConf(new Configuration(conf));
+    corpus = c;
+
+    eigenVerifier = new SimpleEigenVerifier();
+
+    Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
+    List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
+    saveCleanEigens(conf, prunedEigenMeta);
+  }
+}
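
Finally, a minimal sketch of cleaning raw Lanczos output through the programmatic runJob entry point above. All paths are hypothetical; maxError mirrors the command-line default shown in handleArgs, while the maxEigens cap of 20 is an arbitrary illustrative value (0 would keep all).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;

public class EigenVerificationExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path eigenInput = new Path("/tmp/svd-out/rawEigenvectors");  // raw output of DistributedLanczosSolver
    Path corpusInput = new Path("/tmp/corpus");                  // the original corpus DRM
    Path output = new Path("/tmp/svd-out");                      // cleanEigenvectors is written under here

    EigenVerificationJob job = new EigenVerificationJob();
    // verify in memory, tolerate cosine-angle error up to 0.05, keep at most 20 eigenvectors
    job.runJob(conf, eigenInput, corpusInput, output, true, 0.05, 20);
    System.out.println("cleaned eigenvectors: " + job.getCleanedEigensPath());
  }
}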