You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ro...@apache.org on 2013/06/28 00:32:02 UTC
svn commit: r1497591 - in /mahout/trunk/core/src:
main/java/org/apache/mahout/clustering/kmeans/
main/java/org/apache/mahout/clustering/spectral/kmeans/
main/java/org/apache/mahout/math/hadoop/decomposer/
test/java/org/apache/mahout/clustering/kmeans/
Author: robinanil
Date: Thu Jun 27 22:32:02 2013
New Revision: 1497591
URL: http://svn.apache.org/r1497591
Log:
MAHOUT-1214 Improve the accuracy of the Spectral KMeans Method (Yiqun Hu, Zhang Da)
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/EigenSeedGenerator.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestEigenSeedGenerator.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/EigenSeedGenerator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/EigenSeedGenerator.java?rev=1497591&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/EigenSeedGenerator.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/EigenSeedGenerator.java Thu Jun 27 22:32:02 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+
+/**
+ * Given an Input Path containing a {@link org.apache.hadoop.io.SequenceFile}, select k vectors and write them to the
+ * output file as a {@link org.apache.mahout.clustering.kmeans.Kluster} representing the initial centroid to use. The
+ * selection criterion is the rows with max value in that respective column
+ */
+public final class EigenSeedGenerator {
+
+  private static final Logger log = LoggerFactory.getLogger(EigenSeedGenerator.class);
+
+  public static final String K = "k";
+
+  private EigenSeedGenerator() {}
+
+  /**
+   * Scans every vector under {@code input} and, for each column index, remembers the row whose entry in that column
+   * has the largest absolute value. Each remembered row is then written to {@code output/part-eigenSeed} as a
+   * {@link Kluster} whose id is the column index.
+   *
+   * @param conf the Configuration used for file system access and for reading the input
+   * @param input a sequence file, or a directory of sequence files, whose values are VectorWritables
+   * @param output the seed directory; any existing content at this path is deleted first
+   * @param k sizing hint for the internal maps only; one seed is written per column that has a non-zero entry
+   * @param measure the DistanceMeasure handed to every generated Kluster
+   * @return the path of the seed file, {@code output/part-eigenSeed}
+   * @throws IOException if the input cannot be read or the seed file cannot be written
+   */
+  public static Path buildFromEigens(Configuration conf, Path input, Path output, int k, DistanceMeasure measure)
+    throws IOException {
+    // delete the output directory
+    FileSystem fs = FileSystem.get(output.toUri(), conf);
+    HadoopUtil.delete(conf, output);
+    Path outFile = new Path(output, "part-eigenSeed");
+    if (!fs.createNewFile(outFile)) {
+      // the seed file could not be created as a new file: nothing is written, as before
+      return outFile;
+    }
+
+    Path inputPathPattern = fs.getFileStatus(input).isDir() ? new Path(input, "*") : input;
+    FileStatus[] inputFiles = fs.globStatus(inputPathPattern, PathFilters.logsCRCFilter());
+
+    // keyed by column index: largest absolute value seen so far, plus the key and cluster of the row holding it
+    Map<Integer,Double> maxEigens = Maps.newHashMapWithExpectedSize(k);
+    Map<Integer,Text> chosenTexts = Maps.newHashMapWithExpectedSize(k);
+    Map<Integer,ClusterWritable> chosenClusters = Maps.newHashMapWithExpectedSize(k);
+
+    for (FileStatus fileStatus : inputFiles) {
+      if (fileStatus.isDir()) {
+        continue;
+      }
+      for (Pair<Writable,VectorWritable> record : new SequenceFileIterable<Writable,VectorWritable>(
+          fileStatus.getPath(), true, conf)) {
+        Writable key = record.getFirst();
+        Vector row = record.getSecond().get();
+        for (Vector.Element e : row.nonZeroes()) {
+          int index = e.index();
+          double v = Math.abs(e.get());
+          Double best = maxEigens.get(index);
+          if (best == null || v > best) {
+            maxEigens.put(index, v);
+            chosenTexts.put(index, new Text(key.toString()));
+            Kluster newCluster = new Kluster(row, index, measure);
+            newCluster.observe(row, 1);
+            ClusterWritable clusterWritable = new ClusterWritable();
+            clusterWritable.setValue(newCluster);
+            chosenClusters.put(index, clusterWritable);
+          }
+        }
+      }
+    }
+
+    // open the writer only after the scan so that a failure while reading the input cannot leak it
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outFile, Text.class, ClusterWritable.class);
+    try {
+      for (Integer index : maxEigens.keySet()) {
+        writer.append(chosenTexts.get(index), chosenClusters.get(index));
+      }
+      log.info("EigenSeedGenerator:: Wrote {} Klusters to {}", chosenTexts.size(), outFile);
+    } finally {
+      Closeables.close(writer, false);
+    }
+
+    return outFile;
+  }
+
+}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java?rev=1497591&r1=1497590&r2=1497591&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java Thu Jun 27 22:32:02 2013
@@ -22,11 +22,16 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.kmeans.EigenSeedGenerator;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
-import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
import org.apache.mahout.clustering.spectral.common.AffinityMatrixInputJob;
import org.apache.mahout.clustering.spectral.common.MatrixDiagonalizeJob;
import org.apache.mahout.clustering.spectral.common.UnitVectorizerJob;
@@ -34,21 +39,26 @@ import org.apache.mahout.clustering.spec
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.ClassUtils;
import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.decomposer.lanczos.LanczosState;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
/**
- * Performs spectral k-means clustering on the top k eigenvectors of the input
- * affinity matrix.
+ * Performs spectral k-means clustering on the top k eigenvectors of the input affinity matrix.
*/
public class SpectralKMeansDriver extends AbstractJob {
+ private static final Logger log = LoggerFactory.getLogger(SpectralKMeansDriver.class);
public static final double OVERSHOOTMULTIPLIER = 2.0;
public static final int REDUCERS = 10;
@@ -78,7 +88,7 @@ public class SpectralKMeansDriver extend
addOption("oversampling", "p", "Oversampling parameter for SSVD", String.valueOf(OVERSAMPLING));
addOption("powerIter", "q", "Additional power iterations for SSVD", String.valueOf(POWERITERS));
- Map<String, List<String>> parsedArgs = parseArguments(arg0);
+ Map<String,List<String>> parsedArgs = parseArguments(arg0);
if (parsedArgs == null) {
return 0;
}
@@ -111,55 +121,45 @@ public class SpectralKMeansDriver extend
return 0;
}
- public static void run(
- Configuration conf,
- Path input,
- Path output,
- int numDims,
- int clusters,
- DistanceMeasure measure,
- double convergenceDelta,
- int maxIterations,
- Path tempDir,
- boolean ssvd) throws IOException, InterruptedException, ClassNotFoundException {
+ public static void run(Configuration conf, Path input, Path output, int numDims, int clusters,
+ DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir, boolean ssvd)
+ throws IOException, InterruptedException, ClassNotFoundException {
run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempDir, ssvd, REDUCERS,
BLOCKHEIGHT, OVERSAMPLING, POWERITERS);
}
/**
* Run the Spectral KMeans clustering on the supplied arguments
- *
- * @param conf the Configuration to be used
- * @param input the Path to the input tuples directory
- * @param output the Path to the output directory
- * @param numDims the int number of dimensions of the affinity matrix
- * @param clusters the int number of eigenvectors and thus clusters to produce
- * @param measure the DistanceMeasure for the k-Means calculations
- * @param convergenceDelta the double convergence delta for the k-Means calculations
- * @param maxIterations the int maximum number of iterations for the k-Means calculations
- * @param tempDir Temporary directory for intermediate calculations
- * @param ssvd Flag to indicate the eigensolver to use
+ *
+ * @param conf
+ * the Configuration to be used
+ * @param input
+ * the Path to the input tuples directory
+ * @param output
+ * the Path to the output directory
+ * @param numDims
+ * the int number of dimensions of the affinity matrix
+ * @param clusters
+ * the int number of eigenvectors and thus clusters to produce
+ * @param measure
+ * the DistanceMeasure for the k-Means calculations
+ * @param convergenceDelta
+ * the double convergence delta for the k-Means calculations
+ * @param maxIterations
+ * the int maximum number of iterations for the k-Means calculations
+ * @param tempDir
+ * Temporary directory for intermediate calculations
+ * @param ssvd
+ * Flag to indicate the eigensolver to use
* @param numReducers
* @param blockHeight
* @param oversampling
* @param poweriters
*/
- public static void run(
- Configuration conf,
- Path input,
- Path output,
- int numDims,
- int clusters,
- DistanceMeasure measure,
- double convergenceDelta,
- int maxIterations,
- Path tempDir,
- boolean ssvd,
- int numReducers,
- int blockHeight,
- int oversampling,
- int poweriters)
- throws IOException, InterruptedException, ClassNotFoundException {
+ public static void run(Configuration conf, Path input, Path output, int numDims, int clusters,
+ DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir, boolean ssvd, int numReducers,
+ int blockHeight, int oversampling, int poweriters) throws IOException, InterruptedException,
+ ClassNotFoundException {
Path outputCalc = new Path(tempDir, "calculations");
Path outputTmp = new Path(tempDir, "temporary");
@@ -171,8 +171,7 @@ public class SpectralKMeansDriver extend
AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
// Construct the affinity matrix using the newly-created sequence files
- DistributedRowMatrix A =
- new DistributedRowMatrix(affSeqFiles, new Path(outputTmp, "afftmp"), numDims, numDims);
+ DistributedRowMatrix A = new DistributedRowMatrix(affSeqFiles, new Path(outputTmp, "afftmp"), numDims, numDims);
Configuration depConf = new Configuration(conf);
A.setConf(depConf);
@@ -180,34 +179,27 @@ public class SpectralKMeansDriver extend
// Construct the diagonal matrix D (represented as a vector)
Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
- //Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5)
- DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
- new Path(outputCalc, "laplacian"), new Path(outputCalc, outputCalc));
+ // Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5)
+ DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D, new Path(outputCalc, "laplacian"),
+ new Path(outputCalc, outputCalc));
L.setConf(depConf);
Path data;
if (ssvd) {
// SSVD requires an array of Paths to function. So we pass in an array of length one
- Path [] LPath = new Path[1];
+ Path[] LPath = new Path[1];
LPath[0] = L.getRowPath();
Path SSVDout = new Path(outputCalc, "SSVD");
- SSVDSolver solveIt = new SSVDSolver(
- depConf,
- LPath,
- SSVDout,
- blockHeight,
- clusters,
- oversampling,
- numReducers);
+ SSVDSolver solveIt = new SSVDSolver(depConf, LPath, SSVDout, blockHeight, clusters, oversampling, numReducers);
solveIt.setComputeV(false);
solveIt.setComputeU(true);
solveIt.setOverwrite(true);
solveIt.setQ(poweriters);
- //solveIt.setBroadcast(false);
+ // solveIt.setBroadcast(false);
solveIt.run();
data = new Path(solveIt.getUPath());
} else {
@@ -220,48 +212,67 @@ public class SpectralKMeansDriver extend
LanczosState state = new LanczosState(L, overshoot, DistributedLanczosSolver.getInitialVector(L));
Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors");
- solver.runJob(conf,
- state,
- overshoot,
- true,
- lanczosSeqFiles.toString());
+ solver.runJob(conf, state, overshoot, true, lanczosSeqFiles.toString());
// perform a verification
EigenVerificationJob verifier = new EigenVerificationJob();
Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
- verifier.runJob(conf,
- lanczosSeqFiles,
- L.getRowPath(),
- verifiedEigensPath,
- true,
- 1.0,
- clusters);
+ verifier.runJob(conf, lanczosSeqFiles, L.getRowPath(), verifiedEigensPath, true, 1.0, clusters);
Path cleanedEigens = verifier.getCleanedEigensPath();
- DistributedRowMatrix W = new DistributedRowMatrix(
- cleanedEigens, new Path(cleanedEigens, "tmp"), clusters, numDims);
+ DistributedRowMatrix W = new DistributedRowMatrix(cleanedEigens, new Path(cleanedEigens, "tmp"), clusters,
+ numDims);
W.setConf(depConf);
DistributedRowMatrix Wtrans = W.transpose();
data = Wtrans.getRowPath();
}
// Normalize the rows of Wt to unit length
- // normalize is important because it reduces the occurrence of two unique clusters combining into one
+ // normalize is important because it reduces the occurrence of two unique clusters combining into one
Path unitVectors = new Path(outputCalc, "unitvectors");
UnitVectorizerJob.runJob(data, unitVectors);
- DistributedRowMatrix Wt = new DistributedRowMatrix(
- unitVectors, new Path(unitVectors, "tmp"), clusters, numDims);
+ DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new Path(unitVectors, "tmp"), clusters, numDims);
Wt.setConf(depConf);
data = Wt.getRowPath();
- // Generate random initial clusters
- Path initialclusters = RandomSeedGenerator.buildRandom(conf, data,
+ // Generate initial clusters using EigenSeedGenerator which picks rows as centroids if that row contains max
+ // eigen value in that column
+ Path initialclusters = EigenSeedGenerator.buildFromEigens(conf, data,
new Path(output, Cluster.INITIAL_CLUSTERS_DIR), clusters, measure);
- // Run the KMeansDriver
+ // Run the KMeansDriver
Path answer = new Path(output, "kmeans_out");
- KMeansDriver.run(conf, data, initialclusters, answer, measure,convergenceDelta, maxIterations, true, 0.0, false);
+ KMeansDriver.run(conf, data, initialclusters, answer, measure, convergenceDelta, maxIterations, true, 0.0, false);
+
+ // Restore name to id mapping and read through the cluster assignments
+ Path mappingPath = new Path(new Path(conf.get("hadoop.tmp.dir")), "generic_input_mapping");
+ List<String> mapping = Lists.newArrayList();
+ FileSystem fs = FileSystem.get(mappingPath.toUri(), conf);
+ if (fs.exists(mappingPath)) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, mappingPath, conf);
+ Text mappingValue = new Text();
+ IntWritable mappingIndex = new IntWritable();
+ while (reader.next(mappingIndex, mappingValue)) {
+ String s = new String(mappingValue.toString());
+ mapping.add(s);
+ }
+ HadoopUtil.delete(conf, mappingPath);
+ } else {
+ log.warn("generic input mapping file not found!");
+ }
+
+ Path clusteredPointsPath = new Path(answer, "clusteredPoints");
+ Path inputPath = new Path(clusteredPointsPath, "part-m-00000");
+ int id = 0;
+ for (Pair<IntWritable, WeightedVectorWritable> record :
+ new SequenceFileIterable<IntWritable, WeightedVectorWritable>(inputPath, conf)) {
+ if (!mapping.isEmpty()) {
+ log.info("{}: {}", mapping.get(id++), record.getFirst().get());
+ } else {
+ log.info("{}: {}", id++, record.getFirst().get());
+ }
+ }
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java?rev=1497591&r1=1497590&r2=1497591&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java Thu Jun 27 22:32:02 2013
@@ -49,19 +49,24 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * <p>Class for taking the output of an eigendecomposition (specified as a Path location), and verifies correctness,
- * in terms of the following: if you have a vector e, and a matrix m, then let e' = m.timesSquared(v); the error
- * w.r.t. eigenvector-ness is the cosine of the angle between e and e':</p>
+ * <p>
+ * Class for taking the output of an eigendecomposition (specified as a Path location), and verifies correctness, in
+ * terms of the following: if you have a vector e, and a matrix m, then let e' = m.timesSquared(v); the error w.r.t.
+ * eigenvector-ness is the cosine of the angle between e and e':
+ * </p>
+ *
* <pre>
* error(e,e') = e.dot(e') / (e.norm(2)*e'.norm(2))
* </pre>
- * <p>A set of eigenvectors should also all be very close to orthogonal, so this job computes all inner products
- * between eigenvectors, and checks that this is close to the identity matrix.
+ * <p>
+ * A set of eigenvectors should also all be very close to orthogonal, so this job computes all inner products between
+ * eigenvectors, and checks that this is close to the identity matrix.
* </p>
* <p>
- * Parameters used in the cleanup (other than in the input/output path options) include --minEigenvalue, which
- * specifies the value below which eigenvector/eigenvalue pairs will be discarded, and --maxError, which specifies
- * the maximum error (as defined above) to be tolerated in an eigenvector.</p>
+ * Parameters used in the cleanup (other than in the input/output path options) include --minEigenvalue, which specifies
+ * the value below which eigenvector/eigenvalue pairs will be discarded, and --maxError, which specifies the maximum
+ * error (as defined above) to be tolerated in an eigenvector.
+ * </p>
* <p>
* If all the eigenvectors can fit in memory, --inMemory allows for a speedier completion of this task by doing so.
* </p>
@@ -83,7 +88,7 @@ public class EigenVerificationJob extend
private double minEigenValue;
- //private boolean loadEigensInMemory;
+ // private boolean loadEigensInMemory;
private Path tmpOut;
@@ -99,7 +104,7 @@ public class EigenVerificationJob extend
@Override
public int run(String[] args) throws Exception {
- Map<String, List<String>> argMap = handleArgs(args);
+ Map<String,List<String>> argMap = handleArgs(args);
if (argMap == null) {
return -1;
}
@@ -107,37 +112,36 @@ public class EigenVerificationJob extend
return 0;
}
// parse out the arguments
- runJob(getConf(),
- new Path(getOption("eigenInput")),
- new Path(getOption("corpusInput")),
- getOutputPath(),
- getOption("inMemory") != null,
- Double.parseDouble(getOption("maxError")),
- //Double.parseDouble(getOption("minEigenvalue")),
- Integer.parseInt(getOption("maxEigens")));
+ runJob(getConf(), new Path(getOption("eigenInput")), new Path(getOption("corpusInput")), getOutputPath(),
+ getOption("inMemory") != null, Double.parseDouble(getOption("maxError")),
+ // Double.parseDouble(getOption("minEigenvalue")),
+ Integer.parseInt(getOption("maxEigens")));
return 0;
}
/**
* Run the job with the given arguments
- * @param corpusInput the corpus input Path
- * @param eigenInput the eigenvector input Path
- * @param output the output Path
- * @param tempOut temporary output Path
- * @param maxError a double representing the maximum error
- * @param minEigenValue a double representing the minimum eigenvalue
- * @param inMemory a boolean requesting in-memory preparation
- * @param conf the Configuration to use, or null if a default is ok
- * (saves referencing Configuration in calling classes unless needed)
+ *
+ * @param corpusInput
+ * the corpus input Path
+ * @param eigenInput
+ * the eigenvector input Path
+ * @param output
+ * the output Path
+ * @param tempOut
+ * temporary output Path
+ * @param maxError
+ * a double representing the maximum error
+ * @param minEigenValue
+ * a double representing the minimum eigenvalue
+ * @param inMemory
+ * a boolean requesting in-memory preparation
+ * @param conf
+ * the Configuration to use, or null if a default is ok (saves referencing Configuration in calling classes
+ * unless needed)
*/
- public int run(Path corpusInput,
- Path eigenInput,
- Path output,
- Path tempOut,
- double maxError,
- double minEigenValue,
- boolean inMemory,
- Configuration conf) throws IOException {
+ public int run(Path corpusInput, Path eigenInput, Path output, Path tempOut, double maxError, double minEigenValue,
+ boolean inMemory, Configuration conf) throws IOException {
this.outPath = output;
this.tmpOut = tempOut;
this.maxError = maxError;
@@ -157,20 +161,18 @@ public class EigenVerificationJob extend
// we don't currently verify orthonormality here.
// VectorIterable pairwiseInnerProducts = computePairwiseInnerProducts();
- Map<MatrixSlice, EigenStatus> eigenMetaData = verifyEigens();
+ Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
- List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
+ List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
saveCleanEigens(new Configuration(), prunedEigenMeta);
return 0;
}
- private Map<String, List<String>> handleArgs(String[] args) throws IOException {
+ private Map<String,List<String>> handleArgs(String[] args) throws IOException {
addOutputOption();
- addOption("eigenInput",
- "ei",
- "The Path for purported eigenVector input files (SequenceFile<WritableComparable,VectorWritable>.",
- null);
+ addOption("eigenInput", "ei",
+ "The Path for purported eigenVector input files (SequenceFile<WritableComparable,VectorWritable>.", null);
addOption("corpusInput", "ci", "The Path for corpus input files (SequenceFile<WritableComparable,VectorWritable>.");
addOption(DefaultOptionCreator.outputOption().create());
addOption(DefaultOptionCreator.helpOption());
@@ -182,24 +184,22 @@ public class EigenVerificationJob extend
return parseArguments(args);
}
- private void saveCleanEigens(Configuration conf, Collection<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta)
- throws IOException {
+ private void saveCleanEigens(Configuration conf, Collection<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta)
+ throws IOException {
Path path = new Path(outPath, CLEAN_EIGENVECTORS);
FileSystem fs = FileSystem.get(path.toUri(), conf);
SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
try {
IntWritable iw = new IntWritable();
int numEigensWritten = 0;
- for (Map.Entry<MatrixSlice, EigenStatus> pruneSlice : prunedEigenMeta) {
+ int index = 0;
+ for (Map.Entry<MatrixSlice,EigenStatus> pruneSlice : prunedEigenMeta) {
MatrixSlice s = pruneSlice.getKey();
EigenStatus meta = pruneSlice.getValue();
- EigenVector ev = new EigenVector(s.vector(),
- meta.getEigenValue(),
- Math.abs(1 - meta.getCosAngle()),
- s.index());
- //log.info("appending {} to {}", ev, path);
+ EigenVector ev = new EigenVector(s.vector(), meta.getEigenValue(), Math.abs(1 - meta.getCosAngle()), s.index());
+ // log.info("appending {} to {}", ev, path);
Writable vw = new VectorWritable(ev);
- iw.set(s.index());
+ iw.set(index++);
seqWriter.append(iw, vw);
// increment the number of eigenvectors written and see if we've
@@ -217,34 +217,64 @@ public class EigenVerificationJob extend
cleanedEigensPath = path;
}
- private List<Map.Entry<MatrixSlice, EigenStatus>> pruneEigens(Map<MatrixSlice, EigenStatus> eigenMetaData) {
- List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta = Lists.newArrayList();
+ private List<Map.Entry<MatrixSlice,EigenStatus>> pruneEigens(Map<MatrixSlice,EigenStatus> eigenMetaData) {
+ List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = Lists.newArrayList();
- for (Map.Entry<MatrixSlice, EigenStatus> entry : eigenMetaData.entrySet()) {
+ for (Map.Entry<MatrixSlice,EigenStatus> entry : eigenMetaData.entrySet()) {
if (Math.abs(1 - entry.getValue().getCosAngle()) < maxError && entry.getValue().getEigenValue() > minEigenValue) {
prunedEigenMeta.add(entry);
}
}
- Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice, EigenStatus>>() {
+ Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice,EigenStatus>>() {
@Override
public int compare(Map.Entry<MatrixSlice,EigenStatus> e1, Map.Entry<MatrixSlice,EigenStatus> e2) {
- int index1 = e1.getKey().index();
- int index2 = e2.getKey().index();
- if (index1 < index2) {
- return -1;
- }
- if (index1 > index2) {
+ // sort eigens on eigenvalues in descending order
+ double eg1 = e1.getValue().getEigenValue();
+ double eg2 = e2.getValue().getEigenValue();
+ if (eg1 < eg2) {
return 1;
}
+ if (eg1 > eg2) {
+ return -1;
+ }
return 0;
}
});
- return prunedEigenMeta;
+
+ // iterate thru' the eigens, pick up ones with max orthogonality with the selected ones
+ List<Map.Entry<MatrixSlice,EigenStatus>> selectedEigenMeta = Lists.newArrayList();
+ Map.Entry<MatrixSlice,EigenStatus> e1 = prunedEigenMeta.remove(0);
+ selectedEigenMeta.add(e1);
+ int selectedEigenMetaLength = selectedEigenMeta.size();
+ int prunedEigenMetaLength = prunedEigenMeta.size();
+
+ while (prunedEigenMetaLength > 0) {
+ double sum = Double.MAX_VALUE;
+ int index = 0;
+ for (int i = 0; i < prunedEigenMetaLength; i++) {
+ Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.get(i);
+ double tmp = 0;
+ for (int j = 0; j < selectedEigenMetaLength; j++) {
+ Map.Entry<MatrixSlice,EigenStatus> ee = selectedEigenMeta.get(j);
+ tmp += ee.getKey().vector().times(e.getKey().vector()).norm(2);
+ }
+ if (tmp < sum) {
+ sum = tmp;
+ index = i;
+ }
+ }
+ Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.remove(index);
+ selectedEigenMeta.add(e);
+ selectedEigenMetaLength++;
+ prunedEigenMetaLength--;
+ }
+
+ return selectedEigenMeta;
}
- private Map<MatrixSlice, EigenStatus> verifyEigens() {
- Map<MatrixSlice, EigenStatus> eigenMetaData = Maps.newHashMap();
+ private Map<MatrixSlice,EigenStatus> verifyEigens() {
+ Map<MatrixSlice,EigenStatus> eigenMetaData = Maps.newHashMap();
for (MatrixSlice slice : eigensToVerify) {
EigenStatus status = eigenVerifier.verify(corpus, slice.vector());
@@ -262,9 +292,7 @@ public class EigenVerificationJob extend
eigenVectors.add(slice.vector());
}
eigensToVerify = new SparseRowMatrix(eigenVectors.size(), eigenVectors.get(0).size(),
- eigenVectors.toArray(new Vector[eigenVectors.size()]),
- true,
- true);
+ eigenVectors.toArray(new Vector[eigenVectors.size()]), true, true);
} else {
eigensToVerify = eigens;
@@ -281,16 +309,14 @@ public class EigenVerificationJob extend
/**
* Progammatic invocation of run()
- * @param eigenInput Output of LanczosSolver
- * @param corpusInput Input of LanczosSolver
+ *
+ * @param eigenInput
+ * Output of LanczosSolver
+ * @param corpusInput
+ * Input of LanczosSolver
*/
- public void runJob(Configuration conf,
- Path eigenInput,
- Path corpusInput,
- Path output,
- boolean inMemory,
- double maxError,
- int maxEigens) throws IOException {
+ public void runJob(Configuration conf, Path eigenInput, Path corpusInput, Path output, boolean inMemory,
+ double maxError, int maxEigens) throws IOException {
// no need to handle command line arguments
outPath = output;
tmpOut = new Path(outPath, "tmp");
@@ -306,8 +332,8 @@ public class EigenVerificationJob extend
eigenVerifier = new SimpleEigenVerifier();
- Map<MatrixSlice, EigenStatus> eigenMetaData = verifyEigens();
- List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
+ Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens();
+ List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData);
saveCleanEigens(conf, prunedEigenMeta);
}
}
Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestEigenSeedGenerator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestEigenSeedGenerator.java?rev=1497591&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestEigenSeedGenerator.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestEigenSeedGenerator.java Thu Jun 27 22:32:02 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestEigenSeedGenerator extends MahoutTestCase {
+
+  /** Seven input rows, each a unit vector along one of three axes. */
+  private static final double[][] ROWS = {
+    {1, 0, 0}, {1, 0, 0}, {0, 1, 0}, {0, 1, 0}, {0, 1, 0}, {0, 0, 1}, {0, 0, 1}};
+
+  private FileSystem fileSystem;
+
+  /** Copies every row of {@link #ROWS} into a sparse vector wrapped in a VectorWritable. */
+  private static List<VectorWritable> buildPoints() {
+    List<VectorWritable> wrapped = Lists.newArrayList();
+    for (int r = 0; r < ROWS.length; r++) {
+      Vector sparse = new RandomAccessSparseVector(ROWS[r].length);
+      sparse.assign(ROWS[r]);
+      wrapped.add(new VectorWritable(sparse));
+    }
+    return wrapped;
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    fileSystem = FileSystem.get(new Configuration());
+  }
+
+  /** Seeds built from three axis-aligned groups must be 3 clusters with unique ids and orthogonal centers. */
+  @Test
+  public void testEigenSeedGenerator() throws Exception {
+    List<VectorWritable> points = buildPoints();
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    Path input = getTestTempFilePath("eigen-input");
+    Path output = getTestTempDirPath("eigen-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fileSystem, conf);
+
+    EigenSeedGenerator.buildFromEigens(conf, input, output, 3, new ManhattanDistanceMeasure());
+
+    Path seedFile = new Path(output, "part-eigenSeed");
+    Vector[] centers = new Vector[3];
+    Collection<Integer> seenIds = new HashSet<Integer>();
+    int clusterCount = 0;
+    for (ClusterWritable written : new SequenceFileValueIterable<ClusterWritable>(seedFile, true, conf)) {
+      Cluster cluster = written.getValue();
+      int id = cluster.getId();
+      boolean firstSighting = seenIds.add(id);
+      assertTrue(firstSighting); // every cluster id may appear only once
+      centers[id] = cluster.getCenter();
+      clusterCount++;
+    }
+    assertEquals(3, clusterCount); // exactly three seeds are expected
+    // the three centers must be pairwise orthogonal
+    assertEquals(0, centers[0].dot(centers[1]), 1E-10);
+    assertEquals(0, centers[1].dot(centers[2]), 1E-10);
+    assertEquals(0, centers[0].dot(centers[2]), 1E-10);
+  }
+
+}