You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sq...@apache.org on 2013/03/11 20:06:59 UTC
svn commit: r1455286 - in /mahout/trunk:
core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
Author: squinn
Date: Mon Mar 11 19:06:58 2013
New Revision: 1455286
URL: http://svn.apache.org/r1455286
Log:
Applying patch 1159 to add SSVD to SpectralKMeans. Also minor tweaks to Path structure.
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java?rev=1455286&r1=1455285&r2=1455286&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java Mon Mar 11 19:06:58 2013
@@ -17,6 +17,10 @@
package org.apache.mahout.clustering.spectral.kmeans;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
@@ -37,55 +41,71 @@ import org.apache.mahout.math.decomposer
import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
+import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
/**
- * Implementation of the EigenCuts spectral clustering algorithm.
+ * Implementation of the spectral k-means clustering algorithm.
+ * This implementation is for testing and debugging.
+ *
+ * The eigen decomposition of the normalized Laplacian is computed with either
+ * SSVDSolver (when the --usessvd flag is given) or, by default, DistributedLanczosSolver
+ * followed by EigenVerificationJob. Intermediate files are written below the temporary
+ * directory (--tempDir); the k-means results are written to the kmeans_out subdirectory
+ * of the output path.
+ *
+ * See the README file for a description of the algorithm, testing results, and other details.
*/
public class SpectralKMeansDriver extends AbstractJob {
+ /**
+ * Factor applied to the requested number of clusters to obtain the Lanczos rank: some
+ * eigenvectors are spurious and are discarded by verification, so extra ones are requested.
+ */
public static final double OVERSHOOT_MULTIPLIER = 2.0;
+ /**
+ * @deprecated alias of {@link #OVERSHOOT_MULTIPLIER} that does not follow the constant
+ * naming convention; use {@link #OVERSHOOT_MULTIPLIER} instead.
+ */
+ @Deprecated
+ public static final double OVERSHOOTMULTIPLIER = OVERSHOOT_MULTIPLIER;
- public static void main(String[] args) throws Exception {
- ToolRunner.run(new SpectralKMeansDriver(), args);
- }
-
- @Override
- public int run(String[] arg0) throws IOException, ClassNotFoundException, InterruptedException {
- // set up command line options
- Configuration conf = getConf();
- addInputOption();
- addOutputOption();
- addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
- addOption("clusters", "k", "Number of clusters and top eigenvectors", true);
- addOption(DefaultOptionCreator.distanceMeasureOption().create());
- addOption(DefaultOptionCreator.convergenceOption().create());
- addOption(DefaultOptionCreator.maxIterationsOption().create());
- addOption(DefaultOptionCreator.overwriteOption().create());
- Map<String, List<String>> parsedArgs = parseArguments(arg0);
- if (parsedArgs == null) {
- return 0;
- }
-
- Path input = getInputPath();
- Path output = getOutputPath();
- if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
- HadoopUtil.delete(conf, output);
- }
- int numDims = Integer.parseInt(getOption("dimensions"));
- int clusters = Integer.parseInt(getOption("clusters"));
- String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
- DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
- double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
- int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
-
- run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations);
-
- return 0;
- }
+ /** Command-line entry point; delegates to {@link ToolRunner}. */
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new SpectralKMeansDriver(), args);
+ }
+
+ /**
+ * Parses the command line and runs spectral k-means.
+ *
+ * Options: input, output, dimensions (-d), clusters (-k), distance measure, convergence
+ * delta, max iterations, overwrite, and the --usessvd flag selecting SSVD instead of Lanczos.
+ *
+ * @param arg0 command-line arguments
+ * @return 0, including when argument parsing returns null
+ */
+ @Override
+ public int run(String[] arg0)
+ throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException {
+
+ Configuration conf = getConf();
+ addInputOption();
+ addOutputOption();
+ addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
+ addOption("clusters", "k", "Number of clusters and top eigenvectors", true);
+ addOption(DefaultOptionCreator.distanceMeasureOption().create());
+ addOption(DefaultOptionCreator.convergenceOption().create());
+ addOption(DefaultOptionCreator.maxIterationsOption().create());
+ addOption(DefaultOptionCreator.overwriteOption().create());
+ addFlag("usessvd", "ssvd", "Uses SSVD as the eigensolver. Default is the Lanczos solver.");
+
+ Map<String, List<String>> parsedArgs = parseArguments(arg0);
+ if (parsedArgs == null) {
+ return 0;
+ }
+
+ Path input = getInputPath();
+ Path output = getOutputPath();
+ if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+ HadoopUtil.delete(conf, output);
+ }
+ int numDims = Integer.parseInt(getOption("dimensions"));
+ int clusters = Integer.parseInt(getOption("clusters"));
+ String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+ DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+ double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+ int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+
+ Path tempDir = new Path(getOption("tempDir"));
+ // Query the flag through hasOption(), as for the overwrite option above, instead of
+ // hard-coding the "--" key format of the parsed-argument map.
+ boolean ssvd = hasOption("usessvd");
+
+ run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempDir, ssvd);
+
+ return 0;
+ }
/**
* Run the Spectral KMeans clustering on the supplied arguments
@@ -98,95 +118,128 @@ public class SpectralKMeansDriver extend
* @param measure the DistanceMeasure for the k-Means calculations
* @param convergenceDelta the double convergence delta for the k-Means calculations
* @param maxIterations the int maximum number of iterations for the k-Means calculations
+ * @param tempDir directory under which intermediate results are written
+ * (in its "calculations" and "temporary" subdirectories)
+ * @param ssvd true to compute the eigenvectors with SSVDSolver, false to use
+ * DistributedLanczosSolver followed by EigenVerificationJob
*/
- public static void run(Configuration conf,
- Path input,
- Path output,
- int numDims,
- int clusters,
- DistanceMeasure measure,
- double convergenceDelta,
- int maxIterations)
- throws IOException, InterruptedException, ClassNotFoundException {
- // create a few new Paths for temp files and transformations
- Path outputCalc = new Path(output, "calculations");
- Path outputTmp = new Path(output, "temporary");
-
- // Take in the raw CSV text file and split it ourselves,
- // creating our own SequenceFiles for the matrices to read later
- // (similar to the style of syntheticcontrol.canopy.InputMapper)
- Path affSeqFiles = new Path(outputCalc, "seqfile-" + (System.nanoTime() & 0xFF));
- AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
-
- // Next step: construct the affinity matrix using the newly-created
- // sequence files
- DistributedRowMatrix A = new DistributedRowMatrix(affSeqFiles,
- new Path(outputTmp, "afftmp-" + (System.nanoTime() & 0xFF)),
- numDims,
- numDims);
- Configuration depConf = new Configuration(conf);
- A.setConf(depConf);
-
- // Next step: construct the diagonal matrix D (represented as a vector)
- // and calculate the normalized Laplacian of the form:
- // L = D^(-0.5)AD^(-0.5)
- Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
- DistributedRowMatrix L =
- VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
- new Path(outputCalc, "laplacian-" + (System.nanoTime() & 0xFF)), new Path(outputCalc, "laplacian-tmp-" + (System.nanoTime() & 0xFF)));
- L.setConf(depConf);
-
- // Next step: perform eigen-decomposition using LanczosSolver
- // since some of the eigen-output is spurious and will be eliminated
- // upon verification, we have to aim to overshoot and then discard
- // unnecessary vectors later
- int overshoot = (int) ((double) clusters * OVERSHOOT_MULTIPLIER);
- DistributedLanczosSolver solver = new DistributedLanczosSolver();
- LanczosState state = new LanczosState(L, overshoot, DistributedLanczosSolver.getInitialVector(L));
- Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors-" + (System.nanoTime() & 0xFF));
- solver.runJob(conf,
- state,
- overshoot,
- true,
- lanczosSeqFiles.toString());
-
- // perform a verification
- EigenVerificationJob verifier = new EigenVerificationJob();
- Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
- verifier.runJob(conf, lanczosSeqFiles, L.getRowPath(), verifiedEigensPath, true, 1.0, clusters);
- Path cleanedEigens = verifier.getCleanedEigensPath();
- DistributedRowMatrix W = new DistributedRowMatrix(cleanedEigens, new Path(cleanedEigens, "tmp"), clusters, numDims);
- W.setConf(depConf);
- DistributedRowMatrix Wtrans = W.transpose();
- // DistributedRowMatrix Wt = W.transpose();
-
- // next step: normalize the rows of Wt to unit length
- Path unitVectors = new Path(outputCalc, "unitvectors-" + (System.nanoTime() & 0xFF));
- UnitVectorizerJob.runJob(Wtrans.getRowPath(), unitVectors);
- DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new Path(unitVectors, "tmp"), clusters, numDims);
- Wt.setConf(depConf);
-
- // Finally, perform k-means clustering on the rows of L (or W)
- // generate random initial clusters
- Path initialclusters = RandomSeedGenerator.buildRandom(conf,
- Wt.getRowPath(),
- new Path(output, Cluster.INITIAL_CLUSTERS_DIR),
- clusters,
- measure);
+ public static void run(
+ Configuration conf,
+ Path input,
+ Path output,
+ int numDims,
+ int clusters,
+ DistanceMeasure measure,
+ double convergenceDelta,
+ int maxIterations,
+ Path tempDir,
+ boolean ssvd)
+ throws IOException, InterruptedException, ClassNotFoundException {
- // The output format is the same as the K-means output format.
- // TODO: Perhaps a conversion of the output format from points and clusters
- // in eigenspace to the original dataset. Currently, the user has to perform
- // the association step after this job finishes on their own.
- KMeansDriver.run(conf,
- Wt.getRowPath(),
- initialclusters,
- output,
- measure,
- convergenceDelta,
- maxIterations,
- true,
- 0.0,
- false);
- }
+ // Intermediate results go under tempDir; only the clustering results go under output
+ Path outputCalc = new Path(tempDir, "calculations");
+ Path outputTmp = new Path(tempDir, "temporary");
+
+ // Take in the raw CSV text file and split it ourselves,
+ // creating our own SequenceFiles for the matrices to read later
+ // (similar to the style of syntheticcontrol.canopy.InputMapper)
+ Path affSeqFiles = new Path(outputCalc, "seqfile");
+ AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
+
+ // Construct the affinity matrix using the newly-created sequence files
+ DistributedRowMatrix A =
+ new DistributedRowMatrix(affSeqFiles, new Path(outputTmp, "afftmp"), numDims, numDims);
+
+ Configuration depConf = new Configuration(conf);
+ A.setConf(depConf);
+
+ // Construct the diagonal matrix D (represented as a vector)
+ Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
+
+ // Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5)
+ // The last argument is the job's scratch path: a child of outputCalc, not outputCalc
+ // nested inside itself.
+ DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
+ new Path(outputCalc, "laplacian"), new Path(outputCalc, "laplacian-tmp"));
+ L.setConf(depConf);
+
+ Path data;
+
+ if (ssvd) {
+ // SSVDSolver takes an array of input Paths; the Laplacian is the only input
+ Path[] lPath = { L.getRowPath() };
+
+ Path ssvdOut = new Path(outputCalc, "SSVD");
+
+ SSVDSolver solveIt = new SSVDSolver(
+ depConf,
+ lPath,
+ ssvdOut,
+ 1000, // Vertical height of a q-block
+ clusters, // number of singular vectors to compute (k)
+ 15, // Oversampling
+ 10); // number of reduce tasks
+
+ // Only U is needed: its rows are clustered below
+ solveIt.setComputeV(false);
+ solveIt.setComputeU(true);
+ solveIt.setOverwrite(true);
+ // No additional power iterations
+ solveIt.setQ(0);
+
+ // May want to update SSVD documentation on this one: method doc
+ // says "false" is the default, yet it's set to true in the
+ // variable definition.
+ //solveIt.setBroadcast(false);
+ solveIt.run();
+ data = new Path(solveIt.getUPath());
+ } else {
+ // Perform eigen-decomposition using LanczosSolver
+ // since some of the eigen-output is spurious and will be eliminated
+ // upon verification, we have to aim to overshoot and then discard
+ // unnecessary vectors later; the rank is capped at the matrix dimension
+ int overshoot = Math.min((int) ((double) clusters * OVERSHOOTMULTIPLIER), numDims);
+ DistributedLanczosSolver solver = new DistributedLanczosSolver();
+ LanczosState state = new LanczosState(L, overshoot, solver.getInitialVector(L));
+ Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors");
+
+ solver.runJob(conf,
+ state,
+ overshoot,
+ true,
+ lanczosSeqFiles.toString());
+
+ // perform a verification
+ EigenVerificationJob verifier = new EigenVerificationJob();
+ Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
+ verifier.runJob(conf,
+ lanczosSeqFiles,
+ L.getRowPath(),
+ verifiedEigensPath,
+ true,
+ 1.0,
+ clusters);
+
+ // W holds the cleaned eigenvectors as rows; its transpose has one row per matrix row
+ Path cleanedEigens = verifier.getCleanedEigensPath();
+ DistributedRowMatrix W = new DistributedRowMatrix(
+ cleanedEigens, new Path(cleanedEigens, "tmp"), clusters, numDims);
+ W.setConf(depConf);
+ DistributedRowMatrix Wtrans = W.transpose();
+ data = Wtrans.getRowPath();
+ }
+
+ // Normalize the rows of Wt to unit length
+ // normalize is important because it reduces the occurrence of two unique clusters combining into one
+ Path unitVectors = new Path(outputCalc, "unitvectors");
+
+ UnitVectorizerJob.runJob(data, unitVectors);
+
+ // Wt is numDims x clusters (the transpose of W above), not clusters x numDims
+ DistributedRowMatrix Wt = new DistributedRowMatrix(
+ unitVectors, new Path(unitVectors, "tmp"), numDims, clusters);
+ Wt.setConf(depConf);
+ data = Wt.getRowPath();
+
+ // Generate random initial clusters
+ Path initialclusters = RandomSeedGenerator.buildRandom(conf, data,
+ new Path(output, Cluster.INITIAL_CLUSTERS_DIR), clusters, measure);
+
+ // Run the KMeansDriver. The output format is the same as the k-means output format:
+ // points and clusters are in eigenspace, so associating them with the original
+ // dataset is still left to the caller.
+ Path answer = new Path(output, "kmeans_out");
+ KMeansDriver.run(conf, data, initialclusters, answer,
+ measure, convergenceDelta, maxIterations, true, 0.0, false);
+ }
}
Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java?rev=1455286&r1=1455285&r2=1455286&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java Mon Mar 11 19:06:58 2013
@@ -38,6 +38,7 @@ public class DisplaySpectralKMeans exten
protected static final String SAMPLES = "samples";
protected static final String OUTPUT = "output";
+ protected static final String TEMP = "tmp"; // tempDir passed to SpectralKMeansDriver.run; NOTE(review): unlike SAMPLES/OUTPUT it is never deleted before a run, so fixed-name intermediates from a previous run may collide - confirm
protected static final String AFFINITIES = "affinities";
DisplaySpectralKMeans() {
@@ -49,6 +50,7 @@ public class DisplaySpectralKMeans exten
DistanceMeasure measure = new ManhattanDistanceMeasure();
Path samples = new Path(SAMPLES);
Path output = new Path(OUTPUT);
+ Path tempDir = new Path(TEMP);
Configuration conf = new Configuration();
HadoopUtil.delete(conf, samples);
HadoopUtil.delete(conf, output);
@@ -73,7 +75,7 @@ public class DisplaySpectralKMeans exten
}
int maxIter = 10;
double convergenceDelta = 0.001;
- SpectralKMeansDriver.run(new Configuration(), affinities, output, SAMPLE_DATA.size(), 3, measure, convergenceDelta, maxIter);
+ SpectralKMeansDriver.run(new Configuration(), affinities, output, SAMPLE_DATA.size(), 3, measure, convergenceDelta, maxIter, tempDir, false);
new DisplaySpectralKMeans();
}