You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by jm...@apache.org on 2010/02/20 16:45:48 UTC
svn commit: r912134 - in /lucene/mahout/trunk: core/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/
core/src/main/java/org/apache/mahout/math/hadoop/
core/src/main/java/org/apache/mahout/math/hadoop/decomposer/
core/src/test/java/org/apache/maho...
Author: jmannix
Date: Sat Feb 20 15:45:47 2010
New Revision: 912134
URL: http://svn.apache.org/viewvc?rev=912134&view=rev
Log:
MAHOUT-180
Adds DistributedRowMatrix (not fully implemented - only has timesSquared(Vector) currently, others to follow), DistributedLanczosSolver and EigenVerificationJob.
Wiki to follow on usage.
Some refactorings in the non-distributed (non-Hadoop) math.decomposer versions as well, to tie the various solvers together under a common interface with some
shared code. There may be more changes.
Added:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/OrthonormalityVerifier.java
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java (contents, props changed)
- copied, changed from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/MultiThreadedEigenVerifier.java
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java (contents, props changed)
- copied, changed from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/EigenStatus.java
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java (contents, props changed)
- copied, changed from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SimpleEigenVerifier.java
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java (contents, props changed)
- copied, changed from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SingularVectorVerifier.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
Removed:
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/EigenStatus.java
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/MultiThreadedEigenVerifier.java
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SimpleEigenVerifier.java
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SingularVectorVerifier.java
Modified:
lucene/mahout/trunk/core/pom.xml
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianSolver.java
lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java
lucene/mahout/trunk/math/src/test/java/org/apache/mahout/math/decomposer/hebbian/TestHebbianSolver.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java
Modified: lucene/mahout/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/pom.xml?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/core/pom.xml (original)
+++ lucene/mahout/trunk/core/pom.xml Sat Feb 20 15:45:47 2010
@@ -258,6 +258,16 @@
<artifactId>easymockclassextension</artifactId>
</dependency>
+
+
+ <dependency>
+ <groupId>org.apache.mahout</groupId>
+ <artifactId>mahout-math</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<repositories>
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java Sat Feb 20 15:45:47 2010
@@ -90,7 +90,7 @@
Option tempDirOpt = buildOption("tempDir", "t", "Intermediate output directory", "temp");
Option outputOpt = DefaultOptionCreator.outputOption().create();
Option helpOpt = DefaultOptionCreator.helpOption();
- Option jarFileOpt = buildOption("jarFile", "m", "Implementation jar");
+ Option jarFileOpt = buildOption("jarFile", "m", "Implementation jar", false, null);
GroupBuilder gBuilder = new GroupBuilder().withName("Options").withOption(inputOpt)
.withOption(tempDirOpt).withOption(outputOpt).withOption(helpOpt).withOption(jarFileOpt);
@@ -150,9 +150,11 @@
Path inputPathPath = new Path(inputPath).makeQualified(fs);
Path outputPathPath = new Path(outputPath).makeQualified(fs);
-
- jobConf.set("mapred.jar", jarFile);
- jobConf.setJar(jarFile);
+
+ if(jarFile != null) {
+ jobConf.set("mapred.jar", jarFile);
+ jobConf.setJar(jarFile);
+ }
jobConf.setClass("mapred.input.format.class", inputFormat, InputFormat.class);
jobConf.set("mapred.input.dir", StringUtils.escapeString(inputPathPath.toString()));
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,171 @@
+package org.apache.mahout.math.hadoop;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+
+/**
+ * DistributedRowMatrix is a FileSystem-backed VectorIterable in which the vectors live in a
+ * SequenceFile<WritableComparable,VectorWritable>, and distributed operations are executed as M/R passes on
+ * Hadoop. The usage is as follows:
+ * <pre>
+ * // the path must already contain an already created SequenceFile!
+ * DistributedRowMatrix m = new DistributedRowMatrix("path/to/vector/sequenceFile", "tmp/path", 10000000, 250000);
+ * m.configure(new JobConf());
+ * // If we want to timesSquared() a vector by this matrix, its dimension must equal the column dimension
+ * // of the matrix.
+ * Vector v = new DenseVector(250000);
+ * // now the following operation will be done via a M/R pass via Hadoop.
+ * Vector w = m.timesSquared(v);
+ * </pre>
+ *
+ * <p>{@link #configure(JobConf)} must be called before iterating or multiplying. Iteration via
+ * {@link #iterateAll()} reads row keys into an {@link IntWritable}, so it expects the SequenceFile keys to be
+ * IntWritable.</p>
+ */
+public class DistributedRowMatrix implements VectorIterable, JobConfigurable {
+
+  private final String inputPathString;
+  private final String outputTmpPathString;
+  private final int numRows;
+  private final int numCols;
+
+  // Set by configure(); null until then.
+  private JobConf conf;
+  private Path rowPath;
+  private Path outputTmpBasePath;
+
+  /**
+   * @param inputPathString path of the existing SequenceFile holding the matrix rows
+   * @param outputTmpPathString base path under which temporary M/R inputs/outputs are written
+   * @param numRows number of rows of the matrix
+   * @param numCols number of columns of the matrix
+   */
+  public DistributedRowMatrix(String inputPathString,
+                              String outputTmpPathString,
+                              int numRows,
+                              int numCols) {
+    this.inputPathString = inputPathString;
+    this.outputTmpPathString = outputTmpPathString;
+    this.numRows = numRows;
+    this.numCols = numCols;
+  }
+
+  /**
+   * Stores the configuration and qualifies the row path and the temporary base path against the
+   * configuration's default FileSystem.
+   *
+   * @throws RuntimeException wrapping any IOException raised while obtaining the FileSystem
+   */
+  @Override
+  public void configure(JobConf conf) {
+    this.conf = conf;
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      rowPath = fs.makeQualified(new Path(inputPathString));
+      outputTmpBasePath = fs.makeQualified(new Path(outputTmpPathString));
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /** @return the qualified row path, or null before {@link #configure(JobConf)} is called */
+  public Path getRowPath() {
+    return rowPath;
+  }
+
+  /** @return the qualified temporary base path, or null before {@link #configure(JobConf)} is called */
+  public Path getOutputTempPath() {
+    return outputTmpBasePath;
+  }
+
+  /**
+   * Opens a new reader over the row SequenceFile and returns an iterator over its slices. The reader is closed
+   * once the iterator is exhausted.
+   */
+  @Override
+  public Iterator<MatrixSlice> iterateAll() {
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, rowPath, conf);
+      return new DistributedMatrixIterator(reader);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public int numSlices() {
+    return numRows();
+  }
+
+  @Override
+  public int numRows() {
+    return numRows;
+  }
+
+  @Override
+  public int numCols() {
+    return numCols;
+  }
+
+  /** Not implemented yet; always throws. */
+  @Override
+  public Vector times(Vector v) {
+    // TODO: times(Vector) is easy, works pretty much like timesSquared.
+    throw new UnsupportedOperationException("DistributedRowMatrix methods other than timesSquared not supported yet");
+  }
+
+  /**
+   * Runs one TimesSquaredJob M/R pass over the rows and returns its output vector.
+   * NOTE(review): the job is built from a fresh JobConf inside TimesSquaredJob, not from the JobConf passed to
+   * configure().
+   */
+  @Override
+  public Vector timesSquared(Vector v) {
+    try {
+      // Each call writes under its own timestamp-named subdirectory of the temp base path.
+      Path jobOutputBase = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+      JobConf jobConf = TimesSquaredJob.createTimesSquaredJobConf(v, rowPath, jobOutputBase);
+      JobClient.runJob(jobConf);
+      return TimesSquaredJob.retrieveTimesSquaredOutputVector(jobConf);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public Iterator<MatrixSlice> iterator() {
+    return iterateAll();
+  }
+
+  /**
+   * Read-only iterator over an (IntWritable, VectorWritable) SequenceFile. It reads one record ahead in
+   * {@link #hasNext()} and closes the reader when the records run out or a read fails.
+   */
+  public static class DistributedMatrixIterator implements Iterator<MatrixSlice> {
+    private final SequenceFile.Reader reader;
+    // True when hasNext holds the result of the most recent read that next() has not consumed yet
+    // (or permanently true once the reader is exhausted/failed).
+    private boolean hasBuffered = false;
+    private boolean hasNext = false;
+    private final IntWritable i = new IntWritable();
+    private final VectorWritable v = new VectorWritable();
+
+    public DistributedMatrixIterator(SequenceFile.Reader reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (!hasBuffered) {
+        // Mark as buffered first so that a failed or exhausted reader is never read again.
+        hasBuffered = true;
+        hasNext = false;
+        try {
+          hasNext = reader.next(i, v);
+        } catch (IOException ioe) {
+          closeReader();
+          throw new RuntimeException(ioe);
+        }
+        if (!hasNext) {
+          closeReader();
+        }
+      }
+      return hasNext;
+    }
+
+    /**
+     * @throws java.util.NoSuchElementException if there are no more rows
+     */
+    @Override
+    public MatrixSlice next() {
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException("DistributedMatrixIterator has no more rows");
+      }
+      hasBuffered = false;
+      return new MatrixSlice(v.get(), i.get());
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Cannot remove from DistributedMatrixIterator");
+    }
+
+    /** Best-effort close: iteration is already over, so a failure to close is not reported. */
+    private void closeReader() {
+      try {
+        reader.close();
+      } catch (IOException ignored) {
+        // Nothing useful the caller could do; the iterator is finished either way.
+      }
+    }
+  }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,204 @@
+package org.apache.mahout.math.hadoop;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+
+import static org.apache.mahout.math.function.Functions.plusMult;
+
+
+/**
+ * Builds and post-processes the M/R job behind a distributed timesSquared(v). Each mapper adds
+ * (row . v) * row into a local accumulator for every row in its split and emits that partial sum once, in
+ * close(). The reducer, also installed as combiner, adds the partial sums into one output vector. All
+ * intermediate and output records are keyed by NullWritable.
+ */
+public class TimesSquaredJob {
+
+  private static final Logger log = LoggerFactory.getLogger(TimesSquaredJob.class);
+
+  public static final String INPUT_VECTOR = "timesSquared.inputVector";
+  public static final String IS_SPARSE_OUTPUT = "timesSquared.outputVector.sparse";
+  public static final String OUTPUT_VECTOR_DIMENSION = "timesSquared.output.dimension";
+
+  public static final String OUTPUT_VECTOR_FILENAME = "timesSquaredOutputVector";
+
+  /**
+   * Creates a timesSquared job using the default {@link TimesSquaredMapper} and {@link VectorSummingReducer}.
+   *
+   * @param v the vector to multiply by
+   * @param matrixInputPath SequenceFile of matrix rows
+   * @param outputVectorPath base directory for the job's temporary input vector and its output
+   */
+  public static JobConf createTimesSquaredJobConf(Vector v,
+                                                  Path matrixInputPath,
+                                                  Path outputVectorPath) throws IOException {
+    return createTimesSquaredJobConf(v, matrixInputPath, outputVectorPath,
+        TimesSquaredMapper.class, VectorSummingReducer.class);
+  }
+
+  /**
+   * Writes {@code v} to a SequenceFile under {@code outputVectorPathBase}, registers it in the DistributedCache,
+   * and returns a JobConf that reads the matrix rows from {@code matrixInputPath}. The result is written to
+   * {@code outputVectorPathBase/OUTPUT_VECTOR_FILENAME}.
+   *
+   * @param mapClass mapper implementation (must extend {@link TimesSquaredMapper})
+   * @param redClass reducer/combiner implementation (must extend {@link VectorSummingReducer})
+   * @throws IOException if the input vector cannot be written
+   */
+  public static JobConf createTimesSquaredJobConf(Vector v,
+                                                  Path matrixInputPath,
+                                                  Path outputVectorPathBase,
+                                                  Class<? extends TimesSquaredMapper> mapClass,
+                                                  Class<? extends VectorSummingReducer> redClass) throws IOException {
+    JobConf conf = new JobConf(TimesSquaredJob.class);
+    conf.setJobName("TimesSquaredJob: " + matrixInputPath + " timesSquared(" + v.getName() + ")");
+    FileSystem fs = FileSystem.get(conf);
+    Path qualifiedMatrixPath = fs.makeQualified(matrixInputPath);
+    Path qualifiedOutputBase = fs.makeQualified(outputVectorPathBase);
+
+    // Persist the input vector so every mapper can load it from the DistributedCache.
+    Path inputVectorPath = new Path(qualifiedOutputBase, INPUT_VECTOR + "/" + System.nanoTime());
+    SequenceFile.Writer inputVectorWriter =
+        new SequenceFile.Writer(fs, conf, inputVectorPath, NullWritable.class, VectorWritable.class);
+    try {
+      inputVectorWriter.append(NullWritable.get(), new VectorWritable(v));
+    } finally {
+      inputVectorWriter.close();
+    }
+    URI ivpURI = inputVectorPath.toUri();
+    DistributedCache.setCacheFiles(new URI[] {ivpURI}, conf);
+    fs.deleteOnExit(inputVectorPath);
+
+    conf.set(INPUT_VECTOR, ivpURI.toString());
+    // Any non-DenseVector input makes the mappers and reducer accumulate into sparse vectors.
+    conf.setBoolean(IS_SPARSE_OUTPUT, !(v instanceof DenseVector));
+    conf.setInt(OUTPUT_VECTOR_DIMENSION, v.size());
+    FileInputFormat.addInputPath(conf, qualifiedMatrixPath);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    FileOutputFormat.setOutputPath(conf, new Path(qualifiedOutputBase, OUTPUT_VECTOR_FILENAME));
+    conf.setMapperClass(mapClass);
+    conf.setMapOutputKeyClass(NullWritable.class);
+    conf.setMapOutputValueClass(VectorWritable.class);
+    conf.setReducerClass(redClass);
+    conf.setCombinerClass(redClass);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    conf.setOutputKeyClass(NullWritable.class);
+    conf.setOutputValueClass(VectorWritable.class);
+    return conf;
+  }
+
+  /**
+   * Reads the single result vector from {@code part-00000} of a completed job's output directory and schedules
+   * that file for deletion on exit.
+   *
+   * @throws IllegalStateException if the output file contains no vector (e.g. no mapper emitted a partial sum)
+   * @throws RuntimeException wrapping any IOException raised while reading
+   */
+  public static Vector retrieveTimesSquaredOutputVector(JobConf conf) {
+    try {
+      Path outputPath = FileOutputFormat.getOutputPath(conf);
+      FileSystem fs = FileSystem.get(conf);
+      Path outputFile = new Path(outputPath, "part-00000");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, outputFile, conf);
+      VectorWritable v = new VectorWritable();
+      boolean found;
+      try {
+        found = reader.next(NullWritable.get(), v);
+      } finally {
+        reader.close();
+      }
+      fs.deleteOnExit(outputFile);
+      if (!found) {
+        throw new IllegalStateException("No output vector found in " + outputFile);
+      }
+      return v.get();
+    } catch (IOException ioe) {
+      log.error("Unable to retrieve vector!", ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Adds {@code scale(row) * row} into a per-task accumulator for every input row, and emits the accumulated
+   * vector once when the task closes.
+   */
+  public static class TimesSquaredMapper extends MapReduceBase
+      implements Mapper<WritableComparable,VectorWritable, NullWritable,VectorWritable> {
+
+    protected Vector inputVector;
+    protected Vector outputVector;
+    // Captured from map(); stays null if this task never receives a record.
+    protected OutputCollector<NullWritable,VectorWritable> out;
+
+    /**
+     * Loads the input vector from the first DistributedCache file and allocates the accumulator.
+     */
+    @Override
+    public void configure(JobConf conf) {
+      try {
+        URI[] localFiles = DistributedCache.getCacheFiles(conf);
+        if (localFiles == null || localFiles.length < 1) {
+          throw new IllegalArgumentException("missing paths from the DistributedCache");
+        }
+        Path inputVectorPath = new Path(localFiles[0].getPath());
+        FileSystem fs = inputVectorPath.getFileSystem(conf);
+
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, inputVectorPath, conf);
+        VectorWritable val = new VectorWritable();
+        try {
+          reader.next(NullWritable.get(), val);
+        } finally {
+          reader.close();
+        }
+        inputVector = val.get();
+        // Convert any other vector type to SequentialAccessSparseVector before the per-row dot products.
+        if (!(inputVector instanceof SequentialAccessSparseVector || inputVector instanceof DenseVector)) {
+          inputVector = new SequentialAccessSparseVector(inputVector);
+        }
+        outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false)
+            ? new RandomAccessSparseVector(inputVector.size(), 10)
+            : new DenseVector(inputVector.size());
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
+    }
+
+    @Override
+    public void map(WritableComparable rowNum,
+                    VectorWritable v,
+                    OutputCollector<NullWritable,VectorWritable> out,
+                    Reporter rep) throws IOException {
+      this.out = out;
+      double d = scale(v);
+      if (d == 0) {
+        return;
+      }
+      if (d == 1) {
+        outputVector.assign(v.get(), Functions.plus);
+      } else {
+        outputVector.assign(v.get(), plusMult(d));
+      }
+    }
+
+    /** @return the factor by which this row is added to the accumulator: {@code row . inputVector}. */
+    protected double scale(VectorWritable v) {
+      return v.get().dot(inputVector);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // map() was never called for an empty split, so there is no collector and nothing to contribute.
+      if (out != null) {
+        out.collect(NullWritable.get(), new VectorWritable(outputVector));
+      }
+    }
+
+  }
+
+  /**
+   * Sums all incoming vectors into a single vector (sparse or dense according to IS_SPARSE_OUTPUT). Used as
+   * both combiner and reducer.
+   * NOTE(review): outputVector is allocated once in configure() and is not reset between reduce() calls on
+   * the same instance. That is fine for the single NullWritable key, but confirm it before reusing this
+   * class with multiple keys.
+   */
+  public static class VectorSummingReducer extends MapReduceBase
+      implements Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> {
+
+    protected Vector outputVector;
+
+    @Override
+    public void configure(JobConf conf) {
+      int outputDimension = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE);
+      outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false)
+          ? new RandomAccessSparseVector(outputDimension, 10)
+          : new DenseVector(outputDimension);
+    }
+
+    @Override
+    public void reduce(NullWritable n,
+                       Iterator<VectorWritable> vectors,
+                       OutputCollector<NullWritable,VectorWritable> out,
+                       Reporter reporter) throws IOException {
+      while (vectors.hasNext()) {
+        VectorWritable v = vectors.next();
+        if (v != null) {
+          v.get().addTo(outputVector);
+        }
+      }
+      out.collect(NullWritable.get(), new VectorWritable(outputVector));
+    }
+  }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,155 @@
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.commons.cli2.Option;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.AbstractJob;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.lanczos.LanczosSolver;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Hadoop {@link Tool} that runs the inherited {@link LanczosSolver#solve} over a {@link DistributedRowMatrix},
+ * then runs an {@link EigenVerificationJob} and writes the eigenvectors to a SequenceFile. It is meant to be
+ * launched through {@link #job()} (as {@link #main} does), which parses the command line before calling
+ * {@link #run(String[])}.
+ */
+public class DistributedLanczosSolver extends LanczosSolver implements Tool {
+
+  private static final Logger log = LoggerFactory.getLogger(DistributedLanczosSolver.class);
+
+  private Configuration conf;
+
+  // Populated by DistributedLanczosSolverJob.run(); null until then.
+  private Map<String,String> parsedArgs;
+
+  /**
+   * For the distributed case, the best guess at a useful initialization state for Lanczos we'll choose to be
+   * uniform over all input dimensions, L_2 normalized.
+   *
+   * @param corpus the matrix being decomposed; only its column count is used
+   * @return a dense vector of length {@code corpus.numCols()} with every entry equal to 1/sqrt(numCols)
+   */
+  protected Vector getInitialVector(VectorIterable corpus) {
+    Vector initialVector = new DenseVector(corpus.numCols());
+    initialVector.assign(1 / Math.sqrt(corpus.numCols()));
+    return initialVector;
+  }
+
+  /**
+   * Runs the decomposition using the previously parsed arguments; {@code strings} itself is not read.
+   *
+   * @throws IllegalStateException if invoked without going through {@link DistributedLanczosSolverJob}
+   */
+  @Override
+  public int run(String[] strings) throws Exception {
+    if (parsedArgs == null) {
+      throw new IllegalStateException(
+          "Arguments have not been parsed; launch DistributedLanczosSolver via job() as main() does");
+    }
+    String inputPathString = parsedArgs.get("--input");
+    String outputTmpPathString = parsedArgs.get("--tempDir");
+    int numRows = Integer.parseInt(parsedArgs.get("--numRows"));
+    int numCols = Integer.parseInt(parsedArgs.get("--numCols"));
+    int desiredRank = Integer.parseInt(parsedArgs.get("--rank"));
+    String outputEigenVectorPath = parsedArgs.get("--output");
+
+    Matrix eigenVectors = new DenseMatrix(desiredRank, numCols);
+    List<Double> eigenValues = new ArrayList<Double>();
+
+    DistributedRowMatrix matrix = new DistributedRowMatrix(inputPathString,
+                                                           outputTmpPathString,
+                                                           numRows,
+                                                           numCols);
+    matrix.configure(new JobConf(getConf()));
+    solve(matrix, desiredRank, eigenVectors, eigenValues);
+
+    // TODO ack!
+    EigenVerificationJob evj = new EigenVerificationJob();
+    evj.setEigensToVerify(eigenVectors);
+    evj.setConf(getConf());
+    evj.run(new String[] {"--input", inputPathString, "--output", outputEigenVectorPath});
+
+    serializeOutput(eigenVectors, eigenValues, outputEigenVectorPath);
+    return 0;
+  }
+
+  /**
+   * Writes eigenvector rows as (IntWritable index, VectorWritable) records. Each vector is named with its index
+   * and the corresponding eigenvalue.
+   * TODO: this should be refactored to allow both LanczosSolver impls to properly serialize output in a generic way.
+   *
+   * @param eigenVectors The eigenvectors to be serialized, one per row
+   * @param eigenValues The eigenvalues to be serialized, index-aligned with the rows of eigenVectors
+   * @param outputPath The path (relative to the current Configuration's FileSystem) to save the output to.
+   * @throws IOException if the SequenceFile cannot be created or written
+   */
+  public void serializeOutput(Matrix eigenVectors, List<Double> eigenValues, String outputPath) throws IOException {
+    log.info("Persisting {} eigenVectors and eigenValues to: {}", eigenVectors.numRows(), outputPath);
+    Path path = new Path(outputPath);
+    Configuration configuration = getConf();
+    FileSystem fs = FileSystem.get(configuration);
+    SequenceFile.Writer seqWriter =
+        new SequenceFile.Writer(fs, configuration, path, IntWritable.class, VectorWritable.class);
+    try {
+      VectorWritable vw = new VectorWritable();
+      IntWritable iw = new IntWritable();
+      // NOTE(review): the last row of eigenVectors is never written (bound is numRows() - 1). Confirm this matches
+      // how LanczosSolver.solve fills the matrix before changing it.
+      for (int i = 0; i < eigenVectors.numRows() - 1; i++) {
+        Vector v = eigenVectors.getRow(i);
+        v.setName("eigenVector" + i + ", eigenvalue = " + eigenValues.get(i));
+        vw.set(v);
+        iw.set(i);
+        seqWriter.append(iw, vw);
+      }
+    } finally {
+      seqWriter.close();
+    }
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** @return a command-line front end that parses arguments into this solver and then runs it */
+  public DistributedLanczosSolverJob job() {
+    return new DistributedLanczosSolverJob();
+  }
+
+  /**
+   * Inner subclass of AbstractJob so we get access to AbstractJob's functionality w.r.t. cmdline options, while
+   * the outer class can still subclass LanczosSolver. Its Configuration is shared with the enclosing solver.
+   */
+  public class DistributedLanczosSolverJob extends AbstractJob {
+    @Override
+    public void setConf(Configuration conf) {
+      DistributedLanczosSolver.this.setConf(conf);
+    }
+
+    @Override
+    public Configuration getConf() {
+      return DistributedLanczosSolver.this.getConf();
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      Option numRowsOpt = buildOption("numRows",
+                                      "nr",
+                                      "Number of rows of the input matrix",
+                                      "0");
+      Option numColsOpt = buildOption("numCols",
+                                      "nc",
+                                      "Number of columns of the input matrix");
+      Option desiredRankOpt = buildOption("rank",
+                                          "r",
+                                          "Desired decomposition rank (note: only roughly 1/4 to 1/3 "
+                                        + "of these will have the top portion of the spectrum)");
+
+      DistributedLanczosSolver.this.parsedArgs = parseArguments(args, numRowsOpt, numColsOpt, desiredRankOpt);
+      return DistributedLanczosSolver.this.run(args);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new DistributedLanczosSolver().job(), args);
+  }
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,36 @@
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.mahout.math.DenseVector;
+
+/**
+ * TODO this is a horrible hack. Make a proper writable subclass also.
+ */
+public class EigenVector extends DenseVector {
+
+  /**
+   * Copies {@code v} (deep copy) and encodes the metadata in the vector's name as
+   * {@code "e|<order>| = |<eigenValue>|, err = <cosAngleError>"}.
+   */
+  public EigenVector(DenseVector v, double eigenValue, double cosAngleError, int order) {
+    super(v, false);
+    setName("e|" + order + "| = |" + eigenValue + "|, err = " + cosAngleError);
+  }
+
+  public double getEigenValue() {
+    return parseMetaData()[1];
+  }
+
+  public double getCosAngleError() {
+    return parseMetaData()[2];
+  }
+
+  public int getIndex() {
+    return (int) parseMetaData()[0];
+  }
+
+  /**
+   * Decodes the name written by the constructor.
+   *
+   * @return {@code {order, eigenValue, cosAngleError}}
+   */
+  protected double[] parseMetaData() {
+    double[] m = new double[3];
+    // "e|3| = |1.5|, err = 0.01" -> ["e|3|", "|1.5|, err", "0.01"]
+    String[] s = getName().split(" = ");
+    // '|' must be escaped: String.split takes a regex, and a bare "|" matches the empty string.
+    m[0] = Double.parseDouble(s[0].split("\\|")[1]);
+    m[1] = Double.parseDouble(s[1].split("\\|")[1]);
+    // The error value is written without delimiters, so parse the whole remainder.
+    m[2] = Double.parseDouble(s[2]);
+    return m;
+  }
+
+}
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,236 @@
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.AbstractJob;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.OrthonormalityVerifier;
+import org.apache.mahout.math.SparseRowMatrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.EigenStatus;
+import org.apache.mahout.math.decomposer.SimpleEigenVerifier;
+import org.apache.mahout.math.decomposer.SingularVectorVerifier;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * <p>Class for taking the output of an eigendecomposition (specified as a Path location), and verifies correctness,
+ * in terms of the following: if you have a vector e, and a matrix m, then let e' = m.timesSquared(v); the error
+ * w.r.t. eigenvector-ness is the cosine of the angle between e and e':</p>
+ * <pre>
+ * error(e,e') = e.dot(e') / (e.norm(2)*e'.norm(2))
+ * </pre>
+ * <p>A set of eigenvectors should also all be very close to orthogonal, so this job computes all inner products
+ * between eigenvectors, and checks that this is close to the identity matrix.
+ * </p>
+ * <p>
+ * Parameters used in the cleanup (other than in the input/output path options) include --minEigenvalue, which
+ * specifies the value below which eigenvector/eigenvalue pairs will be discarded, and --maxError, which specifies
+ * the maximum error (as defined above) to be tolerated in an eigenvector.</p>
+ * <p>
+ * If all the eigenvectors can fit in memory, --inMemory allows for a speedier completion of this task by doing so.
+ * </p>
+ */
+public class EigenVerificationJob extends AbstractJob implements Tool {
+ private static final Logger log = LoggerFactory.getLogger(EigenVerificationJob.class);
+
+ private SingularVectorVerifier eigenVerifier;
+ private OrthonormalityVerifier orthoVerifier;
+ private VectorIterable eigensToVerify;
+ private VectorIterable corpus;
+ private double maxError;
+ private double minEigenValue;
+ private boolean loadEigensInMemory;
+
+ public void setEigensToVerify(VectorIterable eigens) {
+ eigensToVerify = eigens;
+ }
+
+ private String tmpOut;
+ private String outPath;
+
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Map<String,String> argMap = handleArgs(args);
+ if(argMap == null) {
+ return -1;
+ } else if (argMap.isEmpty()) {
+ return 0;
+ }
+ outPath = argMap.get("--output");
+ tmpOut = outPath + "/tmp";
+
+ if(argMap.get("--eigenInput") != null && eigensToVerify == null) {
+ prepareEigens(argMap.get("--eigenInput"), argMap.get("--inMemory") != null);
+ }
+
+ maxError = Double.parseDouble(argMap.get("--maxError"));
+ minEigenValue = Double.parseDouble(argMap.get("--minEigenvalue"));
+
+ DistributedRowMatrix c = new DistributedRowMatrix(argMap.get("--corpusInput"), tmpOut, 1, 1);
+ c.configure(new JobConf(getConf()));
+ corpus = c;
+
+ // set up eigenverifier and orthoverifier TODO: allow multithreaded execution
+
+ eigenVerifier = new SimpleEigenVerifier();
+ orthoVerifier = new OrthonormalityVerifier();
+
+ VectorIterable pairwiseInnerProducts = computePairwiseInnerProducts();
+
+ Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
+
+ List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
+
+ saveCleanEigens(prunedEigenMeta);
+
+ return 0;
+ }
+
+ public Map<String,String> handleArgs(String[] args) {
+ Option eigenInputOpt = buildOption("eigenInput", "ei",
+ "The Path for purported eigenVector input files (SequenceFile<WritableComparable,VectorWritable>.", null);
+ Option corpusInputOpt = buildOption("corpusInput", "ci",
+ "The Path for corpus input files (SequenceFile<WritableComparable,VectorWritable>.");
+ Option outOpt = DefaultOptionCreator.outputOption().create();
+ Option helpOpt = DefaultOptionCreator.helpOption();
+ Option inMemOpt = buildOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false");
+ Option errorOpt = buildOption("maxError", "err", "Maximum acceptable error", "0.05");
+ Option minEigenValOpt = buildOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0");
+
+ GroupBuilder gBuilder = new GroupBuilder().withName("Options")
+ .withOption(eigenInputOpt)
+ .withOption(corpusInputOpt)
+ .withOption(helpOpt)
+ .withOption(outOpt)
+ .withOption(inMemOpt)
+ .withOption(errorOpt)
+ .withOption(minEigenValOpt);
+ Group group = gBuilder.create();
+
+ Map<String,String> argMap = new HashMap<String,String>();
+
+ CommandLine cmdLine;
+ try {
+ Parser parser = new Parser();
+ parser.setGroup(group);
+ cmdLine = parser.parse(args);
+ } catch (OptionException e) {
+ log.error(e.getMessage());
+ CommandLineUtil.printHelp(group);
+ return null;
+ }
+ if (cmdLine.hasOption(helpOpt)) {
+ CommandLineUtil.printHelp(group);
+ return argMap;
+ }
+ maybePut(argMap, cmdLine, eigenInputOpt, corpusInputOpt, helpOpt, outOpt, inMemOpt, errorOpt, minEigenValOpt);
+ return argMap;
+ }
+
+ public VectorIterable computePairwiseInnerProducts() {
+ return orthoVerifier.pairwiseInnerProducts(eigensToVerify);
+ }
+
+ public void saveCleanEigens(List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta) throws IOException {
+ Path path = new Path(outPath, "largestCleanEigens");
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+ SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
+ VectorWritable vw = new VectorWritable();
+ IntWritable iw = new IntWritable();
+ for(Map.Entry<MatrixSlice,EigenStatus> pruneSlice : prunedEigenMeta) {
+ MatrixSlice s = pruneSlice.getKey();
+ EigenStatus meta = pruneSlice.getValue();
+ EigenVector ev = new EigenVector((DenseVector)s.vector(),
+ meta.getEigenValue(),
+ Math.abs(1-meta.getCosAngle()),
+ s.index());
+ log.info("appending " + ev.getName() + " to " + path.toString());
+ vw.set(ev);
+ iw.set(s.index());
+ seqWriter.append(iw, vw);
+ }
+ seqWriter.close();
+ }
+
+ public List<Map.Entry<MatrixSlice,EigenStatus>> pruneEigens(Map<MatrixSlice,EigenStatus> eigenMetaData) {
+ List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = new ArrayList<Map.Entry<MatrixSlice,EigenStatus>>();
+
+ for(Map.Entry<MatrixSlice,EigenStatus> entry : eigenMetaData.entrySet()) {
+ if(Math.abs(1-entry.getValue().getCosAngle()) < maxError && entry.getValue().getEigenValue() > minEigenValue) {
+ prunedEigenMeta.add(entry);
+ }
+ }
+
+ Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice,EigenStatus>>() {
+ @Override
+ public int compare(Map.Entry<MatrixSlice, EigenStatus> e1, Map.Entry<MatrixSlice, EigenStatus> e2) {
+ return e1.getKey().index() - e2.getKey().index();
+ }
+ });
+ return prunedEigenMeta;
+ }
+
+ public Map<MatrixSlice,EigenStatus> verifyEigens() {
+ Map<MatrixSlice, EigenStatus> eigenMetaData = new HashMap<MatrixSlice, EigenStatus>();
+
+ for(MatrixSlice slice : eigensToVerify) {
+ EigenStatus status = eigenVerifier.verify(corpus, slice.vector());
+ eigenMetaData.put(slice, status);
+ }
+ return eigenMetaData;
+ }
+
+ private void prepareEigens(String eigenInput, boolean inMemory) {
+ DistributedRowMatrix eigens = new DistributedRowMatrix(eigenInput, tmpOut, 1, 1);
+ eigens.configure(new JobConf(getConf()));
+ if(inMemory) {
+ List<Vector> eigenVectors = new ArrayList<Vector>();
+ for(MatrixSlice slice : eigens) {
+ eigenVectors.add(slice.vector());
+ }
+ eigensToVerify = new SparseRowMatrix(new int[] {eigenVectors.size(), eigenVectors.get(0).size()},
+ eigenVectors.toArray(new Vector[eigenVectors.size()]),
+ true,
+ true);
+
+ } else {
+ eigensToVerify = eigens;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int retVal = ToolRunner.run(new EigenVerificationJob(), args);
+ System.exit(retVal);
+ }
+}
Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java Sat Feb 20 15:45:47 2010
@@ -32,7 +32,7 @@
private ClusteringTestUtils() {
}
- public static void writePointsToFile(List<VectorWritable> points, String fileName, FileSystem fs, Configuration conf)
+ public static void writePointsToFile(Iterable<VectorWritable> points, String fileName, FileSystem fs, Configuration conf)
throws IOException {
Path path = new Path(fileName);
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, LongWritable.class, VectorWritable.class);
Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Sat Feb 20 15:45:47 2010
@@ -117,7 +117,7 @@
}
- private static void rmr(String path) {
+ public static void rmr(String path) {
File f = new File(path);
if (f.exists()) {
if (f.isDirectory()) {
Added: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java (added)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,86 @@
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.TestCanopyCreation;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.SolverTest;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class TestDistributedLanczosSolver extends SolverTest {
+
+ public TestDistributedLanczosSolver(String name) {
+ super(name);
+ }
+
+ public void testDistributedLanczosSolver() throws Exception {
+ File testData = new File("testdata");
+ if (!testData.exists()) {
+ testData.mkdir();
+ }
+ DistributedRowMatrix corpus = randomDistributedMatrix(1000, 900, 800, 100, 10.0, "testdata");
+ corpus.configure(new JobConf());
+ DistributedLanczosSolver solver = new DistributedLanczosSolver();
+ int desiredRank = 30;
+ Matrix eigenVectors = new DenseMatrix(desiredRank, 800);
+ List<Double> eigenValues = new ArrayList<Double>();
+ solver.solve(corpus, desiredRank, eigenVectors, eigenValues);
+ assertOrthonormal(eigenVectors);
+ assertEigen(eigenVectors, corpus, eigenVectors.numRows() / 2, 0.01);
+ }
+
+ public void tearDown() throws Exception {
+ TestCanopyCreation.rmr("testData");
+ }
+
+
+ public static DistributedRowMatrix randomDistributedMatrix(int numRows,
+ int nonNullRows,
+ int numCols,
+ int entriesPerRow,
+ double entryMean,
+ String baseTmpDir) throws IOException {
+ final Matrix m = randomSequentialAccessSparseMatrix(numRows, nonNullRows, numCols, entriesPerRow, entryMean);
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+
+ ClusteringTestUtils.writePointsToFile(new Iterable<VectorWritable>() {
+ @Override
+ public Iterator<VectorWritable> iterator() {
+ final Iterator<MatrixSlice> it = m.iterator();
+ final VectorWritable v = new VectorWritable();
+ return new Iterator<VectorWritable>() {
+ @Override
+ public boolean hasNext() { return it.hasNext(); }
+ @Override
+ public VectorWritable next() {
+ MatrixSlice slice = it.next();
+ v.set(slice.vector());
+ return v;
+ }
+ @Override
+ public void remove() { it.remove(); }
+ };
+ }
+ }, baseTmpDir + "/distMatrix", fs, conf);
+
+ DistributedRowMatrix distMatrix = new DistributedRowMatrix(baseTmpDir + "/distMatrix",
+ baseTmpDir + "/tmpOut",
+ m.numRows(),
+ m.numCols());
+
+ return distMatrix;
+ }
+}
Added: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/OrthonormalityVerifier.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/OrthonormalityVerifier.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/OrthonormalityVerifier.java (added)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/OrthonormalityVerifier.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,25 @@
+package org.apache.mahout.math;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Computes the pairwise inner products of a set of vectors. For an orthonormal basis the result
+ * should be (close to) the identity matrix.
+ */
+public class OrthonormalityVerifier {
+
+  /**
+   * Builds the square matrix of dot products between every pair of slices in {@code basis}.
+   * The basis is iterated once per slice (nested), so it must be re-iterable.
+   *
+   * @param basis the vectors to compare. Each slice's {@link MatrixSlice#index()} is used as both its row and
+   *              its column position, so indices must lie in {@code [0, number of slices)}.
+   * @return a matrix whose (i, j) entry is the dot product of the slices with index i and j,
+   *         or {@code null} if {@code basis} is empty
+   */
+  public VectorIterable pairwiseInnerProducts(VectorIterable basis) {
+    DenseMatrix out = null;
+    for (MatrixSlice slice1 : basis) {
+      Vector v1 = slice1.vector();
+      List<Integer> columns = new ArrayList<Integer>();
+      List<Double> dots = new ArrayList<Double>();
+      for (MatrixSlice slice2 : basis) {
+        columns.add(slice2.index());
+        dots.add(v1.dot(slice2.vector()));
+      }
+      if (out == null) {
+        // The dimension is only known after the first full pass over the basis.
+        out = new DenseMatrix(dots.size(), dots.size());
+      }
+      // Place columns by slice index, like rows, so the matrix stays consistent when the
+      // iteration order differs from the index order.
+      for (int i = 0; i < dots.size(); i++) {
+        out.set(slice1.index(), columns.get(i), dots.get(i));
+      }
+    }
+    return out;
+  }
+
+}
Copied: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java (from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/MultiThreadedEigenVerifier.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java?p2=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java&p1=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/MultiThreadedEigenVerifier.java&r1=910284&r2=912134&rev=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/MultiThreadedEigenVerifier.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java Sat Feb 20 15:45:47 2010
@@ -15,35 +15,35 @@
* limitations under the License.
*/
-package org.apache.mahout.math.decomposer.hebbian;
+package org.apache.mahout.math.decomposer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
-public class MultiThreadedEigenVerifier extends SimpleEigenVerifier {
+public class AsyncEigenVerifier extends SimpleEigenVerifier {
private final Executor threadPool;
private EigenStatus status = null;
private boolean finished = false;
private boolean started = false;
- public MultiThreadedEigenVerifier() {
+ public AsyncEigenVerifier() {
threadPool = Executors.newFixedThreadPool(1);
status = new EigenStatus(-1, 0);
}
@Override
- public EigenStatus verify(Matrix eigenMatrix, Vector vector) {
+ public EigenStatus verify(VectorIterable corpus, Vector vector) {
synchronized (status) {
if (!finished && !started) // not yet started or finished, so start!
{
status = new EigenStatus(-1, 0);
Vector vectorCopy = vector.clone();
- threadPool.execute(new VerifierRunnable(eigenMatrix, vectorCopy));
+ threadPool.execute(new VerifierRunnable(corpus, vectorCopy));
started = true;
}
if (finished) finished = false;
@@ -51,23 +51,23 @@
}
}
- protected EigenStatus innerVerify(Matrix eigenMatrix, Vector vector) {
- return super.verify(eigenMatrix, vector);
+ protected EigenStatus innerVerify(VectorIterable corpus, Vector vector) {
+ return super.verify(corpus, vector);
}
private class VerifierRunnable implements Runnable {
- private final Matrix eigenMatrix;
+ private final VectorIterable corpus;
private final Vector vector;
- protected VerifierRunnable(Matrix eigenMatrix, Vector vector) {
- this.eigenMatrix = eigenMatrix;
+ protected VerifierRunnable(VectorIterable corpus, Vector vector) {
+ this.corpus = corpus;
this.vector = vector;
}
public void run() {
- EigenStatus status = innerVerify(eigenMatrix, vector);
- synchronized (MultiThreadedEigenVerifier.this.status) {
- MultiThreadedEigenVerifier.this.status = status;
+ EigenStatus status = innerVerify(corpus, vector);
+ synchronized (AsyncEigenVerifier.this.status) {
+ AsyncEigenVerifier.this.status = status;
finished = true;
started = false;
}
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java
------------------------------------------------------------------------------
svn:keywords = "Date Rev Author URL Id"
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java (from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/EigenStatus.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java?p2=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java&p1=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/EigenStatus.java&r1=910284&r2=912134&rev=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/EigenStatus.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java Sat Feb 20 15:45:47 2010
@@ -15,15 +15,21 @@
* limitations under the License.
*/
-package org.apache.mahout.math.decomposer.hebbian;
+package org.apache.mahout.math.decomposer;
public class EigenStatus {
private final double eigenValue;
private final double cosAngle;
+ private Boolean inProgress;
public EigenStatus(double eigenValue, double cosAngle) {
+ this(eigenValue, cosAngle, true);
+ }
+
+ public EigenStatus(double eigenValue, double cosAngle, boolean inProgress) {
this.eigenValue = eigenValue;
this.cosAngle = cosAngle;
+ this.inProgress = inProgress;
}
public double getCosAngle() {
@@ -33,4 +39,16 @@
public double getEigenValue() {
return eigenValue;
}
+
+ public boolean inProgress() {
+ synchronized (inProgress) {
+ return inProgress;
+ }
+ }
+
+ void setInProgress(boolean status) {
+ synchronized (inProgress) {
+ inProgress = status;
+ }
+ }
}
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java
------------------------------------------------------------------------------
svn:keywords = "Date Rev Author URL Id"
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java (from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SimpleEigenVerifier.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java?p2=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java&p1=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SimpleEigenVerifier.java&r1=910284&r2=912134&rev=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SimpleEigenVerifier.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java Sat Feb 20 15:45:47 2010
@@ -14,20 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.mahout.math.decomposer.hebbian;
+package org.apache.mahout.math.decomposer;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.decomposer.SingularVectorVerifier;
public class SimpleEigenVerifier implements SingularVectorVerifier {
- public EigenStatus verify(Matrix eigenMatrix, Vector vector) {
- Vector resultantVector = eigenMatrix.timesSquared(vector);
+ public EigenStatus verify(VectorIterable corpus, Vector vector) {
+ Vector resultantVector = corpus.timesSquared(vector);
double newNorm = resultantVector.norm(2);
double oldNorm = vector.norm(2);
double eigenValue = (newNorm > 0 && oldNorm > 0) ? newNorm / oldNorm : 1;
double cosAngle = (newNorm > 0 && oldNorm > 0) ? resultantVector.dot(vector) / (newNorm * oldNorm) : 0;
- return new EigenStatus(eigenValue, cosAngle);
+ return new EigenStatus(eigenValue, cosAngle, false);
}
}
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java
------------------------------------------------------------------------------
svn:keywords = "Date Rev Author URL Id"
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java (from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SingularVectorVerifier.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java?p2=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java&p1=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SingularVectorVerifier.java&r1=910284&r2=912134&rev=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SingularVectorVerifier.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java Sat Feb 20 15:45:47 2010
@@ -15,11 +15,13 @@
* limitations under the License.
*/
-package org.apache.mahout.math.decomposer.hebbian;
+package org.apache.mahout.math.decomposer;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.decomposer.EigenStatus;
public interface SingularVectorVerifier {
- EigenStatus verify(Matrix eigenMatrix, Vector vector);
+ EigenStatus verify(VectorIterable eigenMatrix, Vector vector);
}
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java
------------------------------------------------------------------------------
svn:keywords = "Date Rev Author URL Id"
Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianSolver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianSolver.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianSolver.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianSolver.java Sat Feb 20 15:45:47 2010
@@ -27,6 +27,9 @@
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.decomposer.AsyncEigenVerifier;
+import org.apache.mahout.math.decomposer.EigenStatus;
+import org.apache.mahout.math.decomposer.SingularVectorVerifier;
import org.apache.mahout.math.function.TimesFunction;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.PlusMult;
@@ -58,7 +61,7 @@
* @param updater {@link EigenUpdater} used to do the actual work of iteratively updating the current "best guess"
* singular vector one data-point presentation at a time.
* @param verifier {@link SingularVectorVerifier } an object which perpetually tries to check how close to
- * convergence the current singular vector is (typically is a {@link MultiThreadedEigenVerifier } which does this
+ * convergence the current singular vector is (typically is a {@link org.apache.mahout.math.decomposer.AsyncEigenVerifier } which does this
* in the background in another thread, while the main thread continues to converge)
* @param convergenceTarget a small "epsilon" value which tells the solver how small you want the cosine of the
* angle between a proposed eigenvector and that same vector after being multiplied by the (square of the) input
@@ -83,8 +86,8 @@
*
* @param updater {@link EigenUpdater} used to do the actual work of iteratively updating the current "best guess"
* singular vector one data-point presentation at a time.
- * @param verifier {@link SingularVectorVerifier } an object which perpetually tries to check how close to
- * convergence the current singular vector is (typically is a {@link MultiThreadedEigenVerifier } which does this
+ * @param verifier {@link org.apache.mahout.math.decomposer.SingularVectorVerifier } an object which perpetually tries to check how close to
+ * convergence the current singular vector is (typically is a {@link org.apache.mahout.math.decomposer.AsyncEigenVerifier } which does this
* in the background in another thread, while the main thread continues to converge)
* @param convergenceTarget a small "epsilon" value which tells the solver how small you want the cosine of the
* angle between a proposed eigenvector and that same vector after being multiplied by the (square of the) input
@@ -102,7 +105,7 @@
/**
* <b>This is the recommended constructor to use if you're not sure</b>
* Creates a new HebbianSolver with the default {@link HebbianUpdater } to do the updating work, and the default
- * {@link MultiThreadedEigenVerifier } to check for convergence in a (single) background thread.
+ * {@link org.apache.mahout.math.decomposer.AsyncEigenVerifier } to check for convergence in a (single) background thread.
*
* @param convergenceTarget a small "epsilon" value which tells the solver how small you want the cosine of the
* angle between a proposed eigenvector and that same vector after being multiplied by the (square of the) input
@@ -112,14 +115,14 @@
*/
public HebbianSolver(double convergenceTarget, int maxPassesPerEigen) {
this(new HebbianUpdater(),
- new MultiThreadedEigenVerifier(),
+ new AsyncEigenVerifier(),
convergenceTarget,
maxPassesPerEigen);
}
/**
* Creates a new HebbianSolver with the default {@link HebbianUpdater } to do the updating work, and the default
- * {@link MultiThreadedEigenVerifier } to check for convergence in a (single) background thread, with
+ * {@link org.apache.mahout.math.decomposer.AsyncEigenVerifier } to check for convergence in a (single) background thread, with
* maxPassesPerEigen set to Integer.MAX_VALUE. <b>Not recommended</b> unless only looking for the first few (5, maybe 10?) singular
* vectors, as small errors which compound early on quickly put a minimum error on subsequent vectors.
*
@@ -133,7 +136,7 @@
/**
* Creates a new HebbianSolver with the default {@link HebbianUpdater } to do the updating work, and the default
- * {@link MultiThreadedEigenVerifier } to check for convergence in a (single) background thread, with
+ * {@link org.apache.mahout.math.decomposer.AsyncEigenVerifier } to check for convergence in a (single) background thread, with
* convergenceTarget set to 0, which means that the solver will not really care about convergence as a loop-exiting
* criterion (but will be checking for convergence anyways, so it will be logged and singular values will be
* saved).
@@ -276,10 +279,7 @@
* Step 3: verify how eigen-like the prospective eigen is. This is potentially asynchronous.
*/
EigenStatus status = verify(corpus, currentPseudoEigen);
- /**
- * TODO: Having the cosAngle() be zero is not a good signal for an unfinished verification.
- */
- if (status.getCosAngle() == 0) {
+ if (status.inProgress()) {
log.info("Verifier not finished, making another pass...");
} else {
log.info("Has 1 - cosAngle: {}, convergence target is: {}", (1 - status.getCosAngle()), convergenceTarget);
@@ -310,7 +310,7 @@
int numThreads = Integer.parseInt(props.getProperty("solver.verifier.numThreads"));
HebbianUpdater updater = new HebbianUpdater();
- SingularVectorVerifier verifier = new MultiThreadedEigenVerifier();
+ SingularVectorVerifier verifier = new AsyncEigenVerifier();
HebbianSolver solver = new HebbianSolver(updater,
verifier,
convergence,
Modified: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java Sat Feb 20 15:45:47 2010
@@ -23,6 +23,7 @@
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.decomposer.EigenStatus;
public class TrainingState {
Modified: lucene/mahout/trunk/math/src/test/java/org/apache/mahout/math/decomposer/hebbian/TestHebbianSolver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/test/java/org/apache/mahout/math/decomposer/hebbian/TestHebbianSolver.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/test/java/org/apache/mahout/math/decomposer/hebbian/TestHebbianSolver.java (original)
+++ lucene/mahout/trunk/math/src/test/java/org/apache/mahout/math/decomposer/hebbian/TestHebbianSolver.java Sat Feb 20 15:45:47 2010
@@ -20,6 +20,8 @@
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.decomposer.AsyncEigenVerifier;
+import org.apache.mahout.math.decomposer.SingularVectorVerifier;
import org.apache.mahout.math.decomposer.SolverTest;
/**
@@ -51,7 +53,7 @@
int desiredRank,
TrainingState state) throws Exception {
HebbianUpdater updater = new HebbianUpdater();
- SingularVectorVerifier verifier = new MultiThreadedEigenVerifier();
+ SingularVectorVerifier verifier = new AsyncEigenVerifier();
HebbianSolver solver = new HebbianSolver(updater,
verifier,
convergence,
Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,66 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.AbstractJob;
+
+import java.io.IOException;
+import java.util.Map;
+
+
+/**
+ * Map-only Hadoop job that splits each {@link Text} value of a {@code SequenceFile<Text,Text>} input into
+ * paragraphs and writes every paragraph as its own record, keyed by the key of the record it came from.
+ * A paragraph boundary is a blank line, i.e. the byte sequence {@code "\n\n"}; the separator itself is not
+ * included in any emitted paragraph.
+ * <p>
+ * The {@code --input}, {@code --output} and {@code --jarFile} arguments are read via
+ * {@link AbstractJob#parseArguments}.
+ */
+public class TextParagraphSplittingJob extends AbstractJob {
+
+  /**
+   * Configures and runs the map-only job, blocking until it finishes.
+   *
+   * @param strings command-line arguments
+   * @return 0 on success (the {@code Tool} convention), -1 if no arguments were parsed
+   * @throws Exception if configuration fails; {@link JobClient#runJob} throws an IOException if the job fails
+   */
+  @Override
+  public int run(String[] strings) throws Exception {
+    Map<String,String> args = parseArguments(strings);
+    if (args == null) {
+      // NOTE(review): assumes parseArguments returns null after printing help / on bad arguments -- confirm.
+      return -1;
+    }
+    JobConf conf = prepareJobConf(args.get("--input"),
+                                  args.get("--output"),
+                                  args.get("--jarFile"),
+                                  SequenceFileInputFormat.class,
+                                  SplitMap.class,
+                                  Text.class,
+                                  Text.class,
+                                  Reducer.class,
+                                  Text.class,
+                                  Text.class,
+                                  SequenceFileOutputFormat.class);
+    // Map-only: mapper output is written straight to the SequenceFile output.
+    conf.setNumReduceTasks(0);
+
+    // runJob already polls until completion and throws IOException if the job fails.
+    JobClient.runJob(conf);
+    return 0;
+  }
+
+  /**
+   * Splits one Text value into paragraphs separated by {@code "\n\n"} and emits each non-empty
+   * paragraph under the input key. The text before the first separator and after the last one
+   * (or the whole value, if it contains no separator) is emitted too.
+   */
+  public static class SplitMap extends MapReduceBase implements Mapper<Text,Text,Text,Text> {
+
+    private static final String PARAGRAPH_SEPARATOR = "\n\n";
+    // The separator is ASCII, so its char length equals its UTF-8 byte length.
+    private static final int SEPARATOR_LENGTH = PARAGRAPH_SEPARATOR.length();
+
+    @Override
+    public void map(Text key,
+                    Text text,
+                    OutputCollector<Text, Text> out,
+                    Reporter reporter) throws IOException {
+      Text outText = new Text();
+      // The backing array may be longer than the value; only the first getLength() bytes are valid.
+      byte[] bytes = text.getBytes();
+      int length = text.getLength();
+      int start = 0;
+      while (start < length) {
+        // Skip extra newlines so no paragraph begins with a blank line (e.g. runs of 3+ '\n').
+        while (start < length && bytes[start] == '\n') {
+          start++;
+        }
+        if (start >= length) {
+          break;
+        }
+        // Byte offset of the next separator, or -1 if this is the last paragraph.
+        int sep = text.find(PARAGRAPH_SEPARATOR, start);
+        int end = sep < 0 ? length : sep;
+        // bytes[start] is not '\n', so sep > start and the paragraph is non-empty.
+        outText.set(bytes, start, end - start);
+        out.collect(key, outText);
+        start = sep < 0 ? length : sep + SEPARATOR_LENGTH;
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new TextParagraphSplittingJob(), args));
+  }
+}
Modified: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java (original)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java Sat Feb 20 15:45:47 2010
@@ -102,9 +102,13 @@
if (sequentialAccess) {
vector = new SequentialAccessSparseVector(vector);
}
- vectorWritable.set(vector);
- output.collect(key, vectorWritable);
-
+ // if the vector has no nonZero entries (nothing in the dictionary), let's not waste space sending it to disk.
+ if(vector.getNumNondefaultElements() > 0) {
+ vectorWritable.set(vector);
+ output.collect(key, vectorWritable);
+ } else {
+ reporter.incrCounter("TFParticalVectorReducer", "emptyVectorCount", 1);
+ }
}
@Override