You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by jm...@apache.org on 2010/02/20 16:45:48 UTC

svn commit: r912134 - in /lucene/mahout/trunk: core/ core/src/main/java/org/apache/mahout/cf/taste/hadoop/ core/src/main/java/org/apache/mahout/math/hadoop/ core/src/main/java/org/apache/mahout/math/hadoop/decomposer/ core/src/test/java/org/apache/maho...

Author: jmannix
Date: Sat Feb 20 15:45:47 2010
New Revision: 912134

URL: http://svn.apache.org/viewvc?rev=912134&view=rev
Log:
MAHOUT-180

Adds DistributedRowMatrix (not fully implemented - only has timesSquared(Vector) currently, others to follow), DistributedLanczosSolver and EigenVerificationJob.

Wiki to follow on usage.

Some refactorings in the math.decomposer (non-distributed, non-Hadoop) versions as well, to tie together the various solvers under one kind of interface with some
shared code.  There may be more changes.

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/OrthonormalityVerifier.java
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java   (contents, props changed)
      - copied, changed from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/MultiThreadedEigenVerifier.java
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java   (contents, props changed)
      - copied, changed from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/EigenStatus.java
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java   (contents, props changed)
      - copied, changed from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SimpleEigenVerifier.java
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java   (contents, props changed)
      - copied, changed from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SingularVectorVerifier.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
Removed:
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/EigenStatus.java
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/MultiThreadedEigenVerifier.java
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SimpleEigenVerifier.java
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SingularVectorVerifier.java
Modified:
    lucene/mahout/trunk/core/pom.xml
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianSolver.java
    lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java
    lucene/mahout/trunk/math/src/test/java/org/apache/mahout/math/decomposer/hebbian/TestHebbianSolver.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java

Modified: lucene/mahout/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/pom.xml?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/core/pom.xml (original)
+++ lucene/mahout/trunk/core/pom.xml Sat Feb 20 15:45:47 2010
@@ -258,6 +258,16 @@
       <artifactId>easymockclassextension</artifactId>
     </dependency>
 
+
+
+    <dependency>
+      <groupId>org.apache.mahout</groupId>
+      <artifactId>mahout-math</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <repositories>

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java Sat Feb 20 15:45:47 2010
@@ -90,7 +90,7 @@
     Option tempDirOpt = buildOption("tempDir", "t", "Intermediate output directory", "temp");
     Option outputOpt = DefaultOptionCreator.outputOption().create();
     Option helpOpt = DefaultOptionCreator.helpOption();
-    Option jarFileOpt = buildOption("jarFile", "m", "Implementation jar");
+    Option jarFileOpt = buildOption("jarFile", "m", "Implementation jar", false, null);
     
     GroupBuilder gBuilder = new GroupBuilder().withName("Options").withOption(inputOpt)
         .withOption(tempDirOpt).withOption(outputOpt).withOption(helpOpt).withOption(jarFileOpt);
@@ -150,9 +150,11 @@
     
     Path inputPathPath = new Path(inputPath).makeQualified(fs);
     Path outputPathPath = new Path(outputPath).makeQualified(fs);
-    
-    jobConf.set("mapred.jar", jarFile);
-    jobConf.setJar(jarFile);
+
+    if(jarFile != null) {
+      jobConf.set("mapred.jar", jarFile);
+      jobConf.setJar(jarFile);
+    }
     
     jobConf.setClass("mapred.input.format.class", inputFormat, InputFormat.class);
     jobConf.set("mapred.input.dir", StringUtils.escapeString(inputPathPath.toString()));

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,171 @@
+package org.apache.mahout.math.hadoop;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+
+/**
+ * DistributedRowMatrix is a FileSystem-backed VectorIterable in which the vectors live in a
+ * {@code SequenceFile<WritableComparable,VectorWritable>}, and distributed operations are executed as M/R
+ * passes on Hadoop.  The usage is as follows:
+ * <p>
+ * <pre>
+ *   // the path must already contain an already created SequenceFile!
+ *   DistributedRowMatrix m = new DistributedRowMatrix("path/to/vector/sequenceFile", "tmp/path", 10000000, 250000);
+ *   m.configure(new JobConf());
+ *   // now if we want to multiply a vector by this matrix, its dimension must equal the row dimension of this
+ *   // matrix.  If we want to timesSquared() a vector by this matrix, its dimension must equal the column dimension
+ *   // of the matrix.
+ *   Vector v = new DenseVector(250000);
+ *   // now the following operation will be done via a M/R pass via Hadoop.
+ *   Vector w = m.timesSquared(v);
+ * </pre>
+ * <p>
+ * {@link #configure(JobConf)} must be called before iterating or calling {@link #timesSquared(Vector)}: it is
+ * the only place where the row path and the temp output path are resolved.
+ */
+public class DistributedRowMatrix implements VectorIterable, JobConfigurable {
+  private final String inputPathString;
+  private final String outputTmpPathString;
+  private final int numRows;
+  private final int numCols;
+
+  // The three fields below stay null until configure(JobConf) has been called.
+  private JobConf conf;
+  private Path rowPath;
+  private Path outputTmpBasePath;
+
+  /**
+   * Records where the matrix lives and how big it is; no file system access happens here.
+   *
+   * @param inputPathString     path of the SequenceFile holding the rows
+   * @param outputTmpPathString base path under which timesSquared() writes its per-call output
+   * @param numRows             number of rows reported by {@link #numRows()}
+   * @param numCols             number of columns reported by {@link #numCols()}
+   */
+  public DistributedRowMatrix(String inputPathString,
+                              String outputTmpPathString,
+                              int numRows,
+                              int numCols) {
+    this.inputPathString = inputPathString;
+    this.outputTmpPathString = outputTmpPathString;
+    this.numRows = numRows;
+    this.numCols = numCols;
+  }
+
+  /**
+   * Stores the configuration and qualifies the input and temp paths against its FileSystem.
+   *
+   * @param conf the job configuration used to obtain the FileSystem
+   * @throws RuntimeException wrapping the IOException if the FileSystem cannot be obtained
+   */
+  @Override
+  public void configure(JobConf conf) {
+    this.conf = conf;
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      rowPath = fs.makeQualified(new Path(inputPathString));
+      outputTmpBasePath = fs.makeQualified(new Path(outputTmpPathString));
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /** @return the qualified path of the row SequenceFile, or null before configure() has been called. */
+  public Path getRowPath() {
+    return rowPath;
+  }
+
+  /** @return the qualified temp output base path, or null before configure() has been called. */
+  public Path getOutputTempPath() {
+    return outputTmpBasePath;
+  }
+
+  /**
+   * Opens the row SequenceFile and returns an iterator over its entries.  The iterator closes the underlying
+   * reader itself once it has been read to the end.
+   *
+   * @throws RuntimeException wrapping the IOException if the file cannot be opened
+   */
+  @Override
+  public Iterator<MatrixSlice> iterateAll() {
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, rowPath, conf);
+      return new DistributedMatrixIterator(reader);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public int numSlices() {
+    return numRows();
+  }
+
+  @Override
+  public int numRows() {
+    return numRows;
+  }
+
+  @Override
+  public int numCols() {
+    return numCols;
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Vector times(Vector v) {
+    // TODO: times(Vector) is easy, works pretty much like timesSquared.
+    throw new UnsupportedOperationException("DistributedRowMatrix methods other than timesSquared not supported yet");
+  }
+
+  /**
+   * Runs a {@link TimesSquaredJob} over the rows of this matrix and reads back the resulting vector.  Each call
+   * writes into its own subdirectory of the temp output path, named after {@code System.nanoTime()}.
+   *
+   * @param v the vector to multiply
+   * @return the vector read back from the job's output
+   * @throws RuntimeException wrapping the IOException if the job cannot be set up or run
+   */
+  @Override
+  public Vector timesSquared(Vector v) {
+    try {
+      Path callOutputPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+      // Named jobConf (not conf) so that it does not shadow the field set by configure().
+      JobConf jobConf = TimesSquaredJob.createTimesSquaredJobConf(v, rowPath, callOutputPath);
+      JobClient.runJob(jobConf);
+      return TimesSquaredJob.retrieveTimesSquaredOutputVector(jobConf);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public Iterator<MatrixSlice> iterator() {
+    return iterateAll();
+  }
+
+  /**
+   * Iterator over the (IntWritable, VectorWritable) entries of a SequenceFile, exposed as MatrixSlices.
+   * One entry is read ahead by hasNext(); the reader is closed as soon as the end of the file is reached
+   * or a read fails.
+   */
+  public static class DistributedMatrixIterator implements Iterator<MatrixSlice> {
+    private final SequenceFile.Reader reader;
+    private final IntWritable i = new IntWritable();
+    private final VectorWritable v = new VectorWritable();
+    // true when (i, v) hold an entry that next() has not handed out yet
+    private boolean hasBuffered = false;
+    private boolean hasNext = false;
+    // true once the reader has been closed; no further reads are attempted after that
+    private boolean closed = false;
+
+    public DistributedMatrixIterator(SequenceFile.Reader reader) {
+      this.reader = reader;
+    }
+
+    /**
+     * @return true if another entry is available; false once the file is exhausted
+     * @throws RuntimeException wrapping the IOException if reading the next entry fails
+     */
+    @Override
+    public boolean hasNext() {
+      if (closed) {
+        return false;
+      }
+      if (!hasBuffered) {
+        try {
+          hasNext = reader.next(i, v);
+          hasBuffered = true;
+        } catch (IOException ioe) {
+          hasNext = false;
+          closeQuietly();
+          throw new RuntimeException(ioe);
+        }
+        if (!hasNext) {
+          closeQuietly();
+        }
+      }
+      return hasNext;
+    }
+
+    /**
+     * @return the next entry as a MatrixSlice
+     * @throws java.util.NoSuchElementException if the iteration has no more entries
+     */
+    @Override
+    public MatrixSlice next() {
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException("No more rows in DistributedMatrixIterator");
+      }
+      hasBuffered = false;
+      return new MatrixSlice(v.get(), i.get());
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Cannot remove from DistributedMatrixIterator");
+    }
+
+    /** Closes the reader at most once; a failure to close is deliberately ignored (best effort). */
+    private void closeQuietly() {
+      if (closed) {
+        return;
+      }
+      closed = true;
+      try {
+        reader.close();
+      } catch (IOException ignored) {
+        // nothing useful can be done if closing a read-only stream fails
+      }
+    }
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,204 @@
+package org.apache.mahout.math.hadoop;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+
+import static org.apache.mahout.math.function.Functions.plusMult;
+
+
+/**
+ * Builds and reads back the M/R job behind DistributedRowMatrix.timesSquared(Vector): each mapper accumulates
+ * {@code sum_rows (row . v) * row} over the rows it sees, and the reducer (also used as combiner) sums those
+ * partial vectors into a single output vector.
+ */
+public class TimesSquaredJob {
+  private static final Logger log = LoggerFactory.getLogger(TimesSquaredJob.class);
+
+  public static final String INPUT_VECTOR = "timesSquared.inputVector";
+  public static final String IS_SPARSE_OUTPUT = "timesSquared.outputVector.sparse";
+  public static final String OUTPUT_VECTOR_DIMENSION = "timesSquared.output.dimension";
+
+  public static final String OUTPUT_VECTOR_FILENAME = "timesSquaredOutputVector";
+
+  /**
+   * Convenience overload using {@link TimesSquaredMapper} and {@link VectorSummingReducer}.
+   *
+   * @param v                the input vector
+   * @param matrixInputPath  path of the SequenceFile holding the matrix rows
+   * @param outputVectorPath base path for this job's input-vector file and output directory
+   * @return the configured, not yet submitted, job
+   * @throws IOException if the input vector cannot be written
+   */
+  public static JobConf createTimesSquaredJobConf(Vector v, 
+                                                  Path matrixInputPath, 
+                                                  Path outputVectorPath) throws IOException {
+    return createTimesSquaredJobConf(v,
+                                     matrixInputPath,
+                                     outputVectorPath,
+                                     TimesSquaredMapper.class,
+                                     VectorSummingReducer.class);
+  }
+
+  /**
+   * Writes {@code v} to a SequenceFile under {@code outputVectorPathBase}, registers that file in the
+   * DistributedCache, and configures a job that reads the matrix rows and writes a single summed vector to
+   * {@code outputVectorPathBase/timesSquaredOutputVector}.
+   *
+   * @param v                    the input vector
+   * @param matrixInputPath      path of the SequenceFile holding the matrix rows
+   * @param outputVectorPathBase base path for this job's input-vector file and output directory
+   * @param mapClass             mapper implementation to use
+   * @param redClass             reducer implementation, also installed as the combiner
+   * @return the configured, not yet submitted, job
+   * @throws IOException if the input vector cannot be written
+   */
+  public static JobConf createTimesSquaredJobConf(Vector v,
+                                                  Path matrixInputPath,
+                                                  Path outputVectorPathBase,
+                                                  Class<? extends TimesSquaredMapper> mapClass,
+                                                  Class<? extends VectorSummingReducer> redClass) throws IOException {
+    JobConf conf = new JobConf(TimesSquaredJob.class);
+    conf.setJobName("TimesSquaredJob: " + matrixInputPath + " timesSquared(" + v.getName() + ")");
+    FileSystem fs = FileSystem.get(conf);
+    matrixInputPath = fs.makeQualified(matrixInputPath);
+    outputVectorPathBase = fs.makeQualified(outputVectorPathBase);
+
+    long now = System.nanoTime();
+    Path inputVectorPath = new Path(outputVectorPathBase, INPUT_VECTOR + "/" + now);
+    SequenceFile.Writer inputVectorPathWriter = new SequenceFile.Writer(fs,
+            conf, inputVectorPath, NullWritable.class, VectorWritable.class);
+    try {
+      inputVectorPathWriter.append(NullWritable.get(), new VectorWritable(v));
+    } finally {
+      // close even when append fails, so the stream is not leaked
+      inputVectorPathWriter.close();
+    }
+    URI ivpURI = inputVectorPath.toUri();
+    DistributedCache.setCacheFiles(new URI[] {ivpURI}, conf);
+    fs.deleteOnExit(inputVectorPath);
+
+    conf.set(INPUT_VECTOR, ivpURI.toString());
+    // anything that is not a DenseVector gets a sparse accumulator on the map and reduce side
+    conf.setBoolean(IS_SPARSE_OUTPUT, !(v instanceof DenseVector));
+    conf.setInt(OUTPUT_VECTOR_DIMENSION, v.size());
+    FileInputFormat.addInputPath(conf, matrixInputPath);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    FileOutputFormat.setOutputPath(conf, new Path(outputVectorPathBase, OUTPUT_VECTOR_FILENAME));
+    conf.setMapperClass(mapClass);
+    conf.setMapOutputKeyClass(NullWritable.class);
+    conf.setMapOutputValueClass(VectorWritable.class);
+    conf.setReducerClass(redClass);
+    conf.setCombinerClass(redClass);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    conf.setOutputKeyClass(NullWritable.class);
+    conf.setOutputValueClass(VectorWritable.class);
+    return conf;
+  }
+
+  /**
+   * Reads the first entry of {@code part-00000} in the job's output directory and schedules that file for
+   * deletion on exit.
+   *
+   * @param conf the configuration the job was run with
+   * @return the vector stored in the output file
+   * @throws RuntimeException wrapping the IOException if the output cannot be read
+   */
+  public static Vector retrieveTimesSquaredOutputVector(JobConf conf) {
+    try {
+      Path outputPath = FileOutputFormat.getOutputPath(conf);
+      FileSystem fs = FileSystem.get(conf);
+      Path outputFile = new Path(outputPath, "part-00000");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, outputFile, conf);
+      Vector vector;
+      try {
+        NullWritable n = NullWritable.get();
+        VectorWritable v = new VectorWritable();
+        reader.next(n, v);
+        vector = v.get();
+      } finally {
+        reader.close();
+      }
+      fs.deleteOnExit(outputFile);
+      return vector;
+    } catch (IOException ioe) {
+      log.error("Unable to retrieve vector!", ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Mapper that loads the input vector from the DistributedCache in configure(), accumulates
+   * {@code (row . inputVector) * row} for every row passed to map(), and emits the accumulated vector once,
+   * from close().
+   */
+  public static class TimesSquaredMapper extends MapReduceBase
+      implements Mapper<WritableComparable,VectorWritable, NullWritable,VectorWritable> {
+
+    protected Vector inputVector;
+    protected Vector outputVector;
+    // Captured from map() because close() receives no collector; stays null if map() is never called.
+    protected OutputCollector<NullWritable,VectorWritable> out;
+
+    /**
+     * Reads the input vector from the first DistributedCache file and allocates the accumulator.
+     *
+     * @throws IllegalArgumentException if the DistributedCache holds no files
+     * @throws RuntimeException wrapping the IOException if the vector file cannot be read
+     */
+    @Override
+    public void configure(JobConf conf) {
+      try {
+        URI[] localFiles = DistributedCache.getCacheFiles(conf);
+        if (localFiles == null || localFiles.length < 1) {
+          throw new IllegalArgumentException(
+            "missing paths from the DistributedCache");
+        }
+        Path inputVectorPath = new Path(localFiles[0].getPath());
+        FileSystem fs = inputVectorPath.getFileSystem(conf);
+
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+          inputVectorPath,
+          conf);
+        VectorWritable val = new VectorWritable();
+        try {
+          reader.next(NullWritable.get(), val);
+        } finally {
+          reader.close();
+        }
+        inputVector = val.get();
+        if(!(inputVector instanceof SequentialAccessSparseVector || inputVector instanceof DenseVector)) {
+          inputVector = new SequentialAccessSparseVector(inputVector);
+        }
+        outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false)
+                     ? new RandomAccessSparseVector(inputVector.size(), 10)
+                     : new DenseVector(inputVector.size());
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
+    }
+
+    /**
+     * Adds {@code scale(v) * v} to the accumulator; nothing is emitted here.
+     */
+    @Override
+    public void map(WritableComparable rowNum,
+                    VectorWritable v,
+                    OutputCollector<NullWritable,VectorWritable> out,
+                    Reporter rep) throws IOException {
+      this.out = out;
+      double d = scale(v);
+      if(d == 0) {
+        // this row contributes nothing to the sum
+        return;
+      }
+      if(d == 1) {
+        outputVector.assign(v.get(), Functions.plus);
+      } else {
+        outputVector.assign(v.get(), plusMult(d));
+      }
+    }
+
+    /** @return the dot product of the row with the input vector; subclasses may override the weighting. */
+    protected double scale(VectorWritable v) {
+      return v.get().dot(inputVector);
+    }
+
+    /**
+     * Emits the accumulated vector.  If map() was never called there is no collector to write to and
+     * nothing has been accumulated, so nothing is emitted.
+     */
+    @Override
+    public void close() throws IOException {
+      if (out == null) {
+        return;
+      }
+      out.collect(NullWritable.get(), new VectorWritable(outputVector));
+    }
+
+  }
+
+  /**
+   * Reducer (and combiner) that adds every incoming vector into one accumulator and emits the sum.
+   */
+  public static class VectorSummingReducer extends MapReduceBase
+      implements Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> {
+
+    protected Vector outputVector;
+
+    /** Allocates the accumulator with the dimension and sparsity recorded in the job configuration. */
+    @Override
+    public void configure(JobConf conf) {
+      int outputDimension = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE);
+      outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false)
+                   ? new RandomAccessSparseVector(outputDimension, 10)
+                   : new DenseVector(outputDimension);
+    }
+
+    /** Sums all non-null vectors for the key into the accumulator and emits it. */
+    @Override
+    public void reduce(NullWritable n,
+                       Iterator<VectorWritable> vectors,
+                       OutputCollector<NullWritable,VectorWritable> out,
+                       Reporter reporter) throws IOException {
+      while(vectors.hasNext()) {
+        VectorWritable v = vectors.next();
+        if(v != null) {
+          v.get().addTo(outputVector);
+        }
+      }
+      out.collect(NullWritable.get(), new VectorWritable(outputVector));
+    }
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,155 @@
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.commons.cli2.Option;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.AbstractJob;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.lanczos.LanczosSolver;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * LanczosSolver that runs against a DistributedRowMatrix and can be launched from the command line through
+ * the inner {@link DistributedLanczosSolverJob}, which parses the arguments before delegating to
+ * {@link #run(String[])}.
+ */
+public class DistributedLanczosSolver extends LanczosSolver implements Tool {
+
+  private static final Logger log = LoggerFactory.getLogger(DistributedLanczosSolver.class);
+
+  private Configuration conf;
+
+  // Populated by DistributedLanczosSolverJob.run(String[]); null until then.
+  private Map<String,String> parsedArgs;
+
+  /**
+   * For the distributed case, the best guess at a useful initialization state for Lanczos we'll choose to be
+   * uniform over all input dimensions, L_2 normalized.
+   * @param corpus the matrix whose column count fixes the size of the vector
+   * @return a dense vector of size {@code corpus.numCols()} with every entry set to 1/sqrt(numCols)
+   */
+  protected Vector getInitialVector(VectorIterable corpus) {
+    Vector initialVector = new DenseVector(corpus.numCols());
+    initialVector.assign(1/Math.sqrt(corpus.numCols()));
+    return initialVector;
+  }
+
+  /**
+   * Solves for the eigenvectors of the matrix named by the parsed arguments, runs an EigenVerificationJob
+   * over them, and serializes the result to the {@code --output} path.
+   *
+   * @param strings ignored; the options are taken from the arguments parsed by DistributedLanczosSolverJob
+   * @return 0
+   * @throws IllegalStateException if the arguments have not been parsed yet
+   */
+  @Override
+  public int run(String[] strings) throws Exception {
+    if (parsedArgs == null) {
+      throw new IllegalStateException("Arguments have not been parsed; launch through job() so that "
+          + "DistributedLanczosSolverJob.run(String[]) parses them first");
+    }
+    String inputPathString = parsedArgs.get("--input");
+    String outputTmpPathString = parsedArgs.get("--tempDir");
+    int numRows = Integer.parseInt(parsedArgs.get("--numRows"));
+    int numCols = Integer.parseInt(parsedArgs.get("--numCols"));
+    int desiredRank = Integer.parseInt(parsedArgs.get("--rank"));
+    Matrix eigenVectors = new DenseMatrix(desiredRank, numCols);
+    List<Double> eigenValues = new ArrayList<Double>();
+    String outputEigenVectorPath =  parsedArgs.get("--output");
+
+    DistributedRowMatrix matrix = new DistributedRowMatrix(inputPathString,
+                                                           outputTmpPathString,
+                                                           numRows,
+                                                           numCols);
+    matrix.configure(new JobConf(getConf()));
+    solve(matrix, desiredRank, eigenVectors, eigenValues);
+
+    // TODO ack!
+    EigenVerificationJob evj = new EigenVerificationJob();
+    evj.setEigensToVerify(eigenVectors);
+    evj.setConf(getConf());
+    evj.run(new String[] {"--input", parsedArgs.get("--input"), "--output", parsedArgs.get("--output")});
+
+    serializeOutput(eigenVectors, eigenValues, outputEigenVectorPath);
+    return 0;
+  }
+
+  /**
+   * TODO: this should be refactored to allow both LanczosSolver impls to properly serialize output in a generic way.
+   * @param eigenVectors The eigenvectors to be serialized
+   * @param eigenValues The eigenvalues to be serialized
+   * @param outputPath The path (relative to the current Configuration's FileSystem) to save the output to.
+   * @throws IOException if the SequenceFile cannot be created or written
+   */
+  public void serializeOutput(Matrix eigenVectors, List<Double> eigenValues, String outputPath) throws IOException {
+    log.info("Persisting {} eigenVectors and eigenValues to: {}", eigenVectors.numRows(), outputPath);
+    Path path = new Path(outputPath);
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+    SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
+    try {
+      VectorWritable vw = new VectorWritable();
+      IntWritable iw = new IntWritable();
+      // NOTE(review): the last row is not written (bound is numRows() - 1); confirm this is intentional
+      // before changing it, since eigenValues is indexed in lock-step with the rows.
+      for(int i=0; i<eigenVectors.numRows() - 1; i++) {
+        Vector v = eigenVectors.getRow(i);
+        v.setName("eigenVector" + i + ", eigenvalue = " + eigenValues.get(i));
+        vw.set(v);
+        iw.set(i);
+        seqWriter.append(iw, vw);
+      }
+    } finally {
+      // close even when a row fails to serialize, so the stream is not leaked
+      seqWriter.close();
+    }
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** @return a new command-line front end bound to this solver instance. */
+  public DistributedLanczosSolverJob job() {
+    return new DistributedLanczosSolverJob();
+  }
+
+  /**
+   * Inner subclass of AbstractJob so we get access to AbstractJob's functionality w.r.t. cmdline options, but still
+   * subclass LanczosSolver.
+   */
+  public class DistributedLanczosSolverJob extends AbstractJob {
+    @Override
+    public void setConf(Configuration conf) {
+      DistributedLanczosSolver.this.setConf(conf);
+    }
+
+    @Override
+    public Configuration getConf() {
+      return DistributedLanczosSolver.this.getConf();
+    }
+
+    /**
+     * Parses the command line (adding the numRows, numCols and rank options), stores the result on the
+     * enclosing solver, and then runs the solver.
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+
+      Option numRowsOpt = buildOption("numRows",
+                                      "nr",
+                                      "Number of rows of the input matrix",
+                                      "0");
+      Option numColsOpt = buildOption("numCols",
+                                      "nc",
+                                      "Number of columns of the input matrix");
+      Option desiredRankOpt = buildOption("rank",
+                                          "r",
+                                          "Desired decomposition rank (note: only roughly 1/4 to 1/3 "
+                                        + "of these will have the top portion of the spectrum)");
+
+      DistributedLanczosSolver.this.parsedArgs = parseArguments(args, numRowsOpt, numColsOpt, desiredRankOpt);
+      return DistributedLanczosSolver.this.run(args);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new DistributedLanczosSolver().job(), args);
+  }
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,36 @@
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.mahout.math.DenseVector;
+
+/**
+ * A DenseVector whose eigenvalue, cosine-angle error and order are carried in the vector's name, in the form
+ * {@code e|<order>| = |<eigenValue>|, err = <cosAngleError>}; the getters parse them back out of that name.
+ *
+ * TODO this is a horrible hack.  Make a proper writable subclass also.
+ */
+public class EigenVector extends DenseVector {
+
+  /**
+   * @param v             the vector values
+   * @param eigenValue    eigenvalue to encode in the name
+   * @param cosAngleError error to encode in the name
+   * @param order         index to encode in the name
+   */
+  public EigenVector(DenseVector v, double eigenValue, double cosAngleError, int order) {
+    super(v, false);
+    setName("e|" + order +"| = |"+eigenValue+"|, err = "+cosAngleError);
+  }
+
+  /** @return the eigenvalue parsed from the name */
+  public double getEigenValue() {
+    return parseMetaData()[1];
+  }
+
+  /** @return the cosine-angle error parsed from the name */
+  public double getCosAngleError() {
+    return parseMetaData()[2];
+  }
+
+  /** @return the order parsed from the name */
+  public int getIndex() {
+    return (int)parseMetaData()[0];
+  }
+
+  /**
+   * Splits the name written by the constructor back into its three numbers.
+   *
+   * @return {@code {order, eigenValue, cosAngleError}}
+   * @throws NumberFormatException if the name does not have the format written by the constructor
+   */
+  protected double[] parseMetaData() {
+    double[] m = new double[3];
+    // "e|3| = |1.5|, err = 0.01"  ->  { "e|3|", "|1.5|, err", "0.01" }
+    String[] s = getName().split(" = ");
+    // The pipe must be escaped: as a bare regex it is an empty alternation and splits between every character.
+    m[0] = Double.parseDouble(s[0].split("\\|")[1]);
+    m[1] = Double.parseDouble(s[1].split("\\|")[1]);
+    // The error is the whole last token; dropping its first character would lose the sign or leading digit.
+    m[2] = Double.parseDouble(s[2]);
+    return m;
+  }
+
+}

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,282 @@
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.AbstractJob;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.OrthonormalityVerifier;
+import org.apache.mahout.math.SparseRowMatrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.EigenStatus;
+import org.apache.mahout.math.decomposer.SimpleEigenVerifier;
+import org.apache.mahout.math.decomposer.SingularVectorVerifier;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * <p>Class for taking the output of an eigendecomposition (specified as a Path location), and verifies correctness,
+ * in terms of the following: if you have a vector e, and a matrix m, then let e' = m.timesSquared(e); the error
+ * w.r.t. eigenvector-ness is the cosine of the angle between e and e':</p>
+ * <pre>
+ *   error(e,e') = e.dot(e') / (e.norm(2)*e'.norm(2))
+ * </pre>
+ * <p>A set of eigenvectors should also all be very close to orthogonal, so this job computes all inner products
+ * between eigenvectors; the result is not yet compared against the identity matrix.
+ * </p>
+ * <p>
+ * Parameters used in the cleanup (other than in the input/output path options) include --minEigenvalue, which
+ * specifies the value below which eigenvector/eigenvalue pairs will be discarded, and --maxError, which specifies
+ * the maximum error (as defined above) to be tolerated in an eigenvector.</p>
+ * <p>
+ * If all the eigenvectors can fit in memory, --inMemory allows for a speedier completion of this task by doing so.
+ * </p>
+ */
+public class EigenVerificationJob extends AbstractJob implements Tool {
+  private static final Logger log = LoggerFactory.getLogger(EigenVerificationJob.class);
+
+  private SingularVectorVerifier eigenVerifier;
+  private OrthonormalityVerifier orthoVerifier;
+  private VectorIterable eigensToVerify;
+  private VectorIterable corpus;
+  private double maxError;
+  private double minEigenValue;
+  private boolean loadEigensInMemory;
+  private String tmpOut;
+  private String outPath;
+
+  /**
+   * Supplies the eigenvectors to verify directly. When this is called before {@link #run(String[])}, run does
+   * not load them from --eigenInput.
+   */
+  public void setEigensToVerify(VectorIterable eigens) {
+    eigensToVerify = eigens;
+  }
+
+  /**
+   * Parses the arguments, verifies each candidate eigenvector against the corpus, prunes those which do not
+   * meet --maxError / --minEigenvalue, and writes the survivors below the output path.
+   *
+   * @return -1 if the arguments could not be parsed, 0 otherwise (also when only help was requested)
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    Map<String,String> argMap = handleArgs(args);
+    if(argMap == null) {
+      return -1;
+    } else if (argMap.isEmpty()) {
+      return 0;
+    }
+    outPath = argMap.get("--output");
+    tmpOut = outPath + "/tmp";
+
+    // NOTE(review): only the presence of --inMemory is tested, never its value. If maybePut stores the declared
+    // default of "false", the eigens are always buffered in memory - confirm against AbstractJob.
+    if(argMap.get("--eigenInput") != null && eigensToVerify == null) {
+      prepareEigens(argMap.get("--eigenInput"), argMap.get("--inMemory") != null);
+    }
+
+    maxError = Double.parseDouble(argMap.get("--maxError"));
+    minEigenValue = Double.parseDouble(argMap.get("--minEigenvalue"));
+
+    DistributedRowMatrix c = new DistributedRowMatrix(argMap.get("--corpusInput"), tmpOut, 1, 1);
+    c.configure(new JobConf(getConf()));
+    corpus = c;
+
+    // set up eigenverifier and orthoverifier TODO: allow multithreaded execution
+
+    eigenVerifier = new SimpleEigenVerifier();
+    orthoVerifier = new OrthonormalityVerifier();
+
+    // TODO: the inner products are computed but nothing inspects them yet
+    VectorIterable pairwiseInnerProducts = computePairwiseInnerProducts();
+
+    Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
+
+    List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
+
+    saveCleanEigens(prunedEigenMeta);
+
+    return 0;
+  }
+
+  /**
+   * Builds the command line options and parses <code>args</code> against them.
+   *
+   * @return null if parsing failed (help is printed), an empty map if help was requested, otherwise the map
+   *         filled by maybePut
+   */
+  public Map<String,String> handleArgs(String[] args) {
+    Option eigenInputOpt = buildOption("eigenInput", "ei",
+        "The Path for purported eigenVector input files (SequenceFile<WritableComparable,VectorWritable>.", null);
+    Option corpusInputOpt = buildOption("corpusInput", "ci",
+        "The Path for corpus input files (SequenceFile<WritableComparable,VectorWritable>.");
+    Option outOpt = DefaultOptionCreator.outputOption().create();
+    Option helpOpt = DefaultOptionCreator.helpOption();
+    Option inMemOpt = buildOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false");
+    Option errorOpt = buildOption("maxError", "err", "Maximum acceptable error", "0.05");
+    Option minEigenValOpt = buildOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0");
+
+    GroupBuilder gBuilder = new GroupBuilder().withName("Options")
+                                              .withOption(eigenInputOpt)
+                                              .withOption(corpusInputOpt)
+                                              .withOption(helpOpt)
+                                              .withOption(outOpt)
+                                              .withOption(inMemOpt)
+                                              .withOption(errorOpt)
+                                              .withOption(minEigenValOpt);
+    Group group = gBuilder.create();
+
+    Map<String,String> argMap = new HashMap<String,String>();
+
+    CommandLine cmdLine;
+    try {
+      Parser parser = new Parser();
+      parser.setGroup(group);
+      cmdLine = parser.parse(args);
+    } catch (OptionException e) {
+      log.error(e.getMessage());
+      CommandLineUtil.printHelp(group);
+      return null;
+    }
+    if (cmdLine.hasOption(helpOpt)) {
+      CommandLineUtil.printHelp(group);
+      return argMap;
+    }
+    maybePut(argMap, cmdLine, eigenInputOpt, corpusInputOpt, helpOpt, outOpt, inMemOpt, errorOpt, minEigenValOpt);
+    return argMap;
+  }
+
+  /** @return the pairwise inner products of the eigens to verify, as computed by the orthonormality verifier */
+  public VectorIterable computePairwiseInnerProducts() {
+    return orthoVerifier.pairwiseInnerProducts(eigensToVerify);
+  }
+
+  /**
+   * Writes the given eigenvectors to the sequence file <code>largestCleanEigens</code> below the output path,
+   * keyed by their index. Each vector is wrapped in an {@link EigenVector} so that its eigenvalue and error
+   * are stored along with it.
+   */
+  public void saveCleanEigens(List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta) throws IOException {
+    Path path = new Path(outPath, "largestCleanEigens");
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+    SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
+    try {
+      VectorWritable vw = new VectorWritable();
+      IntWritable iw = new IntWritable();
+      for(Map.Entry<MatrixSlice,EigenStatus> pruneSlice : prunedEigenMeta) {
+        MatrixSlice s = pruneSlice.getKey();
+        EigenStatus meta = pruneSlice.getValue();
+        // NOTE(review): the cast assumes every verified vector is a DenseVector - confirm for the non in-memory path
+        EigenVector ev = new EigenVector((DenseVector)s.vector(),
+                                         meta.getEigenValue(),
+                                         Math.abs(1-meta.getCosAngle()),
+                                         s.index());
+        log.info("appending " + ev.getName() + " to " + path.toString());
+        vw.set(ev);
+        iw.set(s.index());
+        seqWriter.append(iw, vw);
+      }
+    } finally {
+      // close even when an append fails, so the underlying stream is not leaked
+      seqWriter.close();
+    }
+  }
+
+  /**
+   * Keeps the entries whose error <code>|1 - cosAngle|</code> is below maxError and whose eigenvalue is above
+   * minEigenValue.
+   *
+   * @return the surviving entries, sorted by ascending slice index
+   */
+  public List<Map.Entry<MatrixSlice,EigenStatus>> pruneEigens(Map<MatrixSlice,EigenStatus> eigenMetaData) {
+    List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = new ArrayList<Map.Entry<MatrixSlice,EigenStatus>>();
+
+    for(Map.Entry<MatrixSlice,EigenStatus> entry : eigenMetaData.entrySet()) {
+      if(Math.abs(1-entry.getValue().getCosAngle()) < maxError && entry.getValue().getEigenValue() > minEigenValue) {
+        prunedEigenMeta.add(entry);
+      }
+    }
+
+    Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice,EigenStatus>>() {
+      @Override
+      public int compare(Map.Entry<MatrixSlice, EigenStatus> e1, Map.Entry<MatrixSlice, EigenStatus> e2) {
+        // compare rather than subtract, so the result cannot overflow
+        int i1 = e1.getKey().index();
+        int i2 = e2.getKey().index();
+        return i1 < i2 ? -1 : (i1 == i2 ? 0 : 1);
+      }
+    });
+    return prunedEigenMeta;
+  }
+
+  /** @return the status computed by the eigen verifier for every slice of the eigens to verify */
+  public Map<MatrixSlice,EigenStatus> verifyEigens() {
+    Map<MatrixSlice, EigenStatus> eigenMetaData = new HashMap<MatrixSlice, EigenStatus>();
+
+    for(MatrixSlice slice : eigensToVerify) {
+      EigenStatus status = eigenVerifier.verify(corpus, slice.vector());
+      eigenMetaData.put(slice, status);
+    }
+    return eigenMetaData;
+  }
+
+  /**
+   * Opens the eigenvector input as a {@link DistributedRowMatrix} and either uses it directly or, when
+   * <code>inMemory</code> is set, first collects all of its slices into a {@link SparseRowMatrix}.
+   *
+   * @throws IllegalStateException if <code>inMemory</code> is set and the input holds no vectors
+   */
+  private void prepareEigens(String eigenInput, boolean inMemory) {
+    DistributedRowMatrix eigens = new DistributedRowMatrix(eigenInput, tmpOut, 1, 1);
+    eigens.configure(new JobConf(getConf()));
+    if(inMemory) {
+      List<Vector> eigenVectors = new ArrayList<Vector>();
+      for(MatrixSlice slice : eigens) {
+        eigenVectors.add(slice.vector());
+      }
+      if(eigenVectors.isEmpty()) {
+        throw new IllegalStateException("No eigenvectors found in " + eigenInput);
+      }
+      eigensToVerify = new SparseRowMatrix(new int[] {eigenVectors.size(), eigenVectors.get(0).size()},
+                                           eigenVectors.toArray(new Vector[eigenVectors.size()]),
+                                           true,
+                                           true);
+    } else {
+      eigensToVerify = eigens;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int retVal = ToolRunner.run(new EigenVerificationJob(), args);
+    System.exit(retVal);
+  }
+}

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java Sat Feb 20 15:45:47 2010
@@ -32,7 +32,7 @@
   private ClusteringTestUtils() {
   }
 
-  public static void writePointsToFile(List<VectorWritable> points, String fileName, FileSystem fs, Configuration conf)
+  public static void writePointsToFile(Iterable<VectorWritable> points, String fileName, FileSystem fs, Configuration conf)
       throws IOException {
     Path path = new Path(fileName);
     SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, LongWritable.class, VectorWritable.class);

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Sat Feb 20 15:45:47 2010
@@ -117,7 +117,7 @@
   }
 
 
-  private static void rmr(String path) {
+  public static void rmr(String path) {
     File f = new File(path);
     if (f.exists()) {
       if (f.isDirectory()) {

Added: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java (added)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,100 @@
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.TestCanopyCreation;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.SolverTest;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class TestDistributedLanczosSolver extends SolverTest {
+
+  /** Working directory of the test; created by the test method and removed again by {@link #tearDown()}. */
+  private static final String TEST_DATA_DIR = "testdata";
+
+  public TestDistributedLanczosSolver(String name) {
+    super(name);
+  }
+
+  /**
+   * Runs the solver with a desired rank of 30 against a random distributed matrix, then asserts that the
+   * resulting vectors are orthonormal and pass assertEigen.
+   */
+  public void testDistributedLanczosSolver() throws Exception {
+    File testData = new File(TEST_DATA_DIR);
+    if (!testData.exists()) {
+      testData.mkdir();
+    }
+    DistributedRowMatrix corpus = randomDistributedMatrix(1000, 900, 800, 100, 10.0, TEST_DATA_DIR);
+    corpus.configure(new JobConf());
+    DistributedLanczosSolver solver = new DistributedLanczosSolver();
+    int desiredRank = 30;
+    Matrix eigenVectors = new DenseMatrix(desiredRank, 800);
+    List<Double> eigenValues = new ArrayList<Double>();
+    solver.solve(corpus, desiredRank, eigenVectors, eigenValues);
+    assertOrthonormal(eigenVectors);
+    assertEigen(eigenVectors, corpus, eigenVectors.numRows() / 2, 0.01);
+  }
+
+  /** Removes the working directory of the test. */
+  public void tearDown() throws Exception {
+    // must name the directory exactly as the test does: "testData" matched it only on case-insensitive file systems
+    TestCanopyCreation.rmr(TEST_DATA_DIR);
+  }
+
+
+  /**
+   * Writes a random sparse matrix to <code>baseTmpDir/distMatrix</code> and returns a
+   * {@link DistributedRowMatrix} over that path which uses <code>baseTmpDir/tmpOut</code> as its second path.
+   */
+  public static DistributedRowMatrix randomDistributedMatrix(int numRows,
+                                                          int nonNullRows,
+                                                          int numCols,
+                                                          int entriesPerRow,
+                                                          double entryMean,
+                                                          String baseTmpDir) throws IOException {
+    final Matrix m = randomSequentialAccessSparseMatrix(numRows, nonNullRows, numCols, entriesPerRow, entryMean);
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+
+    ClusteringTestUtils.writePointsToFile(new Iterable<VectorWritable>() {
+      @Override
+      public Iterator<VectorWritable> iterator() {
+        final Iterator<MatrixSlice> it = m.iterator();
+        // a single writable is re-used for every row handed out by next()
+        final VectorWritable v = new VectorWritable();
+        return new Iterator<VectorWritable>() {
+          @Override
+          public boolean hasNext() { return it.hasNext(); }
+          @Override
+          public VectorWritable next() {
+            MatrixSlice slice = it.next();
+            v.set(slice.vector());
+            return v;
+          }
+          @Override
+          public void remove() { it.remove(); }
+        };
+      }
+    }, baseTmpDir + "/distMatrix", fs, conf);
+
+    DistributedRowMatrix distMatrix = new DistributedRowMatrix(baseTmpDir + "/distMatrix",
+                                                               baseTmpDir + "/tmpOut",
+                                                               m.numRows(),
+                                                               m.numCols());
+
+    return distMatrix;
+  }
+}

Added: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/OrthonormalityVerifier.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/OrthonormalityVerifier.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/OrthonormalityVerifier.java (added)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/OrthonormalityVerifier.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,25 @@
+package org.apache.mahout.math;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OrthonormalityVerifier {
+
+  public VectorIterable pairwiseInnerProducts(VectorIterable basis) {
+    DenseMatrix out = null;
+    for(MatrixSlice slice1 : basis) {
+      List<Double> dots = new ArrayList<Double>();
+      for(MatrixSlice slice2 : basis) {
+        dots.add(slice1.vector().dot(slice2.vector()));
+      }
+      if(out == null) {
+        out = new DenseMatrix(dots.size(), dots.size());
+      }
+      for(int i=0; i<dots.size(); i++) {
+        out.set(slice1.index(), i, dots.get(i));
+      }
+    }
+    return out;
+  }
+
+}

Copied: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java (from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/MultiThreadedEigenVerifier.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java?p2=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java&p1=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/MultiThreadedEigenVerifier.java&r1=910284&r2=912134&rev=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/MultiThreadedEigenVerifier.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java Sat Feb 20 15:45:47 2010
@@ -15,35 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.math.decomposer.hebbian;
+package org.apache.mahout.math.decomposer;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
-import org.apache.mahout.math.Matrix;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
 
 
-public class MultiThreadedEigenVerifier extends SimpleEigenVerifier {
+public class AsyncEigenVerifier extends SimpleEigenVerifier {
 
   private final Executor threadPool;
   private EigenStatus status = null;
   private boolean finished = false;
   private boolean started = false;
 
-  public MultiThreadedEigenVerifier() {
+  public AsyncEigenVerifier() {
     threadPool = Executors.newFixedThreadPool(1);
     status = new EigenStatus(-1, 0);
   }
 
   @Override
-  public EigenStatus verify(Matrix eigenMatrix, Vector vector) {
+  public EigenStatus verify(VectorIterable corpus, Vector vector) {
     synchronized (status) {
       if (!finished && !started) // not yet started or finished, so start!
       {
         status = new EigenStatus(-1, 0);
         Vector vectorCopy = vector.clone();
-        threadPool.execute(new VerifierRunnable(eigenMatrix, vectorCopy));
+        threadPool.execute(new VerifierRunnable(corpus, vectorCopy));
         started = true;
       }
       if (finished) finished = false;
@@ -51,23 +51,23 @@
     }
   }
 
-  protected EigenStatus innerVerify(Matrix eigenMatrix, Vector vector) {
-    return super.verify(eigenMatrix, vector);
+  protected EigenStatus innerVerify(VectorIterable corpus, Vector vector) {
+    return super.verify(corpus, vector);
   }
 
   private class VerifierRunnable implements Runnable {
-    private final Matrix eigenMatrix;
+    private final VectorIterable corpus;
     private final Vector vector;
 
-    protected VerifierRunnable(Matrix eigenMatrix, Vector vector) {
-      this.eigenMatrix = eigenMatrix;
+    protected VerifierRunnable(VectorIterable corpus, Vector vector) {
+      this.corpus = corpus;
       this.vector = vector;
     }
 
     public void run() {
-      EigenStatus status = innerVerify(eigenMatrix, vector);
-      synchronized (MultiThreadedEigenVerifier.this.status) {
-        MultiThreadedEigenVerifier.this.status = status;
+      EigenStatus status = innerVerify(corpus, vector);
+      synchronized (AsyncEigenVerifier.this.status) {
+        AsyncEigenVerifier.this.status = status;
         finished = true;
         started = false;
       }

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java
------------------------------------------------------------------------------
    svn:keywords = "Date Rev Author URL Id"

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/AsyncEigenVerifier.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java (from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/EigenStatus.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java?p2=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java&p1=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/EigenStatus.java&r1=910284&r2=912134&rev=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/EigenStatus.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java Sat Feb 20 15:45:47 2010
@@ -15,15 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.math.decomposer.hebbian;
+package org.apache.mahout.math.decomposer;
 
 public class EigenStatus {
   private final double eigenValue;
   private final double cosAngle;
+  private Boolean inProgress;
 
   public EigenStatus(double eigenValue, double cosAngle) {
+    this(eigenValue, cosAngle, true);
+  }
+
+  public EigenStatus(double eigenValue, double cosAngle, boolean inProgress) {
     this.eigenValue = eigenValue;
     this.cosAngle = cosAngle;
+    this.inProgress = inProgress;
   }
 
   public double getCosAngle() {
@@ -33,4 +39,27 @@
   public double getEigenValue() {
     return eigenValue;
   }
+
+  /**
+   * @return the current value of the in-progress flag
+   */
+  public boolean inProgress() {
+    // lock on the instance rather than on the boxed field: the field is reassigned by setInProgress, and
+    // autoboxed Boolean values are JVM-wide shared objects, so it cannot serve as a stable private monitor
+    synchronized (this) {
+      return inProgress;
+    }
+  }
+
+  /**
+   * Updates the in-progress flag.
+   *
+   * @param status the new value of the flag
+   */
+  void setInProgress(boolean status) {
+    // same monitor as inProgress(), so a reader always observes the latest write
+    synchronized (this) {
+      inProgress = status;
+    }
+  }
 }

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java
------------------------------------------------------------------------------
    svn:keywords = "Date Rev Author URL Id"

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/EigenStatus.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java (from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SimpleEigenVerifier.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java?p2=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java&p1=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SimpleEigenVerifier.java&r1=910284&r2=912134&rev=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SimpleEigenVerifier.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java Sat Feb 20 15:45:47 2010
@@ -14,20 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.mahout.math.decomposer.hebbian;
+package org.apache.mahout.math.decomposer;
 
 import org.apache.mahout.math.Matrix;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.decomposer.SingularVectorVerifier;
 
 public class SimpleEigenVerifier implements SingularVectorVerifier {
 
-  public EigenStatus verify(Matrix eigenMatrix, Vector vector) {
-    Vector resultantVector = eigenMatrix.timesSquared(vector);
+  public EigenStatus verify(VectorIterable corpus, Vector vector) {
+    Vector resultantVector = corpus.timesSquared(vector);
     double newNorm = resultantVector.norm(2);
     double oldNorm = vector.norm(2);
     double eigenValue = (newNorm > 0 && oldNorm > 0) ? newNorm / oldNorm : 1;
     double cosAngle = (newNorm > 0 && oldNorm > 0) ? resultantVector.dot(vector) / (newNorm * oldNorm) : 0;
-    return new EigenStatus(eigenValue, cosAngle);
+    return new EigenStatus(eigenValue, cosAngle, false);
   }
 
 }

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java
------------------------------------------------------------------------------
    svn:keywords = "Date Rev Author URL Id"

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SimpleEigenVerifier.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java (from r910284, lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SingularVectorVerifier.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java?p2=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java&p1=lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SingularVectorVerifier.java&r1=910284&r2=912134&rev=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/SingularVectorVerifier.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java Sat Feb 20 15:45:47 2010
@@ -15,11 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.math.decomposer.hebbian;
+package org.apache.mahout.math.decomposer;
 
 import org.apache.mahout.math.Matrix;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.decomposer.EigenStatus;
 
 public interface SingularVectorVerifier {
-  EigenStatus verify(Matrix eigenMatrix, Vector vector);
+  EigenStatus verify(VectorIterable eigenMatrix, Vector vector);
 }

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java
------------------------------------------------------------------------------
    svn:keywords = "Date Rev Author URL Id"

Propchange: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/SingularVectorVerifier.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianSolver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianSolver.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianSolver.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianSolver.java Sat Feb 20 15:45:47 2010
@@ -27,6 +27,9 @@
 import org.apache.mahout.math.DenseMatrix;
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.decomposer.AsyncEigenVerifier;
+import org.apache.mahout.math.decomposer.EigenStatus;
+import org.apache.mahout.math.decomposer.SingularVectorVerifier;
 import org.apache.mahout.math.function.TimesFunction;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.function.PlusMult;
@@ -58,7 +61,7 @@
    * @param updater           {@link EigenUpdater} used to do the actual work of iteratively updating the current "best guess"
    *                          singular vector one data-point presentation at a time.
    * @param verifier          {@link SingularVectorVerifier } an object which perpetually tries to check how close to
-   *                          convergence the current singular vector is (typically is a {@link MultiThreadedEigenVerifier } which does this
+   *                          convergence the current singular vector is (typically is a {@link org.apache.mahout.math.decomposer.AsyncEigenVerifier } which does this
    *                          in the background in another thread, while the main thread continues to converge)
    * @param convergenceTarget a small "epsilon" value which tells the solver how small you want the cosine of the
    *                          angle between a proposed eigenvector and that same vector after being multiplied by the (square of the) input
@@ -83,8 +86,8 @@
    *
    * @param updater           {@link EigenUpdater} used to do the actual work of iteratively updating the current "best guess"
    *                          singular vector one data-point presentation at a time.
-   * @param verifier          {@link SingularVectorVerifier } an object which perpetually tries to check how close to
-   *                          convergence the current singular vector is (typically is a {@link MultiThreadedEigenVerifier } which does this
+   * @param verifier          {@link org.apache.mahout.math.decomposer.SingularVectorVerifier } an object which perpetually tries to check how close to
+   *                          convergence the current singular vector is (typically is a {@link org.apache.mahout.math.decomposer.AsyncEigenVerifier } which does this
    *                          in the background in another thread, while the main thread continues to converge)
    * @param convergenceTarget a small "epsilon" value which tells the solver how small you want the cosine of the
    *                          angle between a proposed eigenvector and that same vector after being multiplied by the (square of the) input
@@ -102,7 +105,7 @@
   /**
    * <b>This is the recommended constructor to use if you're not sure</b>
    * Creates a new HebbianSolver with the default {@link HebbianUpdater } to do the updating work, and the default
-   * {@link MultiThreadedEigenVerifier } to check for convergence in a (single) background thread.
+   * {@link org.apache.mahout.math.decomposer.AsyncEigenVerifier } to check for convergence in a (single) background thread.
    *
    * @param convergenceTarget a small "epsilon" value which tells the solver how small you want the cosine of the
    *                          angle between a proposed eigenvector and that same vector after being multiplied by the (square of the) input
@@ -112,14 +115,14 @@
    */
   public HebbianSolver(double convergenceTarget, int maxPassesPerEigen) {
     this(new HebbianUpdater(),
-        new MultiThreadedEigenVerifier(),
+        new AsyncEigenVerifier(),
         convergenceTarget,
         maxPassesPerEigen);
   }
 
   /**
    * Creates a new HebbianSolver with the default {@link HebbianUpdater } to do the updating work, and the default
-   * {@link MultiThreadedEigenVerifier } to check for convergence in a (single) background thread, with
+   * {@link org.apache.mahout.math.decomposer.AsyncEigenVerifier } to check for convergence in a (single) background thread, with
    * maxPassesPerEigen set to Integer.MAX_VALUE.  <b>Not recommended</b> unless only looking for the first few (5, maybe 10?) singular
    * vectors, as small errors which compound early on quickly put a minimum error on subsequent vectors.
    *
@@ -133,7 +136,7 @@
 
   /**
    * Creates a new HebbianSolver with the default {@link HebbianUpdater } to do the updating work, and the default
-   * {@link MultiThreadedEigenVerifier } to check for convergence in a (single) background thread, with
+   * {@link org.apache.mahout.math.decomposer.AsyncEigenVerifier } to check for convergence in a (single) background thread, with
    * convergenceTarget set to 0, which means that the solver will not really care about convergence as a loop-exiting
    * criterion (but will be checking for convergence anyways, so it will be logged and singular values will be
    * saved).
@@ -276,10 +279,7 @@
      * Step 3: verify how eigen-like the prospective eigen is.  This is potentially asynchronous.
      */
     EigenStatus status = verify(corpus, currentPseudoEigen);
-    /**
-     *  TODO: Having the cosAngle() be zero is not a good signal for an unfinished verification.
-     */
-    if (status.getCosAngle() == 0) {
+    if (status.inProgress()) {
       log.info("Verifier not finished, making another pass...");
     } else {
       log.info("Has 1 - cosAngle: {}, convergence target is: {}", (1 - status.getCosAngle()), convergenceTarget);
@@ -310,7 +310,7 @@
     int numThreads = Integer.parseInt(props.getProperty("solver.verifier.numThreads"));
 
     HebbianUpdater updater = new HebbianUpdater();
-    SingularVectorVerifier verifier = new MultiThreadedEigenVerifier();
+    SingularVectorVerifier verifier = new AsyncEigenVerifier();
     HebbianSolver solver = new HebbianSolver(updater,
         verifier,
         convergence,

Modified: lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java (original)
+++ lucene/mahout/trunk/math/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java Sat Feb 20 15:45:47 2010
@@ -23,6 +23,7 @@
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.Matrix;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.decomposer.EigenStatus;
 
 
 public class TrainingState {

Modified: lucene/mahout/trunk/math/src/test/java/org/apache/mahout/math/decomposer/hebbian/TestHebbianSolver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/math/src/test/java/org/apache/mahout/math/decomposer/hebbian/TestHebbianSolver.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/math/src/test/java/org/apache/mahout/math/decomposer/hebbian/TestHebbianSolver.java (original)
+++ lucene/mahout/trunk/math/src/test/java/org/apache/mahout/math/decomposer/hebbian/TestHebbianSolver.java Sat Feb 20 15:45:47 2010
@@ -20,6 +20,8 @@
 import org.apache.mahout.math.DenseMatrix;
 import org.apache.mahout.math.Matrix;
 
+import org.apache.mahout.math.decomposer.AsyncEigenVerifier;
+import org.apache.mahout.math.decomposer.SingularVectorVerifier;
 import org.apache.mahout.math.decomposer.SolverTest;
 
 /**
@@ -51,7 +53,7 @@
                                 int desiredRank,
                                 TrainingState state) throws Exception {
     HebbianUpdater updater = new HebbianUpdater();
-    SingularVectorVerifier verifier = new MultiThreadedEigenVerifier();
+    SingularVectorVerifier verifier = new AsyncEigenVerifier();
     HebbianSolver solver = new HebbianSolver(updater,
                                              verifier,
                                              convergence,

Added: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java?rev=912134&view=auto
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java (added)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java Sat Feb 20 15:45:47 2010
@@ -0,0 +1,104 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.AbstractJob;
+
+import java.io.IOException;
+import java.util.Map;
+
+
+/**
+ * Map-only job that splits the text of every input record into paragraphs, using a blank line
+ * ("\n\n") as the paragraph separator. Each paragraph is written out under its record's key.
+ */
+public class TextParagraphSplittingJob extends AbstractJob {
+
+  /**
+   * Configures and runs the splitting job.
+   *
+   * @param strings command-line arguments; --input, --output and --jarFile are read from them
+   * @return 0, the conventional success code of a Tool, once the job has finished
+   * @throws Exception propagated from argument parsing or from running the job
+   */
+  @Override
+  public int run(String[] strings) throws Exception {
+    Map<String,String> args = parseArguments(strings);
+    JobConf conf = prepareJobConf(args.get("--input"),
+                                  args.get("--output"),
+                                  args.get("--jarFile"),
+                                  SequenceFileInputFormat.class,
+                                  SplitMap.class,
+                                  Text.class,
+                                  Text.class,
+                                  Reducer.class,
+                                  Text.class,
+                                  Text.class,
+                                  SequenceFileOutputFormat.class);
+    // Zero reduce tasks: mapper output goes straight to the output format.
+    conf.setNumReduceTasks(0);
+
+    JobClient.runJob(conf).waitForCompletion();
+    return 0;
+  }
+
+  /** Mapper that emits one (key, paragraph) pair for each non-empty paragraph of the value. */
+  public static class SplitMap extends MapReduceBase implements Mapper<Text,Text,Text,Text> {
+
+    /**
+     * Splits text at every "\n\n" and collects the pieces, including the one after the last
+     * separator. Separators and empty paragraphs are not emitted.
+     *
+     * @param key      key of the input record, reused as the key of every emitted paragraph
+     * @param text     text to split
+     * @param out      collector receiving the paragraphs
+     * @param reporter unused
+     * @throws IOException if the collector fails
+     */
+    @Override
+    public void map(Text key,
+                    Text text,
+                    OutputCollector<Text, Text> out,
+                    Reporter reporter) throws IOException {
+      // Text offsets are byte offsets, and only the first getLength() bytes of getBytes() are valid.
+      byte[] bytes = text.getBytes();
+      int length = text.getLength();
+      Text outText = new Text();
+      int start = 0;
+      while (start < length) {
+        int separator = text.find("\n\n", start);
+        // No separator left: the remainder of the text is the final paragraph.
+        int end = separator < 0 ? length : separator;
+        if (end > start) {
+          outText.set(bytes, start, end - start);
+          out.collect(key, outText);
+        }
+        // Step over the separator, and any further newlines, so the next paragraph starts clean.
+        start = end;
+        while (start < length && bytes[start] == '\n') {
+          start++;
+        }
+      }
+    }
+  }
+
+  /**
+   * Command-line entry point.
+   *
+   * @param args arguments handed to ToolRunner and from there to run
+   * @throws Exception if the job fails
+   */
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new TextParagraphSplittingJob(), args);
+  }
+}

Modified: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java?rev=912134&r1=912133&r2=912134&view=diff
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java (original)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/text/term/TFPartialVectorReducer.java Sat Feb 20 15:45:47 2010
@@ -102,9 +102,13 @@
     if (sequentialAccess) {
       vector = new SequentialAccessSparseVector(vector);
     }
-    vectorWritable.set(vector);
-    output.collect(key, vectorWritable);
-    
+    // if the vector has no nonZero entries (nothing in the dictionary), let's not waste space sending it to disk.
+    if(vector.getNumNondefaultElements() > 0) {
+      vectorWritable.set(vector);
+      output.collect(key, vectorWritable);
+    } else {
+      reporter.incrCounter("TFParticalVectorReducer", "emptyVectorCount", 1);
+    }
   }
   
   @Override