You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sq...@apache.org on 2013/03/11 20:06:59 UTC

svn commit: r1455286 - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java

Author: squinn
Date: Mon Mar 11 19:06:58 2013
New Revision: 1455286

URL: http://svn.apache.org/r1455286
Log:
Applying patch 1159 to add SSVD to SpectralKMeans. Also minor tweaks to Path structure.

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java?rev=1455286&r1=1455285&r2=1455286&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java Mon Mar 11 19:06:58 2013
@@ -17,6 +17,10 @@
 
 package org.apache.mahout.clustering.spectral.kmeans;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
@@ -37,55 +41,71 @@ import org.apache.mahout.math.decomposer
 import org.apache.mahout.math.hadoop.DistributedRowMatrix;
 import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
 import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
+import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 
 /**
  * Implementation of the EigenCuts spectral clustering algorithm.
+ * This implementation is for testing and debugging. 
+ * 
+ * Using the variables below the user can:
+ * 		select to use either SSVDSolver or DistributedLanczosSolver for the Eigen decomposition. 
+ * 		change the number of iterations in SSVD
+ * 		choose whether to keep the temp files that are created during a job
+ * 		have the output printed to a text file 
+ * 
+ * All of the steps involved in testing have timers built around them and the result is printed at
+ * the top of the output text file. 
+ * 
+ * See the README file for a description of the algorithm, testing results, and other details.
  */
 public class SpectralKMeansDriver extends AbstractJob {
 
-  public static final double OVERSHOOT_MULTIPLIER = 2.0;
+	public static final double OVERSHOOTMULTIPLIER = 2.0;
 
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new SpectralKMeansDriver(), args);
-  }
-
-  @Override
-  public int run(String[] arg0) throws IOException, ClassNotFoundException, InterruptedException {
-    // set up command line options
-    Configuration conf = getConf();
-    addInputOption();
-    addOutputOption();
-    addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
-    addOption("clusters", "k", "Number of clusters and top eigenvectors", true);
-    addOption(DefaultOptionCreator.distanceMeasureOption().create());
-    addOption(DefaultOptionCreator.convergenceOption().create());
-    addOption(DefaultOptionCreator.maxIterationsOption().create());
-    addOption(DefaultOptionCreator.overwriteOption().create());
-    Map<String, List<String>> parsedArgs = parseArguments(arg0);
-    if (parsedArgs == null) {
-      return 0;
-    }
-
-    Path input = getInputPath();
-    Path output = getOutputPath();
-    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
-      HadoopUtil.delete(conf, output);
-    }
-    int numDims = Integer.parseInt(getOption("dimensions"));
-    int clusters = Integer.parseInt(getOption("clusters"));
-    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
-    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
-    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
-    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
-
-    run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations);
-
-    return 0;
-  }
+	public static void main(String[] args) throws Exception {
+		ToolRunner.run(new SpectralKMeansDriver(), args);
+	}
+  
+	@Override
+	public int run(String[] arg0)
+			throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException {
+    
+		Configuration conf = getConf();
+		addInputOption();
+		addOutputOption();
+		addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
+		addOption("clusters", "k", "Number of clusters and top eigenvectors", true);
+		addOption(DefaultOptionCreator.distanceMeasureOption().create());
+		addOption(DefaultOptionCreator.convergenceOption().create());
+		addOption(DefaultOptionCreator.maxIterationsOption().create());
+		addOption(DefaultOptionCreator.overwriteOption().create());
+		addFlag("usessvd", "ssvd", "Uses SSVD as the eigensolver. Default is the Lanczos solver.");
+		
+		Map<String, List<String>> parsedArgs = parseArguments(arg0);
+		if (parsedArgs == null) {
+		  return 0;
+		}
+		
+		Path input = getInputPath();
+		Path output = getOutputPath();
+		if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+			HadoopUtil.delete(conf, output);
+		}
+		int numDims = Integer.parseInt(getOption("dimensions"));
+		int clusters = Integer.parseInt(getOption("clusters"));
+		String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+		DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+		double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+		int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+		
+		Path tempdir = new Path(getOption("tempDir"));
+		boolean ssvd = parsedArgs.containsKey("--usessvd");
+		
+		run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempdir, ssvd);
+		
+		return 0;
+	}
 
   /**
    * Run the Spectral KMeans clustering on the supplied arguments
@@ -98,95 +118,128 @@ public class SpectralKMeansDriver extend
    * @param measure the DistanceMeasure for the k-Means calculations
    * @param convergenceDelta the double convergence delta for the k-Means calculations
    * @param maxIterations the int maximum number of iterations for the k-Means calculations
+   * @param tempDir Temporary directory for intermediate calculations
+   * @param ssvd Flag to indicate the eigensolver to use
    */
-  public static void run(Configuration conf,
-                         Path input,
-                         Path output,
-                         int numDims,
-                         int clusters,
-                         DistanceMeasure measure,
-                         double convergenceDelta,
-                         int maxIterations)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    // create a few new Paths for temp files and transformations
-    Path outputCalc = new Path(output, "calculations");
-    Path outputTmp = new Path(output, "temporary");
-
-    // Take in the raw CSV text file and split it ourselves,
-    // creating our own SequenceFiles for the matrices to read later 
-    // (similar to the style of syntheticcontrol.canopy.InputMapper)
-    Path affSeqFiles = new Path(outputCalc, "seqfile-" + (System.nanoTime() & 0xFF));
-    AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
-
-    // Next step: construct the affinity matrix using the newly-created
-    // sequence files
-    DistributedRowMatrix A = new DistributedRowMatrix(affSeqFiles,
-                                                      new Path(outputTmp, "afftmp-" + (System.nanoTime() & 0xFF)),
-                                                      numDims,
-                                                      numDims);
-    Configuration depConf = new Configuration(conf);
-    A.setConf(depConf);
-
-    // Next step: construct the diagonal matrix D (represented as a vector)
-    // and calculate the normalized Laplacian of the form:
-    // L = D^(-0.5)AD^(-0.5)
-    Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
-    DistributedRowMatrix L =
-        VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
-            new Path(outputCalc, "laplacian-" + (System.nanoTime() & 0xFF)), new Path(outputCalc, "laplacian-tmp-" + (System.nanoTime() & 0xFF)));
-    L.setConf(depConf);
-
-    // Next step: perform eigen-decomposition using LanczosSolver
-    // since some of the eigen-output is spurious and will be eliminated
-    // upon verification, we have to aim to overshoot and then discard
-    // unnecessary vectors later
-    int overshoot = (int) ((double) clusters * OVERSHOOT_MULTIPLIER);
-    DistributedLanczosSolver solver = new DistributedLanczosSolver();
-    LanczosState state = new LanczosState(L, overshoot, DistributedLanczosSolver.getInitialVector(L));
-    Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors-" + (System.nanoTime() & 0xFF));
-    solver.runJob(conf,
-                  state,
-                  overshoot,
-                  true,
-                  lanczosSeqFiles.toString());
-
-    // perform a verification
-    EigenVerificationJob verifier = new EigenVerificationJob();
-    Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
-    verifier.runJob(conf, lanczosSeqFiles, L.getRowPath(), verifiedEigensPath, true, 1.0, clusters);
-    Path cleanedEigens = verifier.getCleanedEigensPath();
-    DistributedRowMatrix W = new DistributedRowMatrix(cleanedEigens, new Path(cleanedEigens, "tmp"), clusters, numDims);
-    W.setConf(depConf);
-    DistributedRowMatrix Wtrans = W.transpose();
-    //    DistributedRowMatrix Wt = W.transpose();
-
-    // next step: normalize the rows of Wt to unit length
-    Path unitVectors = new Path(outputCalc, "unitvectors-" + (System.nanoTime() & 0xFF));
-    UnitVectorizerJob.runJob(Wtrans.getRowPath(), unitVectors);
-    DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new Path(unitVectors, "tmp"), clusters, numDims);
-    Wt.setConf(depConf);
-
-    // Finally, perform k-means clustering on the rows of L (or W)
-    // generate random initial clusters
-    Path initialclusters = RandomSeedGenerator.buildRandom(conf,
-                                                           Wt.getRowPath(),
-                                                           new Path(output, Cluster.INITIAL_CLUSTERS_DIR),
-                                                           clusters,
-                                                           measure);
+	public static void run(
+		  Configuration conf,
+		  Path input,
+		  Path output,
+		  int numDims,
+		  int clusters,
+		  DistanceMeasure measure,
+		  double convergenceDelta,
+		  int maxIterations,
+		  Path tempDir,
+		  boolean ssvd)
+				  throws IOException, InterruptedException, ClassNotFoundException {
     
-    // The output format is the same as the K-means output format.
-    // TODO: Perhaps a conversion of the output format from points and clusters
-    // in eigenspace to the original dataset. Currently, the user has to perform
-    // the association step after this job finishes on their own.
-    KMeansDriver.run(conf,
-                     Wt.getRowPath(),
-                     initialclusters,
-                     output,
-                     measure,
-                     convergenceDelta,
-                     maxIterations,
-                     true,
-                     0.0, 
-                     false);
-  }
+		Path outputCalc = new Path(tempDir, "calculations");
+		Path outputTmp = new Path(tempDir, "temporary");
+
+		// Take in the raw CSV text file and split it ourselves,
+		// creating our own SequenceFiles for the matrices to read later 
+		// (similar to the style of syntheticcontrol.canopy.InputMapper)
+		Path affSeqFiles = new Path(outputCalc, "seqfile");
+		AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
+		
+		// Construct the affinity matrix using the newly-created sequence files
+		DistributedRowMatrix A = 
+				new DistributedRowMatrix(affSeqFiles, new Path(outputTmp, "afftmp"), numDims, numDims); 
+		
+		Configuration depConf = new Configuration(conf);
+		A.setConf(depConf);
+		
+		// Construct the diagonal matrix D (represented as a vector)
+		Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
+		
+		//Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5)
+		DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
+				new Path(outputCalc, "laplacian"), new Path(outputCalc, outputCalc));
+		L.setConf(depConf);
+		
+		Path data;
+		
+		if (ssvd) {
+			// SSVD requires an array of Paths to function. So we pass in an array of length one
+			Path [] LPath = new Path[1];
+			LPath[0] = L.getRowPath();
+			
+			Path SSVDout = new Path(outputCalc, "SSVD");
+			
+			SSVDSolver solveIt = new SSVDSolver(
+					depConf, 
+					LPath, 
+					SSVDout, 
+					1000, // Vertical height of a q-block
+					clusters, 
+					15, // Oversampling 
+					10);
+			
+			solveIt.setComputeV(false); 
+			solveIt.setComputeU(true);
+			solveIt.setOverwrite(true);
+			solveIt.setQ(0);
+			
+			// May want to update SSVD documentation on this one: method doc
+			// says "false" is the default, yet it's set to true in the 
+			// variable definition.
+			//solveIt.setBroadcast(false);
+			solveIt.run();
+			data = new Path(solveIt.getUPath());
+		} else {
+			// Perform eigen-decomposition using LanczosSolver
+			// since some of the eigen-output is spurious and will be eliminated
+			// upon verification, we have to aim to overshoot and then discard
+			// unnecessary vectors later
+			int overshoot = Math.min((int) ((double) clusters * OVERSHOOTMULTIPLIER), numDims);
+			DistributedLanczosSolver solver = new DistributedLanczosSolver();
+			LanczosState state = new LanczosState(L, overshoot, solver.getInitialVector(L));
+			Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors");
+			
+			solver.runJob(conf,
+			              state,
+			              overshoot,
+			              true,
+			              lanczosSeqFiles.toString());
+			
+			// perform a verification
+			EigenVerificationJob verifier = new EigenVerificationJob();
+			Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
+			verifier.runJob(conf, 
+							lanczosSeqFiles, 
+							L.getRowPath(), 
+							verifiedEigensPath, 
+							true, 
+							1.0, 
+							clusters);
+			
+			Path cleanedEigens = verifier.getCleanedEigensPath();
+			DistributedRowMatrix W = new DistributedRowMatrix(
+					cleanedEigens, new Path(cleanedEigens, "tmp"), clusters, numDims);
+			W.setConf(depConf);
+			DistributedRowMatrix Wtrans = W.transpose();
+			data = Wtrans.getRowPath();
+		}
+		
+		// Normalize the rows of Wt to unit length
+		// normalize is important because it reduces the occurrence of two unique clusters  combining into one 
+		Path unitVectors = new Path(outputCalc, "unitvectors");
+		
+		UnitVectorizerJob.runJob(data, unitVectors);
+		
+		DistributedRowMatrix Wt = new DistributedRowMatrix(
+				unitVectors, new Path(unitVectors, "tmp"), clusters, numDims);
+		Wt.setConf(depConf);
+		data = Wt.getRowPath();
+		
+		// Generate random initial clusters
+		Path initialclusters = RandomSeedGenerator.buildRandom(conf, data,
+				new Path(output, Cluster.INITIAL_CLUSTERS_DIR), clusters, measure);
+		   
+		    // Run the KMeansDriver
+		Path answer = new Path(output, "kmeans_out");
+		KMeansDriver.run(conf, data, initialclusters, answer,
+				measure,convergenceDelta, maxIterations, true, 0.0, false);
+    }
 }

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java?rev=1455286&r1=1455285&r2=1455286&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java Mon Mar 11 19:06:58 2013
@@ -38,6 +38,7 @@ public class DisplaySpectralKMeans exten
 
 	protected static final String SAMPLES = "samples";
 	protected static final String OUTPUT = "output";
+	protected static final String TEMP = "tmp";
 	protected static final String AFFINITIES = "affinities";
 	
   DisplaySpectralKMeans() {
@@ -49,6 +50,7 @@ public class DisplaySpectralKMeans exten
     DistanceMeasure measure = new ManhattanDistanceMeasure();
     Path samples = new Path(SAMPLES);
     Path output = new Path(OUTPUT);
+    Path tempDir = new Path(TEMP);
     Configuration conf = new Configuration();
     HadoopUtil.delete(conf, samples);
     HadoopUtil.delete(conf, output);
@@ -73,7 +75,7 @@ public class DisplaySpectralKMeans exten
     }
     int maxIter = 10;
     double convergenceDelta = 0.001;
-    SpectralKMeansDriver.run(new Configuration(), affinities, output, SAMPLE_DATA.size(), 3, measure, convergenceDelta, maxIter);
+    SpectralKMeansDriver.run(new Configuration(), affinities, output, SAMPLE_DATA.size(), 3, measure, convergenceDelta, maxIter, tempDir, false);
     new DisplaySpectralKMeans();
   }