Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:52 UTC
[21/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy
into mahout-hdfs and mahout-mr, closes apache/mahout#86
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java b/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java
new file mode 100644
index 0000000..f9ca52e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public final class MatrixUtils {
+
+ private MatrixUtils() {
+ }
+
+ public static void write(Path outputDir, Configuration conf, VectorIterable matrix)
+ throws IOException {
+ FileSystem fs = outputDir.getFileSystem(conf);
+ fs.delete(outputDir, true);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outputDir,
+ IntWritable.class, VectorWritable.class);
+ IntWritable topic = new IntWritable();
+ VectorWritable vector = new VectorWritable();
+ for (MatrixSlice slice : matrix) {
+ topic.set(slice.index());
+ vector.set(slice.vector());
+ writer.append(topic, vector);
+ }
+ writer.close();
+ }
+
+ public static Matrix read(Configuration conf, Path... modelPaths) throws IOException {
+ int numRows = -1;
+ int numCols = -1;
+ boolean sparse = false;
+ List<Pair<Integer, Vector>> rows = Lists.newArrayList();
+ for (Path modelPath : modelPaths) {
+ for (Pair<IntWritable, VectorWritable> row
+ : new SequenceFileIterable<IntWritable, VectorWritable>(modelPath, true, conf)) {
+ rows.add(Pair.of(row.getFirst().get(), row.getSecond().get()));
+ numRows = Math.max(numRows, row.getFirst().get());
+ sparse = !row.getSecond().get().isDense();
+ if (numCols < 0) {
+ numCols = row.getSecond().get().size();
+ }
+ }
+ }
+ if (rows.isEmpty()) {
+ throw new IOException(Arrays.toString(modelPaths) + " contain no vectors");
+ }
+ numRows++;
+ Vector[] arrayOfRows = new Vector[numRows];
+ for (Pair<Integer, Vector> pair : rows) {
+ arrayOfRows[pair.getFirst()] = pair.getSecond();
+ }
+ Matrix matrix;
+ if (sparse) {
+ matrix = new SparseRowMatrix(numRows, numCols, arrayOfRows);
+ } else {
+ matrix = new DenseMatrix(numRows, numCols);
+ for (int i = 0; i < numRows; i++) {
+ matrix.assignRow(i, arrayOfRows[i]);
+ }
+ }
+ return matrix;
+ }
+
+ public static OpenObjectIntHashMap<String> readDictionary(Configuration conf, Path... dictPath) {
+ OpenObjectIntHashMap<String> dictionary = new OpenObjectIntHashMap<>();
+ for (Path dictionaryFile : dictPath) {
+ for (Pair<Writable, IntWritable> record
+ : new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) {
+ dictionary.put(record.getFirst().toString(), record.getSecond().get());
+ }
+ }
+ return dictionary;
+ }
+
+ public static String[] invertDictionary(OpenObjectIntHashMap<String> termIdMap) {
+ int maxTermId = -1;
+ for (String term : termIdMap.keys()) {
+ maxTermId = Math.max(maxTermId, termIdMap.get(term));
+ }
+ maxTermId++;
+ String[] dictionary = new String[maxTermId];
+ for (String term : termIdMap.keys()) {
+ dictionary[termIdMap.get(term)] = term;
+ }
+ return dictionary;
+ }
+
+}
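
A minimal write/read round trip through MatrixUtils, as a sketch: the class name, scratch path, and matrix values below are illustrative, not part of the commit. write() stores one IntWritable/VectorWritable pair per row; read() rebuilds a dense or sparse matrix from them.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.MatrixUtils;

public class MatrixUtilsRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/matrix-demo");   // illustrative scratch location
    Matrix m = new DenseMatrix(new double[][] {{1, 2}, {3, 4}});
    MatrixUtils.write(path, conf, m);           // one IntWritable/VectorWritable pair per row
    Matrix roundTripped = MatrixUtils.read(conf, path);
    System.out.println(roundTripped.rowSize() + " x " + roundTripped.columnSize());
  }
}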
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java b/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java
new file mode 100644
index 0000000..0c45c9a
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Writable to handle serialization of a vector and a variable list of
+ * associated label indexes.
+ */
+public final class MultiLabelVectorWritable implements Writable {
+
+ private final VectorWritable vectorWritable = new VectorWritable();
+ private int[] labels;
+
+ public MultiLabelVectorWritable() {
+ }
+
+ public MultiLabelVectorWritable(Vector vector, int[] labels) {
+ this.vectorWritable.set(vector);
+ this.labels = labels;
+ }
+
+ public Vector getVector() {
+ return vectorWritable.get();
+ }
+
+ public void setVector(Vector vector) {
+ vectorWritable.set(vector);
+ }
+
+ public void setLabels(int[] labels) {
+ this.labels = labels;
+ }
+
+ public int[] getLabels() {
+ return labels;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ vectorWritable.readFields(in);
+ int labelSize = in.readInt();
+ labels = new int[labelSize];
+ for (int i = 0; i < labelSize; i++) {
+ labels[i] = in.readInt();
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ vectorWritable.write(out);
+ out.writeInt(labels.length);
+ for (int label : labels) {
+ out.writeInt(label);
+ }
+ }
+
+ public static MultiLabelVectorWritable read(DataInput in) throws IOException {
+ MultiLabelVectorWritable writable = new MultiLabelVectorWritable();
+ writable.readFields(in);
+ return writable;
+ }
+
+ public static void write(DataOutput out, SequentialAccessSparseVector ssv, int[] labels) throws IOException {
+ new MultiLabelVectorWritable(ssv, labels).write(out);
+ }
+
+}
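
The wire format here is simply the vector, then a label count, then the labels. A hedged in-memory round-trip sketch; the class name, vector values, and label ids are illustrative:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.MultiLabelVectorWritable;

public class MultiLabelRoundTrip {
  public static void main(String[] args) throws Exception {
    MultiLabelVectorWritable written = new MultiLabelVectorWritable(
        new DenseVector(new double[] {0.5, 1.5}), new int[] {3, 7});
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    written.write(new DataOutputStream(bytes));
    MultiLabelVectorWritable read = MultiLabelVectorWritable.read(
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(read.getVector() + " labels=" + Arrays.toString(read.getLabels()));
  }
}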
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java b/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
new file mode 100644
index 0000000..1a6ff16
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator;
+import org.apache.mahout.math.CardinalityException;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * DistributedRowMatrix is a FileSystem-backed VectorIterable in which the vectors live in a
+ * SequenceFile<WritableComparable,VectorWritable>, and distributed operations are executed as M/R passes on
+ * Hadoop. The usage is as follows: <p>
+ * <p>
+ * <pre>
+ * // the path must already contain a previously created SequenceFile!
+ * DistributedRowMatrix m = new DistributedRowMatrix("path/to/vector/sequenceFile", "tmp/path", 10000000, 250000);
+ * m.setConf(new Configuration());
+ * // now if we want to multiply a vector by this matrix, its dimension must equal the row dimension of this
+ * // matrix. If we want to timesSquared() a vector by this matrix, its dimension must equal the column dimension
+ * // of the matrix.
+ * Vector v = new DenseVector(250000);
+ * // now the following operation will be done via a M/R pass via Hadoop.
+ * Vector w = m.timesSquared(v);
+ * </pre>
+ *
+ */
+public class DistributedRowMatrix implements VectorIterable, Configurable {
+ public static final String KEEP_TEMP_FILES = "DistributedMatrix.keep.temp.files";
+
+ private static final Logger log = LoggerFactory.getLogger(DistributedRowMatrix.class);
+
+ private final Path inputPath;
+ private final Path outputTmpPath;
+ private Configuration conf;
+ private Path rowPath;
+ private Path outputTmpBasePath;
+ private final int numRows;
+ private final int numCols;
+ private boolean keepTempFiles;
+
+ public DistributedRowMatrix(Path inputPath,
+ Path outputTmpPath,
+ int numRows,
+ int numCols) {
+ this(inputPath, outputTmpPath, numRows, numCols, false);
+ }
+
+ public DistributedRowMatrix(Path inputPath,
+ Path outputTmpPath,
+ int numRows,
+ int numCols,
+ boolean keepTempFiles) {
+ this.inputPath = inputPath;
+ this.outputTmpPath = outputTmpPath;
+ this.numRows = numRows;
+ this.numCols = numCols;
+ this.keepTempFiles = keepTempFiles;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ try {
+ FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+ rowPath = fs.makeQualified(inputPath);
+ outputTmpBasePath = fs.makeQualified(outputTmpPath);
+ keepTempFiles = conf.getBoolean(KEEP_TEMP_FILES, false);
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ }
+
+ public Path getRowPath() {
+ return rowPath;
+ }
+
+ public Path getOutputTempPath() {
+ return outputTmpBasePath;
+ }
+
+ public void setOutputTempPathString(String outPathString) {
+ try {
+ outputTmpBasePath = FileSystem.get(conf).makeQualified(new Path(outPathString));
+ } catch (IOException ioe) {
+ log.warn("Unable to set outputBasePath to {}, leaving as {}",
+ outPathString, outputTmpBasePath);
+ }
+ }
+
+ @Override
+ public Iterator<MatrixSlice> iterateAll() {
+ try {
+ Path pathPattern = rowPath;
+ if (FileSystem.get(conf).getFileStatus(rowPath).isDir()) {
+ pathPattern = new Path(rowPath, "*");
+ }
+ return Iterators.transform(
+ new SequenceFileDirIterator<IntWritable,VectorWritable>(pathPattern,
+ PathType.GLOB,
+ PathFilters.logsCRCFilter(),
+ null,
+ true,
+ conf),
+ new Function<Pair<IntWritable,VectorWritable>,MatrixSlice>() {
+ @Override
+ public MatrixSlice apply(Pair<IntWritable, VectorWritable> from) {
+ return new MatrixSlice(from.getSecond().get(), from.getFirst().get());
+ }
+ });
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ }
+
+ @Override
+ public int numSlices() {
+ return numRows();
+ }
+
+ @Override
+ public int numRows() {
+ return numRows;
+ }
+
+ @Override
+ public int numCols() {
+ return numCols;
+ }
+
+
+ /**
+ * This implements matrix this.transpose().times(other)
+ * @param other a DistributedRowMatrix
+ * @return a DistributedRowMatrix containing the product
+ */
+ public DistributedRowMatrix times(DistributedRowMatrix other) throws IOException {
+ return times(other, new Path(outputTmpBasePath.getParent(), "productWith-" + (System.nanoTime() & 0xFF)));
+ }
+
+ /**
+ * This implements matrix this.transpose().times(other)
+ * @param other a DistributedRowMatrix
+ * @param outPath path to write result to
+ * @return a DistributedRowMatrix containing the product
+ */
+ public DistributedRowMatrix times(DistributedRowMatrix other, Path outPath) throws IOException {
+ if (numRows != other.numRows()) {
+ throw new CardinalityException(numRows, other.numRows());
+ }
+
+ Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+ Configuration conf =
+ MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf,
+ rowPath,
+ other.rowPath,
+ outPath,
+ other.numCols);
+ JobClient.runJob(new JobConf(conf));
+ DistributedRowMatrix out = new DistributedRowMatrix(outPath, outputTmpPath, numCols, other.numCols());
+ out.setConf(conf);
+ return out;
+ }
+
+ public Vector columnMeans() throws IOException {
+ return columnMeans("SequentialAccessSparseVector");
+ }
+
+ /**
+ * Returns the column-wise mean of a DistributedRowMatrix
+ *
+ * @param vectorClass
+ * desired class for the column-wise mean vector e.g.
+ * RandomAccessSparseVector, DenseVector
+ * @return Vector containing the column-wise mean of this
+ */
+ public Vector columnMeans(String vectorClass) throws IOException {
+ Path outputVectorTmpPath =
+ new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+ Configuration initialConf =
+ getConf() == null ? new Configuration() : getConf();
+ String vectorClassFull = "org.apache.mahout.math." + vectorClass;
+ Vector mean = MatrixColumnMeansJob.run(initialConf, rowPath, outputVectorTmpPath, vectorClassFull);
+ if (!keepTempFiles) {
+ FileSystem fs = outputVectorTmpPath.getFileSystem(conf);
+ fs.delete(outputVectorTmpPath, true);
+ }
+ return mean;
+ }
+
+ public DistributedRowMatrix transpose() throws IOException {
+ Path outputPath = new Path(rowPath.getParent(), "transpose-" + (System.nanoTime() & 0xFF));
+ Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+ Job transposeJob = TransposeJob.buildTransposeJob(initialConf, rowPath, outputPath, numRows);
+
+ try {
+ transposeJob.waitForCompletion(true);
+ } catch (Exception e) {
+ throw new IllegalStateException("transposition failed", e);
+ }
+
+ DistributedRowMatrix m = new DistributedRowMatrix(outputPath, outputTmpPath, numCols, numRows);
+ m.setConf(this.conf);
+ return m;
+ }
+
+ @Override
+ public Vector times(Vector v) {
+ try {
+ Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+ Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+
+ Job job = TimesSquaredJob.createTimesJob(initialConf, v, numRows, rowPath, outputVectorTmpPath);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (Exception e) {
+ throw new IllegalStateException("times failed", e);
+ }
+
+ Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(outputVectorTmpPath, conf);
+ if (!keepTempFiles) {
+ FileSystem fs = outputVectorTmpPath.getFileSystem(conf);
+ fs.delete(outputVectorTmpPath, true);
+ }
+ return result;
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ }
+
+ @Override
+ public Vector timesSquared(Vector v) {
+ try {
+ Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+ Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+
+ Job job = TimesSquaredJob.createTimesSquaredJob(initialConf, v, rowPath, outputVectorTmpPath);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (Exception e) {
+ throw new IllegalStateException("timesSquared failed", e);
+ }
+
+ Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(outputVectorTmpPath, conf);
+ if (!keepTempFiles) {
+ FileSystem fs = outputVectorTmpPath.getFileSystem(conf);
+ fs.delete(outputVectorTmpPath, true);
+ }
+ return result;
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ }
+
+ @Override
+ public Iterator<MatrixSlice> iterator() {
+ return iterateAll();
+ }
+
+ public static class MatrixEntryWritable implements WritableComparable<MatrixEntryWritable> {
+ private int row;
+ private int col;
+ private double val;
+
+ public int getRow() {
+ return row;
+ }
+
+ public void setRow(int row) {
+ this.row = row;
+ }
+
+ public int getCol() {
+ return col;
+ }
+
+ public void setCol(int col) {
+ this.col = col;
+ }
+
+ public double getVal() {
+ return val;
+ }
+
+ public void setVal(double val) {
+ this.val = val;
+ }
+
+ @Override
+ public int compareTo(MatrixEntryWritable o) {
+ if (row > o.row) {
+ return 1;
+ } else if (row < o.row) {
+ return -1;
+ } else {
+ if (col > o.col) {
+ return 1;
+ } else if (col < o.col) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof MatrixEntryWritable)) {
+ return false;
+ }
+ MatrixEntryWritable other = (MatrixEntryWritable) o;
+ return row == other.row && col == other.col;
+ }
+
+ @Override
+ public int hashCode() {
+ return row + 31 * col;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(row);
+ out.writeInt(col);
+ out.writeDouble(val);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ row = in.readInt();
+ col = in.readInt();
+ val = in.readDouble();
+ }
+
+ @Override
+ public String toString() {
+ return "(" + row + ',' + col + "):" + val;
+ }
+ }
+}
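
Note that times(other) computes this.transpose().times(other), so both operands must share a row count. A hedged driver sketch; the class name, paths, and dimensions are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;

public class DrmProductSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A is 1000 x 100 and B is 1000 x 50; mismatched row counts would
    // make times() throw a CardinalityException.
    DistributedRowMatrix a =
        new DistributedRowMatrix(new Path("/data/a"), new Path("/tmp/drm"), 1000, 100);
    DistributedRowMatrix b =
        new DistributedRowMatrix(new Path("/data/b"), new Path("/tmp/drm"), 1000, 50);
    a.setConf(conf);
    b.setConf(conf);
    DistributedRowMatrix product = a.times(b);  // A^T * B
    System.out.println(product.numRows() + " x " + product.numCols());  // 100 x 50
  }
}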
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
new file mode 100644
index 0000000..b4f459a
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import com.google.common.io.Closeables;
+
+/**
+ * MatrixColumnMeansJob is a job for calculating the column-wise mean of a
+ * DistributedRowMatrix. This job can be accessed using
+ * DistributedRowMatrix.columnMeans()
+ */
+public final class MatrixColumnMeansJob {
+
+ public static final String VECTOR_CLASS =
+ "DistributedRowMatrix.columnMeans.vector.class";
+
+ private MatrixColumnMeansJob() {
+ }
+
+ public static Vector run(Configuration conf,
+ Path inputPath,
+ Path outputVectorTmpPath) throws IOException {
+ return run(conf, inputPath, outputVectorTmpPath, null);
+ }
+
+ /**
+ * Job for calculating column-wise mean of a DistributedRowMatrix
+ *
+ * @param initialConf
+ * @param inputPath
+ * path to DistributedRowMatrix input
+ * @param outputVectorTmpPath
+ * path for temporary files created during job
+ * @param vectorClass
+ * String of desired class for returned vector e.g. DenseVector,
+ * RandomAccessSparseVector (may be null for {@link DenseVector} )
+ * @return Vector containing column-wise mean of DistributedRowMatrix
+ */
+ public static Vector run(Configuration initialConf,
+ Path inputPath,
+ Path outputVectorTmpPath,
+ String vectorClass) throws IOException {
+
+ try {
+ initialConf.set(VECTOR_CLASS,
+ vectorClass == null ? DenseVector.class.getName()
+ : vectorClass);
+
+ Job job = new Job(initialConf, "MatrixColumnMeansJob");
+ job.setJarByClass(MatrixColumnMeansJob.class);
+
+ outputVectorTmpPath.getFileSystem(job.getConfiguration())
+ .delete(outputVectorTmpPath, true);
+ job.setNumReduceTasks(1);
+
+ FileInputFormat.addInputPath(job, inputPath);
+ FileOutputFormat.setOutputPath(job, outputVectorTmpPath);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ job.setMapperClass(MatrixColumnMeansMapper.class);
+ job.setReducerClass(MatrixColumnMeansReducer.class);
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(VectorWritable.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(VectorWritable.class);
+ job.submit();
+ job.waitForCompletion(true);
+
+ Path tmpFile = new Path(outputVectorTmpPath, "part-r-00000");
+ SequenceFileValueIterator<VectorWritable> iterator =
+ new SequenceFileValueIterator<>(tmpFile, true, initialConf);
+ try {
+ if (iterator.hasNext()) {
+ return iterator.next().get();
+ } else {
+ return (Vector) Class.forName(initialConf.get(VECTOR_CLASS))
+ .getConstructor(int.class).newInstance(0);
+ }
+ } finally {
+ Closeables.close(iterator, true);
+ }
+ } catch (IOException ioe) {
+ throw ioe;
+ } catch (Throwable thr) {
+ throw new IOException(thr);
+ }
+ }
+
+ /**
+ * Mapper for calculation of column-wise mean.
+ */
+ public static class MatrixColumnMeansMapper extends
+ Mapper<Writable, VectorWritable, NullWritable, VectorWritable> {
+
+ private Vector runningSum;
+ private String vectorClass;
+
+ @Override
+ public void setup(Context context) {
+ vectorClass = context.getConfiguration().get(VECTOR_CLASS);
+ }
+
+ /**
+ * The mapper computes a running sum of the vectors the task has seen.
+ * Element 0 of the running sum vector contains a count of the number of
+ * vectors that have been seen. The remaining elements contain the
+ * column-wise running sum. Nothing is written at this stage
+ */
+ @Override
+ public void map(Writable r, VectorWritable v, Context context)
+ throws IOException {
+ if (runningSum == null) {
+ /*
+ * If this is the first vector the mapper has seen, instantiate a new
+ * vector using the parameter VECTOR_CLASS
+ */
+ runningSum = ClassUtils.instantiateAs(vectorClass,
+ Vector.class,
+ new Class<?>[] { int.class },
+ new Object[] { v.get().size() + 1 });
+ runningSum.set(0, 1);
+ runningSum.viewPart(1, v.get().size()).assign(v.get());
+ } else {
+ runningSum.set(0, runningSum.get(0) + 1);
+ runningSum.viewPart(1, v.get().size()).assign(v.get(), Functions.PLUS);
+ }
+ }
+
+ /**
+ * The column-wise sum is written at the cleanup stage. A single reducer is
+ * forced so null can be used for the key
+ */
+ @Override
+ public void cleanup(Context context) throws InterruptedException,
+ IOException {
+ if (runningSum != null) {
+ context.write(NullWritable.get(), new VectorWritable(runningSum));
+ }
+ }
+
+ }
+
+ /**
+ * The reducer adds the partial column-wise sums from each of the mappers to
+ * compute the total column-wise sum. The total sum is then divided by the
+ * total count of vectors to determine the column-wise mean.
+ */
+ public static class MatrixColumnMeansReducer extends
+ Reducer<NullWritable, VectorWritable, IntWritable, VectorWritable> {
+
+ private static final IntWritable ONE = new IntWritable(1);
+
+ private String vectorClass;
+ private Vector outputVector;
+ private final VectorWritable outputVectorWritable = new VectorWritable();
+
+ @Override
+ public void setup(Context context) {
+ vectorClass = context.getConfiguration().get(VECTOR_CLASS);
+ }
+
+ @Override
+ public void reduce(NullWritable n,
+ Iterable<VectorWritable> vectors,
+ Context context) throws IOException, InterruptedException {
+
+ /*
+ * Add together partial column-wise sums from mappers
+ */
+ for (VectorWritable v : vectors) {
+ if (outputVector == null) {
+ outputVector = v.get();
+ } else {
+ outputVector.assign(v.get(), Functions.PLUS);
+ }
+ }
+
+ /*
+ * Divide total column-wise sum by count of vectors, which corresponds to
+ * the number of rows in the DistributedRowMatrix
+ */
+ if (outputVector != null) {
+ outputVectorWritable.set(outputVector.viewPart(1,
+ outputVector.size() - 1)
+ .divide(outputVector.get(0)));
+ context.write(ONE, outputVectorWritable);
+ } else {
+ Vector emptyVector = ClassUtils.instantiateAs(vectorClass,
+ Vector.class,
+ new Class<?>[] { int.class },
+ new Object[] { 0 });
+ context.write(ONE, new VectorWritable(emptyVector));
+ }
+ }
+ }
+
+}
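
The mapper/reducer pair shares an encoding: slot 0 of the running-sum vector holds the row count and slots 1..n hold the column sums, so the mean is viewPart(1, n) divided by slot 0, exactly as in the reducer above. A small in-memory sketch of that decode step; class name and values are illustrative:

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class ColumnMeansDecodeSketch {
  public static void main(String[] args) {
    // Slot 0 says two rows were seen; slots 1..2 are the column sums.
    Vector runningSum = new DenseVector(new double[] {2.0, 4.0, 6.0});
    Vector mean = runningSum.viewPart(1, runningSum.size() - 1).divide(runningSum.get(0));
    System.out.println(mean);  // column means (2.0, 3.0)
  }
}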
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
new file mode 100644
index 0000000..48eda08
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.join.CompositeInputFormat;
+import org.apache.hadoop.mapred.join.TupleWritable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This still uses the old MR API; like everything MapReduce-based in Mahout, it is now part of 'mahout-mr'.
+ * There is no plan to port the old MR API usage here to the new MR API.
+ * This functionality will eventually be replaced by the new Spark-based linear algebra bindings.
+ */
+
+public class MatrixMultiplicationJob extends AbstractJob {
+
+ private static final String OUT_CARD = "output.vector.cardinality";
+
+ public static Configuration createMatrixMultiplyJobConf(Path aPath,
+ Path bPath,
+ Path outPath,
+ int outCardinality) {
+ return createMatrixMultiplyJobConf(new Configuration(), aPath, bPath, outPath, outCardinality);
+ }
+
+ public static Configuration createMatrixMultiplyJobConf(Configuration initialConf,
+ Path aPath,
+ Path bPath,
+ Path outPath,
+ int outCardinality) {
+ JobConf conf = new JobConf(initialConf, MatrixMultiplicationJob.class);
+ conf.setInputFormat(CompositeInputFormat.class);
+ conf.set("mapred.join.expr", CompositeInputFormat.compose(
+ "inner", SequenceFileInputFormat.class, aPath, bPath));
+ conf.setInt(OUT_CARD, outCardinality);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(conf, outPath);
+ conf.setMapperClass(MatrixMultiplyMapper.class);
+ conf.setCombinerClass(MatrixMultiplicationReducer.class);
+ conf.setReducerClass(MatrixMultiplicationReducer.class);
+ conf.setMapOutputKeyClass(IntWritable.class);
+ conf.setMapOutputValueClass(VectorWritable.class);
+ conf.setOutputKeyClass(IntWritable.class);
+ conf.setOutputValueClass(VectorWritable.class);
+ return conf;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new MatrixMultiplicationJob(), args);
+ }
+
+ @Override
+ public int run(String[] strings) throws Exception {
+ addOption("numRowsA", "nra", "Number of rows of the first input matrix", true);
+ addOption("numColsA", "nca", "Number of columns of the first input matrix", true);
+ addOption("numRowsB", "nrb", "Number of rows of the second input matrix", true);
+
+ addOption("numColsB", "ncb", "Number of columns of the second input matrix", true);
+ addOption("inputPathA", "ia", "Path to the first input matrix", true);
+ addOption("inputPathB", "ib", "Path to the second input matrix", true);
+
+ addOption("outputPath", "op", "Path to the output matrix", false);
+
+ Map<String, List<String>> argMap = parseArguments(strings);
+ if (argMap == null) {
+ return -1;
+ }
+
+ DistributedRowMatrix a = new DistributedRowMatrix(new Path(getOption("inputPathA")),
+ new Path(getOption("tempDir")),
+ Integer.parseInt(getOption("numRowsA")),
+ Integer.parseInt(getOption("numColsA")));
+ DistributedRowMatrix b = new DistributedRowMatrix(new Path(getOption("inputPathB")),
+ new Path(getOption("tempDir")),
+ Integer.parseInt(getOption("numRowsB")),
+ Integer.parseInt(getOption("numColsB")));
+
+ a.setConf(new Configuration(getConf()));
+ b.setConf(new Configuration(getConf()));
+
+ if (hasOption("outputPath")) {
+ a.times(b, new Path(getOption("outputPath")));
+ } else {
+ a.times(b);
+ }
+
+ return 0;
+ }
+
+ public static class MatrixMultiplyMapper extends MapReduceBase
+ implements Mapper<IntWritable,TupleWritable,IntWritable,VectorWritable> {
+
+ private int outCardinality;
+ private final IntWritable row = new IntWritable();
+
+ @Override
+ public void configure(JobConf conf) {
+ outCardinality = conf.getInt(OUT_CARD, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public void map(IntWritable index,
+ TupleWritable v,
+ OutputCollector<IntWritable,VectorWritable> out,
+ Reporter reporter) throws IOException {
+ boolean firstIsOutFrag = ((VectorWritable)v.get(0)).get().size() == outCardinality;
+ Vector outFrag = firstIsOutFrag ? ((VectorWritable)v.get(0)).get() : ((VectorWritable)v.get(1)).get();
+ Vector multiplier = firstIsOutFrag ? ((VectorWritable)v.get(1)).get() : ((VectorWritable)v.get(0)).get();
+
+ VectorWritable outVector = new VectorWritable();
+ for (Vector.Element e : multiplier.nonZeroes()) {
+ row.set(e.index());
+ outVector.set(outFrag.times(e.get()));
+ out.collect(row, outVector);
+ }
+ }
+ }
+
+ public static class MatrixMultiplicationReducer extends MapReduceBase
+ implements Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+ @Override
+ public void reduce(IntWritable rowNum,
+ Iterator<VectorWritable> it,
+ OutputCollector<IntWritable,VectorWritable> out,
+ Reporter reporter) throws IOException {
+ if (!it.hasNext()) {
+ return;
+ }
+ Vector accumulator = new RandomAccessSparseVector(it.next().get());
+ while (it.hasNext()) {
+ Vector row = it.next().get();
+ accumulator.assign(row, Functions.PLUS);
+ }
+ out.collect(rowNum, new VectorWritable(new SequentialAccessSparseVector(accumulator)));
+ }
+ }
+
+}
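
The map side exploits the identity A^T B = sum over rows i of the outer product of a_i and b_i: for each nonzero a_i[j], the mapper emits b_i scaled by a_i[j] under key j, and the combiner/reducer sum the partial rows. An in-memory sketch of one joined row pair's contribution; class name and values are illustrative:

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class OuterProductSketch {
  public static void main(String[] args) {
    Vector aRow = new RandomAccessSparseVector(3);
    aRow.setQuick(1, 2.0);                                   // a_i = (0, 2, 0)
    Vector bRow = new DenseVector(new double[] {1.0, 3.0});  // b_i, size == outCardinality
    for (Vector.Element e : aRow.nonZeroes()) {
      Vector partial = bRow.times(e.get());
      // in the job: out.collect(row = e.index(), value = partial)
      System.out.println("row " + e.index() + " += " + partial);  // row 1 += (2.0, 6.0)
    }
  }
}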
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
new file mode 100644
index 0000000..e234eb9
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.net.URI;
+
+public final class TimesSquaredJob {
+
+ public static final String INPUT_VECTOR = "DistributedMatrix.times.inputVector";
+ public static final String IS_SPARSE_OUTPUT = "DistributedMatrix.times.outputVector.sparse";
+ public static final String OUTPUT_VECTOR_DIMENSION = "DistributedMatrix.times.output.dimension";
+
+ public static final String OUTPUT_VECTOR_FILENAME = "DistributedMatrix.times.outputVector";
+
+ private TimesSquaredJob() { }
+
+ public static Job createTimesSquaredJob(Vector v, Path matrixInputPath, Path outputVectorPath)
+ throws IOException {
+ return createTimesSquaredJob(new Configuration(), v, matrixInputPath, outputVectorPath);
+ }
+
+ public static Job createTimesSquaredJob(Configuration initialConf, Vector v, Path matrixInputPath,
+ Path outputVectorPath) throws IOException {
+
+ return createTimesSquaredJob(initialConf, v, matrixInputPath, outputVectorPath, TimesSquaredMapper.class,
+ VectorSummingReducer.class);
+ }
+
+ public static Job createTimesJob(Vector v, int outDim, Path matrixInputPath, Path outputVectorPath)
+ throws IOException {
+
+ return createTimesJob(new Configuration(), v, outDim, matrixInputPath, outputVectorPath);
+ }
+
+ public static Job createTimesJob(Configuration initialConf, Vector v, int outDim, Path matrixInputPath,
+ Path outputVectorPath) throws IOException {
+
+ return createTimesSquaredJob(initialConf, v, outDim, matrixInputPath, outputVectorPath, TimesMapper.class,
+ VectorSummingReducer.class);
+ }
+
+ public static Job createTimesSquaredJob(Vector v, Path matrixInputPath, Path outputVectorPathBase,
+ Class<? extends TimesSquaredMapper> mapClass, Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+ return createTimesSquaredJob(new Configuration(), v, matrixInputPath, outputVectorPathBase, mapClass, redClass);
+ }
+
+ public static Job createTimesSquaredJob(Configuration initialConf, Vector v, Path matrixInputPath,
+ Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass,
+ Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+ return createTimesSquaredJob(initialConf, v, v.size(), matrixInputPath, outputVectorPathBase, mapClass, redClass);
+ }
+
+ public static Job createTimesSquaredJob(Vector v, int outputVectorDim, Path matrixInputPath,
+ Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass,
+ Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+ return createTimesSquaredJob(new Configuration(), v, outputVectorDim, matrixInputPath, outputVectorPathBase,
+ mapClass, redClass);
+ }
+
+ public static Job createTimesSquaredJob(Configuration initialConf, Vector v, int outputVectorDim,
+ Path matrixInputPath, Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass,
+ Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+ FileSystem fs = FileSystem.get(matrixInputPath.toUri(), initialConf);
+ matrixInputPath = fs.makeQualified(matrixInputPath);
+ outputVectorPathBase = fs.makeQualified(outputVectorPathBase);
+
+ long now = System.nanoTime();
+ Path inputVectorPath = new Path(outputVectorPathBase, INPUT_VECTOR + '/' + now);
+
+
+ SequenceFile.Writer inputVectorPathWriter = null;
+
+ try {
+ inputVectorPathWriter = new SequenceFile.Writer(fs, initialConf, inputVectorPath, NullWritable.class,
+ VectorWritable.class);
+ inputVectorPathWriter.append(NullWritable.get(), new VectorWritable(v));
+ } finally {
+ Closeables.close(inputVectorPathWriter, false);
+ }
+
+ URI ivpURI = inputVectorPath.toUri();
+ DistributedCache.setCacheFiles(new URI[] { ivpURI }, initialConf);
+
+ Job job = HadoopUtil.prepareJob(matrixInputPath, new Path(outputVectorPathBase, OUTPUT_VECTOR_FILENAME),
+ SequenceFileInputFormat.class, mapClass, NullWritable.class, VectorWritable.class, redClass,
+ NullWritable.class, VectorWritable.class, SequenceFileOutputFormat.class, initialConf);
+ job.setCombinerClass(redClass);
+ job.setJobName("TimesSquaredJob: " + matrixInputPath);
+
+ Configuration conf = job.getConfiguration();
+ conf.set(INPUT_VECTOR, ivpURI.toString());
+ conf.setBoolean(IS_SPARSE_OUTPUT, !v.isDense());
+ conf.setInt(OUTPUT_VECTOR_DIMENSION, outputVectorDim);
+
+ return job;
+ }
+
+ public static Vector retrieveTimesSquaredOutputVector(Path outputVectorTmpPath, Configuration conf)
+ throws IOException {
+ Path outputFile = new Path(outputVectorTmpPath, OUTPUT_VECTOR_FILENAME + "/part-r-00000");
+ SequenceFileValueIterator<VectorWritable> iterator =
+ new SequenceFileValueIterator<>(outputFile, true, conf);
+ try {
+ return iterator.next().get();
+ } finally {
+ Closeables.close(iterator, true);
+ }
+ }
+
+ public static class TimesSquaredMapper<T extends WritableComparable>
+ extends Mapper<T,VectorWritable, NullWritable,VectorWritable> {
+
+ private Vector outputVector;
+ private Vector inputVector;
+
+ Vector getOutputVector() {
+ return outputVector;
+ }
+
+ @Override
+ protected void setup(Context ctx) throws IOException, InterruptedException {
+ try {
+ Configuration conf = ctx.getConfiguration();
+ Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+ Preconditions.checkArgument(localFiles != null && localFiles.length >= 1,
+ "missing paths from the DistributedCache");
+
+ Path inputVectorPath = HadoopUtil.getSingleCachedFile(conf);
+
+ SequenceFileValueIterator<VectorWritable> iterator =
+ new SequenceFileValueIterator<>(inputVectorPath, true, conf);
+ try {
+ inputVector = iterator.next().get();
+ } finally {
+ Closeables.close(iterator, true);
+ }
+
+ int outDim = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE);
+ outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false)
+ ? new RandomAccessSparseVector(outDim, 10)
+ : new DenseVector(outDim);
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ }
+
+ @Override
+ protected void map(T key, VectorWritable v, Context context) throws IOException, InterruptedException {
+
+ double d = scale(v);
+ if (d == 1.0) {
+ outputVector.assign(v.get(), Functions.PLUS);
+ } else if (d != 0.0) {
+ outputVector.assign(v.get(), Functions.plusMult(d));
+ }
+ }
+
+ protected double scale(VectorWritable v) {
+ return v.get().dot(inputVector);
+ }
+
+ @Override
+ protected void cleanup(Context ctx) throws IOException, InterruptedException {
+ ctx.write(NullWritable.get(), new VectorWritable(outputVector));
+ }
+
+ }
+
+ public static class TimesMapper extends TimesSquaredMapper<IntWritable> {
+
+
+ @Override
+ protected void map(IntWritable rowNum, VectorWritable v, Context context) throws IOException, InterruptedException {
+ double d = scale(v);
+ if (d != 0.0) {
+ getOutputVector().setQuick(rowNum.get(), d);
+ }
+ }
+ }
+
+ public static class VectorSummingReducer extends Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> {
+
+ private Vector outputVector;
+
+ @Override
+ protected void setup(Context ctx) throws IOException, InterruptedException {
+ Configuration conf = ctx.getConfiguration();
+ int outputDimension = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE);
+ outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false)
+ ? new RandomAccessSparseVector(outputDimension, 10)
+ : new DenseVector(outputDimension);
+ }
+
+ @Override
+ protected void reduce(NullWritable key, Iterable<VectorWritable> vectors, Context ctx)
+ throws IOException, InterruptedException {
+
+ for (VectorWritable v : vectors) {
+ if (v != null) {
+ outputVector.assign(v.get(), Functions.PLUS);
+ }
+ }
+ ctx.write(NullWritable.get(), new VectorWritable(outputVector));
+ }
+ }
+
+}
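
TimesSquaredMapper computes A^T(Av) without ever materializing Av: each row r adds (r . v) * r into a single accumulator, and the combiner/reducer then sum the per-task accumulators. An in-memory equivalent for a 2 x 2 example; class name and values are illustrative:

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.Functions;

public class TimesSquaredSketch {
  public static void main(String[] args) {
    Vector v = new DenseVector(new double[] {1.0, 2.0});
    Vector[] rows = {new DenseVector(new double[] {1.0, 0.0}),
                     new DenseVector(new double[] {0.0, 3.0})};
    Vector acc = new DenseVector(v.size());
    for (Vector r : rows) {
      double d = r.dot(v);                    // scale(), as in the mapper
      if (d != 0.0) {
        acc.assign(r, Functions.plusMult(d)); // acc += d * r
      }
    }
    System.out.println(acc);  // (1.0, 18.0) == A^T A v for A = [[1, 0], [0, 3]]
  }
}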
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
new file mode 100644
index 0000000..60066c6
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.mapreduce.MergeVectorsCombiner;
+import org.apache.mahout.common.mapreduce.MergeVectorsReducer;
+import org.apache.mahout.common.mapreduce.TransposeMapper;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/** Transpose a matrix */
+public class TransposeJob extends AbstractJob {
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new TransposeJob(), args);
+ }
+
+ @Override
+ public int run(String[] strings) throws Exception {
+ addInputOption();
+ addOption("numRows", "nr", "Number of rows of the input matrix");
+ addOption("numCols", "nc", "Number of columns of the input matrix");
+ Map<String, List<String>> parsedArgs = parseArguments(strings);
+ if (parsedArgs == null) {
+ return -1;
+ }
+
+ int numRows = Integer.parseInt(getOption("numRows"));
+ int numCols = Integer.parseInt(getOption("numCols"));
+
+ DistributedRowMatrix matrix = new DistributedRowMatrix(getInputPath(), getTempPath(), numRows, numCols);
+ matrix.setConf(new Configuration(getConf()));
+ matrix.transpose();
+
+ return 0;
+ }
+
+ public static Job buildTransposeJob(Path matrixInputPath, Path matrixOutputPath, int numInputRows)
+ throws IOException {
+ return buildTransposeJob(new Configuration(), matrixInputPath, matrixOutputPath, numInputRows);
+ }
+
+ public static Job buildTransposeJob(Configuration initialConf, Path matrixInputPath, Path matrixOutputPath,
+ int numInputRows) throws IOException {
+
+ Job job = HadoopUtil.prepareJob(matrixInputPath, matrixOutputPath, SequenceFileInputFormat.class,
+ TransposeMapper.class, IntWritable.class, VectorWritable.class, MergeVectorsReducer.class, IntWritable.class,
+ VectorWritable.class, SequenceFileOutputFormat.class, initialConf);
+ job.setCombinerClass(MergeVectorsCombiner.class);
+ job.getConfiguration().setInt(TransposeMapper.NEW_NUM_COLS_PARAM, numInputRows);
+
+ job.setJobName("TransposeJob: " + matrixInputPath);
+
+ return job;
+ }
+
+
+}
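
A hedged driver sketch for invoking the transpose programmatically; the class name, paths, and row count are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.mahout.math.hadoop.TransposeJob;

public class TransposeDriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = TransposeJob.buildTransposeJob(
        conf, new Path("/data/m"), new Path("/data/m-t"), 1000);
    if (!job.waitForCompletion(true)) {
      throw new IllegalStateException("transpose failed");
    }
  }
}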
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
new file mode 100644
index 0000000..89dddcc
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.lanczos.LanczosSolver;
+import org.apache.mahout.math.decomposer.lanczos.LanczosState;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * See the SSVD code for a better option than using this:
+ *
+ * http://mahout.apache.org/users/dim-reduction/ssvd.html
+ * @see org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver
+ */
+public class DistributedLanczosSolver extends LanczosSolver implements Tool {
+
+ public static final String RAW_EIGENVECTORS = "rawEigenvectors";
+
+ private static final Logger log = LoggerFactory.getLogger(DistributedLanczosSolver.class);
+
+ private Configuration conf;
+
+ private Map<String, List<String>> parsedArgs;
+
+ /**
+ * For the distributed case, the best guess at a useful initialization state for Lanczos is a vector that is
+ * uniform over all input dimensions and L_2 normalized.
+ */
+ public static Vector getInitialVector(VectorIterable corpus) {
+ Vector initialVector = new DenseVector(corpus.numCols());
+ initialVector.assign(1.0 / Math.sqrt(corpus.numCols()));
+ return initialVector;
+ }
+
+ public LanczosState runJob(Configuration originalConfig,
+ LanczosState state,
+ int desiredRank,
+ boolean isSymmetric,
+ String outputEigenVectorPathString) throws IOException {
+ ((Configurable) state.getCorpus()).setConf(new Configuration(originalConfig));
+ setConf(originalConfig);
+ solve(state, desiredRank, isSymmetric);
+ serializeOutput(state, new Path(outputEigenVectorPathString));
+ return state;
+ }
+
+ /**
+ * Factored-out LanczosSolver for the purpose of invoking it programmatically
+ */
+ public LanczosState runJob(Configuration originalConfig,
+ Path inputPath,
+ Path outputTmpPath,
+ int numRows,
+ int numCols,
+ boolean isSymmetric,
+ int desiredRank,
+ String outputEigenVectorPathString) throws IOException {
+ DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, outputTmpPath, numRows, numCols);
+ matrix.setConf(new Configuration(originalConfig));
+ LanczosState state = new LanczosState(matrix, desiredRank, getInitialVector(matrix));
+ return runJob(originalConfig, state, desiredRank, isSymmetric, outputEigenVectorPathString);
+ }
+
+ @Override
+ public int run(String[] strings) throws Exception {
+ Path inputPath = new Path(AbstractJob.getOption(parsedArgs, "--input"));
+ Path outputPath = new Path(AbstractJob.getOption(parsedArgs, "--output"));
+ Path outputTmpPath = new Path(AbstractJob.getOption(parsedArgs, "--tempDir"));
+ Path workingDirPath = AbstractJob.getOption(parsedArgs, "--workingDir") != null
+ ? new Path(AbstractJob.getOption(parsedArgs, "--workingDir")) : null;
+ int numRows = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--numRows"));
+ int numCols = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--numCols"));
+ boolean isSymmetric = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--symmetric"));
+ int desiredRank = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--rank"));
+
+ boolean cleansvd = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--cleansvd"));
+ if (cleansvd) {
+ double maxError = Double.parseDouble(AbstractJob.getOption(parsedArgs, "--maxError"));
+ double minEigenvalue = Double.parseDouble(AbstractJob.getOption(parsedArgs, "--minEigenvalue"));
+ boolean inMemory = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--inMemory"));
+ return run(inputPath,
+ outputPath,
+ outputTmpPath,
+ workingDirPath,
+ numRows,
+ numCols,
+ isSymmetric,
+ desiredRank,
+ maxError,
+ minEigenvalue,
+ inMemory);
+ }
+ return run(inputPath, outputPath, outputTmpPath, workingDirPath, numRows, numCols, isSymmetric, desiredRank);
+ }
+
+ /**
+ * Run the solver to produce raw eigenvectors, then run the EigenVerificationJob to clean them
+ *
+ * @param inputPath the Path to the input corpus
+ * @param outputPath the Path to the output
+ * @param outputTmpPath a Path to a temporary working directory
+ * @param numRows the int number of rows
+ * @param numCols the int number of columns
+ * @param isSymmetric true if the input matrix is symmetric
+ * @param desiredRank the int desired rank of eigenvectors to produce
+ * @param maxError the maximum allowable error
+ * @param minEigenvalue the minimum usable eigenvalue
+ * @param inMemory true if the verification can be done in memory
+ * @return an int indicating success (0) or otherwise
+ */
+ public int run(Path inputPath,
+ Path outputPath,
+ Path outputTmpPath,
+ Path workingDirPath,
+ int numRows,
+ int numCols,
+ boolean isSymmetric,
+ int desiredRank,
+ double maxError,
+ double minEigenvalue,
+ boolean inMemory) throws Exception {
+ int result = run(inputPath, outputPath, outputTmpPath, workingDirPath, numRows, numCols,
+ isSymmetric, desiredRank);
+ if (result != 0) {
+ return result;
+ }
+ Path rawEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS);
+ return new EigenVerificationJob().run(inputPath,
+ rawEigenVectorPath,
+ outputPath,
+ outputTmpPath,
+ maxError,
+ minEigenvalue,
+ inMemory,
+ getConf() != null ? new Configuration(getConf()) : new Configuration());
+ }
+
+ /**
+ * Run the solver to produce the raw eigenvectors
+ *
+ * @param inputPath the Path to the input corpus
+ * @param outputPath the Path to the output
+ * @param outputTmpPath a Path to a temporary working directory
+ * @param numRows the int number of rows
+ * @param numCols the int number of columns
+ * @param isSymmetric true if the input matrix is symmetric
+ * @param desiredRank the int desired rank of eigenvectors to produce
+ * @return an int indicating success (0) or otherwise
+ */
+ public int run(Path inputPath,
+ Path outputPath,
+ Path outputTmpPath,
+ Path workingDirPath,
+ int numRows,
+ int numCols,
+ boolean isSymmetric,
+ int desiredRank) throws Exception {
+ DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, outputTmpPath, numRows, numCols);
+ matrix.setConf(new Configuration(getConf() != null ? getConf() : new Configuration()));
+
+ LanczosState state;
+ if (workingDirPath == null) {
+ state = new LanczosState(matrix, desiredRank, getInitialVector(matrix));
+ } else {
+ HdfsBackedLanczosState hState =
+ new HdfsBackedLanczosState(matrix, desiredRank, getInitialVector(matrix), workingDirPath);
+ hState.setConf(matrix.getConf());
+ state = hState;
+ }
+ solve(state, desiredRank, isSymmetric);
+
+ Path outputEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS);
+ serializeOutput(state, outputEigenVectorPath);
+ return 0;
+ }
+
+ /**
+ * @param state The final LanczosState to be serialized
+ * @param outputPath The path (relative to the current Configuration's FileSystem) to save the output to.
+ */
+ public void serializeOutput(LanczosState state, Path outputPath) throws IOException {
+ int numEigenVectors = state.getIterationNumber();
+ log.info("Persisting {} eigenVectors and eigenValues to: {}", numEigenVectors, outputPath);
+ Configuration conf = getConf() != null ? getConf() : new Configuration();
+ FileSystem fs = FileSystem.get(outputPath.toUri(), conf);
+ SequenceFile.Writer seqWriter =
+ new SequenceFile.Writer(fs, conf, outputPath, IntWritable.class, VectorWritable.class);
+ try {
+ IntWritable iw = new IntWritable();
+ for (int i = 0; i < numEigenVectors; i++) {
+ // Persist eigenvectors sorted by eigenvalue in descending order
+ NamedVector v = new NamedVector(state.getRightSingularVector(numEigenVectors - 1 - i),
+ "eigenVector" + i + ", eigenvalue = " + state.getSingularValue(numEigenVectors - 1 - i));
+ Writable vw = new VectorWritable(v);
+ iw.set(i);
+ seqWriter.append(iw, vw);
+ }
+ } finally {
+ Closeables.close(seqWriter, false);
+ }
+ }
+
+ @Override
+ public void setConf(Configuration configuration) {
+ conf = configuration;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public DistributedLanczosSolverJob job() {
+ return new DistributedLanczosSolverJob();
+ }
+
+ /**
+ * Inner subclass of AbstractJob so we get access to AbstractJob's functionality w.r.t. cmdline options, while the
+ * outer class can still subclass LanczosSolver.
+ */
+ public class DistributedLanczosSolverJob extends AbstractJob {
+ @Override
+ public void setConf(Configuration conf) {
+ DistributedLanczosSolver.this.setConf(conf);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return DistributedLanczosSolver.this.getConf();
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ addInputOption();
+ addOutputOption();
+ addOption("numRows", "nr", "Number of rows of the input matrix");
+ addOption("numCols", "nc", "Number of columns of the input matrix");
+ addOption("rank", "r", "Desired decomposition rank (note: only roughly 1/4 to 1/3 "
+ + "of these will have the top portion of the spectrum)");
+ addOption("symmetric", "sym", "Is the input matrix square and symmetric?");
+ addOption("workingDir", "wd", "Working directory path to store Lanczos basis vectors "
+ + "(to be used on restarts, and to avoid too much RAM usage)");
+ // options required to run cleansvd job
+ addOption("cleansvd", "cl", "Run the EigenVerificationJob to clean the eigenvectors after SVD", false);
+ addOption("maxError", "err", "Maximum acceptable error", "0.05");
+ addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0");
+ addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false");
+
+ DistributedLanczosSolver.this.parsedArgs = parseArguments(args);
+ if (DistributedLanczosSolver.this.parsedArgs == null) {
+ return -1;
+ } else {
+ return DistributedLanczosSolver.this.run(args);
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new DistributedLanczosSolver().job(), args);
+ }
+}
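
For reference, here is a minimal driver sketch (not part of the patch) showing how the
programmatic run(...) overload above can be invoked. The wrapper class name, paths, and
matrix dimensions are placeholders, and the import assumes the solver lives in
org.apache.mahout.math.hadoop.decomposer alongside the classes below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;

    public class LanczosDriverSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedLanczosSolver solver = new DistributedLanczosSolver();
        solver.setConf(conf);
        // rank-50 decomposition of a symmetric 10000 x 10000 corpus, followed by
        // the EigenVerificationJob cleanup (maxError 0.05, minEigenvalue 0.0,
        // verification done in memory)
        int exitCode = solver.run(new Path("corpus"), new Path("svd/out"),
            new Path("svd/tmp"), new Path("svd/work"),
            10000, 10000, true, 50, 0.05, 0.0, true);
        System.exit(exitCode);
      }
    }
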
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
new file mode 100644
index 0000000..d2f0c8c
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+
+import java.util.regex.Pattern;
+
+/**
+ * TODO this is a horrible hack. Make a proper writable subclass also.
+ */
+public class EigenVector extends NamedVector {
+
+ private static final Pattern EQUAL_PATTERN = Pattern.compile(" = ");
+ private static final Pattern PIPE_PATTERN = Pattern.compile("\\|");
+
+ public EigenVector(Vector v, double eigenValue, double cosAngleError, int order) {
+ super(v instanceof DenseVector ? (DenseVector) v : new DenseVector(v),
+ "e|" + order + "| = |" + eigenValue + "|, err = " + cosAngleError);
+ }
+
+ public double getEigenValue() {
+ return getEigenValue(getName());
+ }
+
+ public double getCosAngleError() {
+ return getCosAngleError(getName());
+ }
+
+ public int getIndex() {
+ return getIndex(getName());
+ }
+
+ public static double getEigenValue(CharSequence name) {
+ return parseMetaData(name)[1];
+ }
+
+ public static double getCosAngleError(CharSequence name) {
+ return parseMetaData(name)[2];
+ }
+
+ public static int getIndex(CharSequence name) {
+ return (int)parseMetaData(name)[0];
+ }
+
+ public static double[] parseMetaData(CharSequence name) {
+ double[] m = new double[3];
+ String[] s = EQUAL_PATTERN.split(name);
+ m[0] = Double.parseDouble(PIPE_PATTERN.split(s[0])[1]);
+ m[1] = Double.parseDouble(PIPE_PATTERN.split(s[1])[1]);
+ // s[2] already holds the bare error value after the " = " split; taking substring(1)
+ // here would silently drop its first character
+ m[2] = Double.parseDouble(s[2]);
+ return m;
+ }
+
+ protected double[] parseMetaData() {
+ return parseMetaData(getName());
+ }
+
+}
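
Given the name format built in the constructor above ("e|<order>| = |<eigenValue>|,
err = <cosAngleError>"), a metadata round trip looks like this quick sketch (the class
name and values are illustrative only):

    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.hadoop.decomposer.EigenVector;

    public class EigenVectorNameSketch {
      public static void main(String[] args) {
        EigenVector ev = new EigenVector(new DenseVector(new double[] {0.6, 0.8}), 2.5, 0.01, 3);
        double[] meta = EigenVector.parseMetaData(ev.getName());
        // prints: order=3 eigenValue=2.5 err=0.01
        System.out.println("order=" + (int) meta[0]
            + " eigenValue=" + meta[1] + " err=" + meta[2]);
      }
    }
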
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
new file mode 100644
index 0000000..a7eaaed
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.SparseRowMatrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.EigenStatus;
+import org.apache.mahout.math.decomposer.SimpleEigenVerifier;
+import org.apache.mahout.math.decomposer.SingularVectorVerifier;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Class that takes the output of an eigendecomposition (specified as a Path location) and verifies its correctness,
+ * as follows: given an eigenvector e and a matrix m, let e' = m.timesSquared(e); the cosine of the angle between
+ * e and e' is
+ * </p>
+ *
+ * <pre>
+ * cos(e,e') = e.dot(e') / (e.norm(2)*e'.norm(2))
+ * </pre>
+ * <p>
+ * and the error w.r.t. eigenvector-ness is how far this cosine deviates from 1, i.e. |1 - cos(e,e')|. A set of
+ * eigenvectors should also be very close to pairwise orthogonal; computing all inner products between eigenvectors
+ * and checking that the result is close to the identity matrix is sketched below but currently disabled.
+ * </p>
+ * <p>
+ * Parameters used in the cleanup (other than the input/output path options) include --minEigenvalue, which specifies
+ * the value below which eigenvector/eigenvalue pairs will be discarded, and --maxError, which specifies the maximum
+ * error (as defined above) to be tolerated in an eigenvector.
+ * </p>
+ * <p>
+ * If all the eigenvectors can fit in memory, --inMemory allows for a speedier completion of this task by doing so.
+ * </p>
+ */
+public class EigenVerificationJob extends AbstractJob {
+
+ public static final String CLEAN_EIGENVECTORS = "cleanEigenvectors";
+
+ private static final Logger log = LoggerFactory.getLogger(EigenVerificationJob.class);
+
+ private SingularVectorVerifier eigenVerifier;
+
+ private VectorIterable eigensToVerify;
+
+ private VectorIterable corpus;
+
+ private double maxError;
+
+ private double minEigenValue;
+
+ // private boolean loadEigensInMemory;
+
+ private Path tmpOut;
+
+ private Path outPath;
+
+ private int maxEigensToKeep;
+
+ private Path cleanedEigensPath;
+
+ public void setEigensToVerify(VectorIterable eigens) {
+ eigensToVerify = eigens;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Map<String,List<String>> argMap = handleArgs(args);
+ if (argMap == null) {
+ return -1;
+ }
+ if (argMap.isEmpty()) {
+ return 0;
+ }
+ // parse out the arguments
+ runJob(getConf(), new Path(getOption("eigenInput")), new Path(getOption("corpusInput")), getOutputPath(),
+ getOption("inMemory") != null, Double.parseDouble(getOption("maxError")),
+ // Double.parseDouble(getOption("minEigenvalue")),
+ Integer.parseInt(getOption("maxEigens")));
+ return 0;
+ }
+
+ /**
+ * Run the job with the given arguments
+ *
+ * @param corpusInput
+ * the corpus input Path
+ * @param eigenInput
+ * the eigenvector input Path
+ * @param output
+ * the output Path
+ * @param tempOut
+ * temporary output Path
+ * @param maxError
+ * a double representing the maximum error
+ * @param minEigenValue
+ * a double representing the minimum eigenvalue
+ * @param inMemory
+ * a boolean requesting in-memory preparation
+ * @param conf
+ * the Configuration to use, or null if a default is ok (saves referencing Configuration in calling classes
+ * unless needed)
+ * @return 0 on success
+ */
+ public int run(Path corpusInput, Path eigenInput, Path output, Path tempOut, double maxError, double minEigenValue,
+ boolean inMemory, Configuration conf) throws IOException {
+ this.outPath = output;
+ this.tmpOut = tempOut;
+ this.maxError = maxError;
+ this.minEigenValue = minEigenValue;
+
+ if (eigenInput != null && eigensToVerify == null) {
+ prepareEigens(conf, eigenInput, inMemory);
+ }
+ DistributedRowMatrix c = new DistributedRowMatrix(corpusInput, tempOut, 1, 1);
+ c.setConf(conf);
+ corpus = c;
+
+ // set up eigenverifier and orthoverifier. TODO: allow multithreaded execution
+
+ eigenVerifier = new SimpleEigenVerifier();
+
+ // we don't currently verify orthonormality here.
+ // VectorIterable pairwiseInnerProducts = computePairwiseInnerProducts();
+
+ Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
+
+ List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
+
+ saveCleanEigens(new Configuration(), prunedEigenMeta);
+ return 0;
+ }
+
+ private Map<String,List<String>> handleArgs(String[] args) throws IOException {
+ addOutputOption();
+ addOption("eigenInput", "ei",
+ "The Path for purported eigenVector input files (SequenceFile<WritableComparable,VectorWritable>.", null);
+ addOption("corpusInput", "ci", "The Path for corpus input files (SequenceFile<WritableComparable,VectorWritable>.");
+ addOption(DefaultOptionCreator.outputOption().create());
+ addOption(DefaultOptionCreator.helpOption());
+ addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false");
+ addOption("maxError", "err", "Maximum acceptable error", "0.05");
+ addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0");
+ addOption("maxEigens", "max", "Maximum number of eigenvectors to keep (0 means all)", "0");
+
+ return parseArguments(args);
+ }
+
+ private void saveCleanEigens(Configuration conf, Collection<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta)
+ throws IOException {
+ Path path = new Path(outPath, CLEAN_EIGENVECTORS);
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
+ try {
+ IntWritable iw = new IntWritable();
+ int numEigensWritten = 0;
+ int index = 0;
+ for (Map.Entry<MatrixSlice,EigenStatus> pruneSlice : prunedEigenMeta) {
+ MatrixSlice s = pruneSlice.getKey();
+ EigenStatus meta = pruneSlice.getValue();
+ EigenVector ev = new EigenVector(s.vector(), meta.getEigenValue(), Math.abs(1 - meta.getCosAngle()), s.index());
+ // log.info("appending {} to {}", ev, path);
+ Writable vw = new VectorWritable(ev);
+ iw.set(index++);
+ seqWriter.append(iw, vw);
+
+ // increment the number of eigenvectors written and see if we've
+ // reached our specified limit, or if we wish to write all eigenvectors
+ // (the latter is built in, since numEigensWritten is always > 0 and so never equals a limit of 0)
+ numEigensWritten++;
+ if (numEigensWritten == maxEigensToKeep) {
+ log.info("{} of the {} total eigens have been written", maxEigensToKeep, prunedEigenMeta.size());
+ break;
+ }
+ }
+ } finally {
+ Closeables.close(seqWriter, false);
+ }
+ cleanedEigensPath = path;
+ }
+
+ private List<Map.Entry<MatrixSlice,EigenStatus>> pruneEigens(Map<MatrixSlice,EigenStatus> eigenMetaData) {
+ List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = Lists.newArrayList();
+
+ for (Map.Entry<MatrixSlice,EigenStatus> entry : eigenMetaData.entrySet()) {
+ if (Math.abs(1 - entry.getValue().getCosAngle()) < maxError && entry.getValue().getEigenValue() > minEigenValue) {
+ prunedEigenMeta.add(entry);
+ }
+ }
+
+ Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice,EigenStatus>>() {
+ @Override
+ public int compare(Map.Entry<MatrixSlice,EigenStatus> e1, Map.Entry<MatrixSlice,EigenStatus> e2) {
+ // sort eigens on eigenvalues in descending order (largest eigenvalue first)
+ Double eg1 = e1.getValue().getEigenValue();
+ Double eg2 = e2.getValue().getEigenValue();
+ return eg2.compareTo(eg1);
+ }
+ });
+
+ // iterate through the eigens, greedily picking the one most orthogonal to those already selected
+ List<Map.Entry<MatrixSlice,EigenStatus>> selectedEigenMeta = Lists.newArrayList();
+ if (prunedEigenMeta.isEmpty()) {
+ return selectedEigenMeta; // nothing survived pruning
+ }
+ Map.Entry<MatrixSlice,EigenStatus> e1 = prunedEigenMeta.remove(0);
+ selectedEigenMeta.add(e1);
+ int selectedEigenMetaLength = selectedEigenMeta.size();
+ int prunedEigenMetaLength = prunedEigenMeta.size();
+
+ while (prunedEigenMetaLength > 0) {
+ double minSum = Double.MAX_VALUE; // smallest total overlap with the selected set seen so far
+ int index = 0;
+ for (int i = 0; i < prunedEigenMetaLength; i++) {
+ Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.get(i);
+ double tmp = 0;
+ for (int j = 0; j < selectedEigenMetaLength; j++) {
+ Map.Entry<MatrixSlice,EigenStatus> ee = selectedEigenMeta.get(j);
+ // overlap with an already-selected eigen, measured as the norm of the element-wise product
+ tmp += ee.getKey().vector().times(e.getKey().vector()).norm(2);
+ }
+ if (tmp < minSum) {
+ minSum = tmp;
+ index = i;
+ }
+ }
+ Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.remove(index);
+ selectedEigenMeta.add(e);
+ selectedEigenMetaLength++;
+ prunedEigenMetaLength--;
+ }
+
+ return selectedEigenMeta;
+ }
+
+ private Map<MatrixSlice,EigenStatus> verifyEigens() {
+ Map<MatrixSlice,EigenStatus> eigenMetaData = Maps.newHashMap();
+
+ for (MatrixSlice slice : eigensToVerify) {
+ EigenStatus status = eigenVerifier.verify(corpus, slice.vector());
+ eigenMetaData.put(slice, status);
+ }
+ return eigenMetaData;
+ }
+
+ private void prepareEigens(Configuration conf, Path eigenInput, boolean inMemory) {
+ DistributedRowMatrix eigens = new DistributedRowMatrix(eigenInput, tmpOut, 1, 1);
+ eigens.setConf(conf);
+ if (inMemory) {
+ List<Vector> eigenVectors = Lists.newArrayList();
+ for (MatrixSlice slice : eigens) {
+ eigenVectors.add(slice.vector());
+ }
+ eigensToVerify = new SparseRowMatrix(eigenVectors.size(), eigenVectors.get(0).size(),
+ eigenVectors.toArray(new Vector[eigenVectors.size()]), true, true);
+
+ } else {
+ eigensToVerify = eigens;
+ }
+ }
+
+ public Path getCleanedEigensPath() {
+ return cleanedEigensPath;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new EigenVerificationJob(), args);
+ }
+
+ /**
+ * Programmatic invocation of run()
+ *
+ * @param eigenInput
+ * Output of LanczosSolver
+ * @param corpusInput
+ * Input of LanczosSolver
+ */
+ public void runJob(Configuration conf, Path eigenInput, Path corpusInput, Path output, boolean inMemory,
+ double maxError, int maxEigens) throws IOException {
+ // no need to handle command line arguments
+ outPath = output;
+ tmpOut = new Path(outPath, "tmp");
+ maxEigensToKeep = maxEigens;
+ this.maxError = maxError;
+ if (eigenInput != null && eigensToVerify == null) {
+ prepareEigens(new Configuration(conf), eigenInput, inMemory);
+ }
+
+ DistributedRowMatrix c = new DistributedRowMatrix(corpusInput, tmpOut, 1, 1);
+ c.setConf(new Configuration(conf));
+ corpus = c;
+
+ eigenVerifier = new SimpleEigenVerifier();
+
+ Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
+ List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
+ saveCleanEigens(conf, prunedEigenMeta);
+ }
+}
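
As with the solver above, runJob(...) can be driven programmatically. A minimal sketch
follows; the paths, the raw-eigenvector file name (the actual name is the solver's
RAW_EIGENVECTORS constant), and the limits are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;

    public class EigenCleanupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        EigenVerificationJob verifier = new EigenVerificationJob();
        // verify raw eigenvectors against the corpus, keeping at most 20 with
        // cosine-angle error under 0.05; load the eigen matrix into memory
        verifier.runJob(conf, new Path("svd/out/rawEigenvectors"), new Path("corpus"),
            new Path("svd/out"), true, 0.05, 20);
        // cleaned output lands at <output>/cleanEigenvectors
        System.out.println("cleaned eigenvectors at " + verifier.getCleanedEigensPath());
      }
    }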