You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ro...@apache.org on 2013/06/28 00:32:02 UTC

svn commit: r1497591 - in /mahout/trunk/core/src: main/java/org/apache/mahout/clustering/kmeans/ main/java/org/apache/mahout/clustering/spectral/kmeans/ main/java/org/apache/mahout/math/hadoop/decomposer/ test/java/org/apache/mahout/clustering/kmeans/

Author: robinanil
Date: Thu Jun 27 22:32:02 2013
New Revision: 1497591

URL: http://svn.apache.org/r1497591
Log:
MAHOUT-1214 Improve the accuracy of the Spectral KMeans Method(Yiqun Hu, Zhang Da)

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/EigenSeedGenerator.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestEigenSeedGenerator.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/EigenSeedGenerator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/EigenSeedGenerator.java?rev=1497591&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/EigenSeedGenerator.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/EigenSeedGenerator.java Thu Jun 27 22:32:02 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+
+/**
+ * Given an Input Path containing a {@link org.apache.hadoop.io.SequenceFile}, select k vectors and write them to the
+ * output file as a {@link org.apache.mahout.clustering.kmeans.Kluster} representing the initial centroid to use. The
+ * selection criterion is the rows with max value in that respective column
+ */
+public final class EigenSeedGenerator {
+
+  private static final Logger log = LoggerFactory.getLogger(EigenSeedGenerator.class);
+
+  public static final String K = "k";
+
+  private EigenSeedGenerator() {}
+
+  /**
+   * For every column index, picks the input row holding the largest absolute value in that column and writes
+   * each picked row to the seed file as a {@link Kluster} whose id is the column index.
+   *
+   * @param conf the Configuration used for all file system access
+   * @param input a sequence file of vectors, or a directory whose files are all scanned
+   * @param output directory that is deleted first and then receives the seed file "part-eigenSeed"
+   * @param k only used to pre-size the internal maps; one seed is written per column with a non-zero entry
+   * @param measure the DistanceMeasure handed to every generated Kluster
+   * @return the path of the seed file (nothing is written into it when it could not be newly created)
+   */
+  public static Path buildFromEigens(Configuration conf, Path input, Path output, int k, DistanceMeasure measure)
+      throws IOException {
+    // delete the output directory
+    FileSystem fs = FileSystem.get(output.toUri(), conf);
+    HadoopUtil.delete(conf, output);
+    Path outFile = new Path(output, "part-eigenSeed");
+    boolean newFile = fs.createNewFile(outFile);
+    if (newFile) {
+      // resolve the input through its own file system; it may differ from the one backing the output
+      FileSystem inputFs = input.getFileSystem(conf);
+      Path inputPathPattern = inputFs.getFileStatus(input).isDir() ? new Path(input, "*") : input;
+      FileStatus[] inputFiles = inputFs.globStatus(inputPathPattern, PathFilters.logsCRCFilter());
+
+      Map<Integer,Double> maxEigens = Maps.newHashMapWithExpectedSize(k); // max absolute value of each column
+      Map<Integer,Text> chosenTexts = Maps.newHashMapWithExpectedSize(k);
+      Map<Integer,ClusterWritable> chosenClusters = Maps.newHashMapWithExpectedSize(k);
+
+      // guard the whole read/write phase so that a failure while reading the input cannot leak the writer
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outFile, Text.class, ClusterWritable.class);
+      try {
+        for (FileStatus fileStatus : inputFiles) {
+          if (fileStatus.isDir()) {
+            continue;
+          }
+          for (Pair<Writable,VectorWritable> record : new SequenceFileIterable<Writable,VectorWritable>(
+              fileStatus.getPath(), true, conf)) {
+            Writable key = record.getFirst();
+            VectorWritable value = record.getSecond();
+            Iterator<Vector.Element> nonZeroElements = value.get().nonZeroes().iterator();
+
+            while (nonZeroElements.hasNext()) {
+              Vector.Element e = nonZeroElements.next();
+              int index = e.index();
+              double v = Math.abs(e.get());
+              Double best = maxEigens.get(index);
+              if (best == null || v > best) {
+                maxEigens.put(index, v);
+                chosenTexts.put(index, new Text(key.toString()));
+                Kluster newCluster = new Kluster(value.get(), index, measure);
+                newCluster.observe(value.get(), 1);
+                ClusterWritable clusterWritable = new ClusterWritable();
+                clusterWritable.setValue(newCluster);
+                chosenClusters.put(index, clusterWritable);
+              }
+            }
+          }
+        }
+
+        for (Integer column : maxEigens.keySet()) {
+          writer.append(chosenTexts.get(column), chosenClusters.get(column));
+        }
+        log.info("EigenSeedGenerator:: Wrote {} Klusters to {}", chosenTexts.size(), outFile);
+      } finally {
+        Closeables.close(writer, false);
+      }
+    }
+
+    return outFile;
+  }
+
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java?rev=1497591&r1=1497590&r2=1497591&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java Thu Jun 27 22:32:02 2013
@@ -22,11 +22,16 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.kmeans.EigenSeedGenerator;
 import org.apache.mahout.clustering.kmeans.KMeansDriver;
-import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
 import org.apache.mahout.clustering.spectral.common.AffinityMatrixInputJob;
 import org.apache.mahout.clustering.spectral.common.MatrixDiagonalizeJob;
 import org.apache.mahout.clustering.spectral.common.UnitVectorizerJob;
@@ -34,21 +39,26 @@ import org.apache.mahout.clustering.spec
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.ClassUtils;
 import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.decomposer.lanczos.LanczosState;
 import org.apache.mahout.math.hadoop.DistributedRowMatrix;
 import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
 import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
 import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 
 /**
- * Performs spectral k-means clustering on the top k eigenvectors of the input
- * affinity matrix. 
+ * Performs spectral k-means clustering on the top k eigenvectors of the input affinity matrix.
  */
 public class SpectralKMeansDriver extends AbstractJob {
+  private static final Logger log = LoggerFactory.getLogger(SpectralKMeansDriver.class);
 
   public static final double OVERSHOOTMULTIPLIER = 2.0;
   public static final int REDUCERS = 10;
@@ -78,7 +88,7 @@ public class SpectralKMeansDriver extend
     addOption("oversampling", "p", "Oversampling parameter for SSVD", String.valueOf(OVERSAMPLING));
     addOption("powerIter", "q", "Additional power iterations for SSVD", String.valueOf(POWERITERS));
 
-    Map<String, List<String>> parsedArgs = parseArguments(arg0);
+    Map<String,List<String>> parsedArgs = parseArguments(arg0);
     if (parsedArgs == null) {
       return 0;
     }
@@ -111,55 +121,45 @@ public class SpectralKMeansDriver extend
     return 0;
   }
 
-  public static void run(
-          Configuration conf,
-          Path input,
-          Path output,
-          int numDims,
-          int clusters,
-          DistanceMeasure measure,
-          double convergenceDelta,
-          int maxIterations,
-          Path tempDir,
-          boolean ssvd) throws IOException, InterruptedException, ClassNotFoundException {
+  public static void run(Configuration conf, Path input, Path output, int numDims, int clusters,
+      DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir, boolean ssvd)
+      throws IOException, InterruptedException, ClassNotFoundException {
     run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempDir, ssvd, REDUCERS,
         BLOCKHEIGHT, OVERSAMPLING, POWERITERS);
   }
 
   /**
    * Run the Spectral KMeans clustering on the supplied arguments
-   * 
-   * @param conf the Configuration to be used
-   * @param input the Path to the input tuples directory
-   * @param output the Path to the output directory
-   * @param numDims the int number of dimensions of the affinity matrix
-   * @param clusters the int number of eigenvectors and thus clusters to produce
-   * @param measure the DistanceMeasure for the k-Means calculations
-   * @param convergenceDelta the double convergence delta for the k-Means calculations
-   * @param maxIterations the int maximum number of iterations for the k-Means calculations
-   * @param tempDir Temporary directory for intermediate calculations
-   * @param ssvd Flag to indicate the eigensolver to use
+   *
+   * @param conf
+   *          the Configuration to be used
+   * @param input
+   *          the Path to the input tuples directory
+   * @param output
+   *          the Path to the output directory
+   * @param numDims
+   *          the int number of dimensions of the affinity matrix
+   * @param clusters
+   *          the int number of eigenvectors and thus clusters to produce
+   * @param measure
+   *          the DistanceMeasure for the k-Means calculations
+   * @param convergenceDelta
+   *          the double convergence delta for the k-Means calculations
+   * @param maxIterations
+   *          the int maximum number of iterations for the k-Means calculations
+   * @param tempDir
+   *          Temporary directory for intermediate calculations
+   * @param ssvd
+   *          Flag to indicate the eigensolver to use
    * @param numReducers
    * @param blockHeight
    * @param oversampling
    * @param poweriters
    */
-  public static void run(
-      Configuration conf,
-      Path input,
-      Path output,
-      int numDims,
-      int clusters,
-      DistanceMeasure measure,
-      double convergenceDelta,
-      int maxIterations,
-      Path tempDir,
-      boolean ssvd,
-      int numReducers,
-      int blockHeight,
-      int oversampling,
-      int poweriters)
-    throws IOException, InterruptedException, ClassNotFoundException {
+  public static void run(Configuration conf, Path input, Path output, int numDims, int clusters,
+      DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir, boolean ssvd, int numReducers,
+      int blockHeight, int oversampling, int poweriters) throws IOException, InterruptedException,
+      ClassNotFoundException {
 
     Path outputCalc = new Path(tempDir, "calculations");
     Path outputTmp = new Path(tempDir, "temporary");
@@ -171,8 +171,7 @@ public class SpectralKMeansDriver extend
     AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
 
     // Construct the affinity matrix using the newly-created sequence files
-    DistributedRowMatrix A =
-        new DistributedRowMatrix(affSeqFiles, new Path(outputTmp, "afftmp"), numDims, numDims);
+    DistributedRowMatrix A = new DistributedRowMatrix(affSeqFiles, new Path(outputTmp, "afftmp"), numDims, numDims);
 
     Configuration depConf = new Configuration(conf);
     A.setConf(depConf);
@@ -180,34 +179,27 @@ public class SpectralKMeansDriver extend
     // Construct the diagonal matrix D (represented as a vector)
     Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
 
-    //Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5)
-    DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
-        new Path(outputCalc, "laplacian"), new Path(outputCalc, outputCalc));
+    // Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5)
+    DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D, new Path(outputCalc, "laplacian"),
+        new Path(outputCalc, outputCalc));
     L.setConf(depConf);
 
     Path data;
 
     if (ssvd) {
       // SSVD requires an array of Paths to function. So we pass in an array of length one
-      Path [] LPath = new Path[1];
+      Path[] LPath = new Path[1];
       LPath[0] = L.getRowPath();
 
       Path SSVDout = new Path(outputCalc, "SSVD");
 
-      SSVDSolver solveIt = new SSVDSolver(
-          depConf,
-          LPath,
-          SSVDout,
-          blockHeight,
-          clusters,
-          oversampling,
-          numReducers);
+      SSVDSolver solveIt = new SSVDSolver(depConf, LPath, SSVDout, blockHeight, clusters, oversampling, numReducers);
 
       solveIt.setComputeV(false);
       solveIt.setComputeU(true);
       solveIt.setOverwrite(true);
       solveIt.setQ(poweriters);
-      //solveIt.setBroadcast(false);
+      // solveIt.setBroadcast(false);
       solveIt.run();
       data = new Path(solveIt.getUPath());
     } else {
@@ -220,48 +212,67 @@ public class SpectralKMeansDriver extend
       LanczosState state = new LanczosState(L, overshoot, DistributedLanczosSolver.getInitialVector(L));
       Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors");
 
-      solver.runJob(conf,
-                    state,
-                    overshoot,
-                    true,
-                    lanczosSeqFiles.toString());
+      solver.runJob(conf, state, overshoot, true, lanczosSeqFiles.toString());
 
       // perform a verification
       EigenVerificationJob verifier = new EigenVerificationJob();
       Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
-      verifier.runJob(conf,
-              lanczosSeqFiles,
-              L.getRowPath(),
-              verifiedEigensPath,
-              true,
-              1.0,
-              clusters);
+      verifier.runJob(conf, lanczosSeqFiles, L.getRowPath(), verifiedEigensPath, true, 1.0, clusters);
 
       Path cleanedEigens = verifier.getCleanedEigensPath();
-      DistributedRowMatrix W = new DistributedRowMatrix(
-          cleanedEigens, new Path(cleanedEigens, "tmp"), clusters, numDims);
+      DistributedRowMatrix W = new DistributedRowMatrix(cleanedEigens, new Path(cleanedEigens, "tmp"), clusters,
+          numDims);
       W.setConf(depConf);
       DistributedRowMatrix Wtrans = W.transpose();
       data = Wtrans.getRowPath();
     }
 
     // Normalize the rows of Wt to unit length
-    // normalize is important because it reduces the occurrence of two unique clusters  combining into one
+    // normalize is important because it reduces the occurrence of two unique clusters combining into one
     Path unitVectors = new Path(outputCalc, "unitvectors");
 
     UnitVectorizerJob.runJob(data, unitVectors);
 
-    DistributedRowMatrix Wt = new DistributedRowMatrix(
-        unitVectors, new Path(unitVectors, "tmp"), clusters, numDims);
+    DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new Path(unitVectors, "tmp"), clusters, numDims);
     Wt.setConf(depConf);
     data = Wt.getRowPath();
 
-    // Generate random initial clusters
-    Path initialclusters = RandomSeedGenerator.buildRandom(conf, data,
+    // Generate initial clusters using EigenSeedGenerator which picks rows as centroids if that row contains max
+    // eigen value in that column
+    Path initialclusters = EigenSeedGenerator.buildFromEigens(conf, data,
         new Path(output, Cluster.INITIAL_CLUSTERS_DIR), clusters, measure);
 
-        // Run the KMeansDriver
+    // Run the KMeansDriver
     Path answer = new Path(output, "kmeans_out");
-    KMeansDriver.run(conf, data, initialclusters, answer, measure,convergenceDelta, maxIterations, true, 0.0, false);
+    KMeansDriver.run(conf, data, initialclusters, answer, measure, convergenceDelta, maxIterations, true, 0.0, false);
+
+    // Restore name to id mapping and read through the cluster assignments
+    Path mappingPath = new Path(new Path(conf.get("hadoop.tmp.dir")), "generic_input_mapping");
+    List<String> mapping = Lists.newArrayList();
+    FileSystem fs = FileSystem.get(mappingPath.toUri(), conf);
+    if (fs.exists(mappingPath)) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, mappingPath, conf);
+      Text mappingValue = new Text();
+      IntWritable mappingIndex = new IntWritable();
+      while (reader.next(mappingIndex, mappingValue)) {
+        String s = new String(mappingValue.toString());
+        mapping.add(s);
+      }
+      HadoopUtil.delete(conf, mappingPath);
+    } else {
+      log.warn("generic input mapping file not found!");
+    }
+
+    Path clusteredPointsPath = new Path(answer, "clusteredPoints");
+    Path inputPath = new Path(clusteredPointsPath, "part-m-00000");
+    int id = 0;
+    for (Pair<IntWritable, WeightedVectorWritable> record :
+         new SequenceFileIterable<IntWritable, WeightedVectorWritable>(inputPath, conf)) {
+      if (!mapping.isEmpty()) {
+        log.info("{}: {}", mapping.get(id++), record.getFirst().get());
+      } else {
+        log.info("{}: {}", id++, record.getFirst().get());
+      }
+    }
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java?rev=1497591&r1=1497590&r2=1497591&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java Thu Jun 27 22:32:02 2013
@@ -49,19 +49,24 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * <p>Class for taking the output of an eigendecomposition (specified as a Path location), and verifies correctness,
- * in terms of the following: if you have a vector e, and a matrix m, then let e' = m.timesSquared(v); the error
- * w.r.t. eigenvector-ness is the cosine of the angle between e and e':</p>
+ * <p>
+ * Class for taking the output of an eigendecomposition (specified as a Path location), and verifies correctness, in
+ * terms of the following: if you have a vector e, and a matrix m, then let e' = m.timesSquared(v); the error w.r.t.
+ * eigenvector-ness is the cosine of the angle between e and e':
+ * </p>
+ *
  * <pre>
  *   error(e,e') = e.dot(e') / (e.norm(2)*e'.norm(2))
  * </pre>
- * <p>A set of eigenvectors should also all be very close to orthogonal, so this job computes all inner products
- * between eigenvectors, and checks that this is close to the identity matrix.
+ * <p>
+ * A set of eigenvectors should also all be very close to orthogonal, so this job computes all inner products between
+ * eigenvectors, and checks that this is close to the identity matrix.
  * </p>
  * <p>
- * Parameters used in the cleanup (other than in the input/output path options) include --minEigenvalue, which
- * specifies the value below which eigenvector/eigenvalue pairs will be discarded, and --maxError, which specifies
- * the maximum error (as defined above) to be tolerated in an eigenvector.</p>
+ * Parameters used in the cleanup (other than in the input/output path options) include --minEigenvalue, which specifies
+ * the value below which eigenvector/eigenvalue pairs will be discarded, and --maxError, which specifies the maximum
+ * error (as defined above) to be tolerated in an eigenvector.
+ * </p>
  * <p>
  * If all the eigenvectors can fit in memory, --inMemory allows for a speedier completion of this task by doing so.
  * </p>
@@ -83,7 +88,7 @@ public class EigenVerificationJob extend
 
   private double minEigenValue;
 
-  //private boolean loadEigensInMemory;
+  // private boolean loadEigensInMemory;
 
   private Path tmpOut;
 
@@ -99,7 +104,7 @@ public class EigenVerificationJob extend
 
   @Override
   public int run(String[] args) throws Exception {
-    Map<String, List<String>> argMap = handleArgs(args);
+    Map<String,List<String>> argMap = handleArgs(args);
     if (argMap == null) {
       return -1;
     }
@@ -107,37 +112,36 @@ public class EigenVerificationJob extend
       return 0;
     }
     // parse out the arguments
-    runJob(getConf(),
-           new Path(getOption("eigenInput")),
-           new Path(getOption("corpusInput")),
-           getOutputPath(),
-           getOption("inMemory") != null,
-           Double.parseDouble(getOption("maxError")),
-           //Double.parseDouble(getOption("minEigenvalue")),
-           Integer.parseInt(getOption("maxEigens")));
+    runJob(getConf(), new Path(getOption("eigenInput")), new Path(getOption("corpusInput")), getOutputPath(),
+        getOption("inMemory") != null, Double.parseDouble(getOption("maxError")),
+        // Double.parseDouble(getOption("minEigenvalue")),
+        Integer.parseInt(getOption("maxEigens")));
     return 0;
   }
 
   /**
    * Run the job with the given arguments
-   * @param corpusInput the corpus input Path
-   * @param eigenInput the eigenvector input Path
-   * @param output the output Path
-   * @param tempOut temporary output Path
-   * @param maxError a double representing the maximum error
-   * @param minEigenValue a double representing the minimum eigenvalue
-   * @param inMemory a boolean requesting in-memory preparation
-   * @param conf the Configuration to use, or null if a default is ok
-   *  (saves referencing Configuration in calling classes unless needed)
+   *
+   * @param corpusInput
+   *          the corpus input Path
+   * @param eigenInput
+   *          the eigenvector input Path
+   * @param output
+   *          the output Path
+   * @param tempOut
+   *          temporary output Path
+   * @param maxError
+   *          a double representing the maximum error
+   * @param minEigenValue
+   *          a double representing the minimum eigenvalue
+   * @param inMemory
+   *          a boolean requesting in-memory preparation
+   * @param conf
+   *          the Configuration to use, or null if a default is ok (saves referencing Configuration in calling classes
+   *          unless needed)
    */
-  public int run(Path corpusInput,
-                 Path eigenInput,
-                 Path output,
-                 Path tempOut,
-                 double maxError,
-                 double minEigenValue,
-                 boolean inMemory,
-                 Configuration conf) throws IOException {
+  public int run(Path corpusInput, Path eigenInput, Path output, Path tempOut, double maxError, double minEigenValue,
+      boolean inMemory, Configuration conf) throws IOException {
     this.outPath = output;
     this.tmpOut = tempOut;
     this.maxError = maxError;
@@ -157,20 +161,18 @@ public class EigenVerificationJob extend
     // we don't currently verify orthonormality here.
     // VectorIterable pairwiseInnerProducts = computePairwiseInnerProducts();
 
-    Map<MatrixSlice, EigenStatus> eigenMetaData = verifyEigens();
+    Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
 
-    List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
+    List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
 
     saveCleanEigens(new Configuration(), prunedEigenMeta);
     return 0;
   }
 
-  private Map<String, List<String>> handleArgs(String[] args) throws IOException {
+  private Map<String,List<String>> handleArgs(String[] args) throws IOException {
     addOutputOption();
-    addOption("eigenInput",
-              "ei",
-              "The Path for purported eigenVector input files (SequenceFile<WritableComparable,VectorWritable>.",
-              null);
+    addOption("eigenInput", "ei",
+        "The Path for purported eigenVector input files (SequenceFile<WritableComparable,VectorWritable>.", null);
     addOption("corpusInput", "ci", "The Path for corpus input files (SequenceFile<WritableComparable,VectorWritable>.");
     addOption(DefaultOptionCreator.outputOption().create());
     addOption(DefaultOptionCreator.helpOption());
@@ -182,24 +184,22 @@ public class EigenVerificationJob extend
     return parseArguments(args);
   }
 
-  private void saveCleanEigens(Configuration conf, Collection<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta)
-    throws IOException {
+  private void saveCleanEigens(Configuration conf, Collection<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta)
+      throws IOException {
     Path path = new Path(outPath, CLEAN_EIGENVECTORS);
     FileSystem fs = FileSystem.get(path.toUri(), conf);
     SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
     try {
       IntWritable iw = new IntWritable();
       int numEigensWritten = 0;
-      for (Map.Entry<MatrixSlice, EigenStatus> pruneSlice : prunedEigenMeta) {
+      int index = 0;
+      for (Map.Entry<MatrixSlice,EigenStatus> pruneSlice : prunedEigenMeta) {
         MatrixSlice s = pruneSlice.getKey();
         EigenStatus meta = pruneSlice.getValue();
-        EigenVector ev = new EigenVector(s.vector(),
-                                         meta.getEigenValue(),
-                                         Math.abs(1 - meta.getCosAngle()),
-                                         s.index());
-        //log.info("appending {} to {}", ev, path);
+        EigenVector ev = new EigenVector(s.vector(), meta.getEigenValue(), Math.abs(1 - meta.getCosAngle()), s.index());
+        // log.info("appending {} to {}", ev, path);
         Writable vw = new VectorWritable(ev);
-        iw.set(s.index());
+        iw.set(index++);
         seqWriter.append(iw, vw);
 
         // increment the number of eigenvectors written and see if we've
@@ -217,34 +217,64 @@ public class EigenVerificationJob extend
     cleanedEigensPath = path;
   }
 
-  private List<Map.Entry<MatrixSlice, EigenStatus>> pruneEigens(Map<MatrixSlice, EigenStatus> eigenMetaData) {
-    List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta = Lists.newArrayList();
+  private List<Map.Entry<MatrixSlice,EigenStatus>> pruneEigens(Map<MatrixSlice,EigenStatus> eigenMetaData) {
+    List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = Lists.newArrayList();
 
-    for (Map.Entry<MatrixSlice, EigenStatus> entry : eigenMetaData.entrySet()) {
+    for (Map.Entry<MatrixSlice,EigenStatus> entry : eigenMetaData.entrySet()) {
       if (Math.abs(1 - entry.getValue().getCosAngle()) < maxError && entry.getValue().getEigenValue() > minEigenValue) {
         prunedEigenMeta.add(entry);
       }
     }
 
-    Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice, EigenStatus>>() {
+    Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice,EigenStatus>>() {
       @Override
       public int compare(Map.Entry<MatrixSlice,EigenStatus> e1, Map.Entry<MatrixSlice,EigenStatus> e2) {
-        int index1 = e1.getKey().index();
-        int index2 = e2.getKey().index();
-        if (index1 < index2) {
-          return -1;
-        }
-        if (index1 > index2) {
+        // sort eigens on eigenvalues in descending order
+        double eg1 = e1.getValue().getEigenValue();
+        double eg2 = e2.getValue().getEigenValue();
+        if (eg1 < eg2) {
           return 1;
         }
+        if (eg1 > eg2) {
+          return -1;
+        }
         return 0;
       }
     });
-    return prunedEigenMeta;
+
+    // Greedily reorder the eigens: start with the largest eigenvalue, then repeatedly move over the remaining
+    // eigen whose accumulated score against the already selected ones is smallest.
+    List<Map.Entry<MatrixSlice,EigenStatus>> selectedEigenMeta = Lists.newArrayList();
+    if (prunedEigenMeta.isEmpty()) {
+      // nothing passed the maxError/minEigenValue filter; remove(0) below would throw on an empty list
+      return selectedEigenMeta;
+    }
+    selectedEigenMeta.add(prunedEigenMeta.remove(0));
+
+    while (!prunedEigenMeta.isEmpty()) {
+      double sum = Double.MAX_VALUE;
+      int index = 0;
+      for (int i = 0; i < prunedEigenMeta.size(); i++) {
+        Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.get(i);
+        double tmp = 0;
+        for (Map.Entry<MatrixSlice,EigenStatus> ee : selectedEigenMeta) {
+          // score = 2-norm of times(); NOTE(review): presumably the element-wise product, confirm in Vector
+          tmp += ee.getKey().vector().times(e.getKey().vector()).norm(2);
+        }
+        if (tmp < sum) {
+          sum = tmp;
+          index = i;
+        }
+      }
+      // the first candidate with the lowest score moves from the candidate list to the selected list
+      selectedEigenMeta.add(prunedEigenMeta.remove(index));
+    }
+
+    return selectedEigenMeta;
   }
 
-  private Map<MatrixSlice, EigenStatus> verifyEigens() {
-    Map<MatrixSlice, EigenStatus> eigenMetaData = Maps.newHashMap();
+  private Map<MatrixSlice,EigenStatus> verifyEigens() {
+    Map<MatrixSlice,EigenStatus> eigenMetaData = Maps.newHashMap();
 
     for (MatrixSlice slice : eigensToVerify) {
       EigenStatus status = eigenVerifier.verify(corpus, slice.vector());
@@ -262,9 +292,7 @@ public class EigenVerificationJob extend
         eigenVectors.add(slice.vector());
       }
       eigensToVerify = new SparseRowMatrix(eigenVectors.size(), eigenVectors.get(0).size(),
-                                           eigenVectors.toArray(new Vector[eigenVectors.size()]),
-                                           true,
-                                           true);
+          eigenVectors.toArray(new Vector[eigenVectors.size()]), true, true);
 
     } else {
       eigensToVerify = eigens;
@@ -281,16 +309,14 @@ public class EigenVerificationJob extend
 
   /**
    * Progammatic invocation of run()
-   * @param eigenInput Output of LanczosSolver
-   * @param corpusInput Input of LanczosSolver
+   *
+   * @param eigenInput
+   *          Output of LanczosSolver
+   * @param corpusInput
+   *          Input of LanczosSolver
    */
-  public void runJob(Configuration conf,
-                     Path eigenInput,
-                     Path corpusInput,
-                     Path output,
-                     boolean inMemory,
-                     double maxError,
-                     int maxEigens) throws IOException {
+  public void runJob(Configuration conf, Path eigenInput, Path corpusInput, Path output, boolean inMemory,
+      double maxError, int maxEigens) throws IOException {
     // no need to handle command line arguments
     outPath = output;
     tmpOut = new Path(outPath, "tmp");
@@ -306,8 +332,8 @@ public class EigenVerificationJob extend
 
     eigenVerifier = new SimpleEigenVerifier();
 
-    Map<MatrixSlice, EigenStatus> eigenMetaData = verifyEigens();
-    List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
+    Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
+    List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
     saveCleanEigens(conf, prunedEigenMeta);
   }
 }

Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestEigenSeedGenerator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestEigenSeedGenerator.java?rev=1497591&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestEigenSeedGenerator.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestEigenSeedGenerator.java Thu Jun 27 22:32:02 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestEigenSeedGenerator extends MahoutTestCase {
+
+  private
+   static final double[][] RAW = {{1, 0, 0}, {1, 0, 0}, {0, 1, 0}, {0, 1, 0},
+                                  {0, 1, 0}, {0, 0, 1}, {0, 0, 1}}; // 7 points: 2, 3 and 2 copies of the three unit axes
+
+  private FileSystem fs; // obtained from a default Configuration in setUp()
+
+  private static List<VectorWritable> getPoints() { // wraps each RAW row in a sparse vector
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : RAW) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = new Configuration();
+    fs = FileSystem.get(conf);
+  }
+
+  /** Story: seeds built from the RAW points are read back as 3 clusters with unique ids and pairwise-orthogonal centers */
+  @Test
+  public void testEigenSeedGenerator() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    Path input = getTestTempFilePath("eigen-input");
+    Path output = getTestTempDirPath("eigen-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+
+    EigenSeedGenerator.buildFromEigens(conf, input, output, 3, new ManhattanDistanceMeasure());
+
+    int clusterCount = 0;
+    Collection<Integer> set = new HashSet<Integer>();
+    Vector v[] = new Vector[3]; // cluster centers, indexed by cluster id
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(
+             new Path(output, "part-eigenSeed"), true, conf)) {
+      Cluster cluster = clusterWritable.getValue();
+      int id = cluster.getId();
+      assertTrue(set.add(id)); // ids must be unique: add() returns false on a duplicate
+      v[id] = cluster.getCenter(); // assumes ids fall in 0..2; any other id throws here
+      clusterCount++;
+    }
+    assertEquals(3, clusterCount); // exactly 3 seed clusters were read back
+    // every pair of centers must be orthogonal: dot product is 0 within 1E-10
+    assertEquals(0, v[0].dot(v[1]), 1E-10);
+    assertEquals(0, v[1].dot(v[2]), 1E-10);
+    assertEquals(0, v[0].dot(v[2]), 1E-10);
+  }
+
+}