You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by je...@apache.org on 2010/09/30 20:00:24 UTC
svn commit: r1003188 [1/2] - in /mahout/trunk/core/src:
main/java/org/apache/mahout/clustering/spectral/
main/java/org/apache/mahout/clustering/spectral/common/
main/java/org/apache/mahout/clustering/spectral/eigencuts/
main/java/org/apache/mahout/clus...
Author: jeastman
Date: Thu Sep 30 18:00:23 2010
New Revision: 1003188
URL: http://svn.apache.org/viewvc?rev=1003188&view=rev
Log:
MAHOUT-363: committing latest patch from the JIRA
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/IntDoublePairWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/UnitVectorizerJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VertexWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsAffinityCutsJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsKeys.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityNode.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestAffinityMatrixInputJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestMatrixDiagonalizeJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestUnitVectorizerJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorCache.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorMatrixMultiplicationJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsAffinityCutsJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsSensitivityJob.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,76 @@
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.spectral.eigencuts.EigencutsKeys;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+public final class AffinityMatrixInputJob {
+
+  /**
+   * Initializes and executes the job of reading the documents containing
+   * the data of the affinity matrix in (x_i, x_j, value) format and writing
+   * it out as a SequenceFile of (IntWritable row index, VectorWritable row).
+   *
+   * @param input path of the text input; each line must be "i,j,value"
+   * @param output path the SequenceFile rows are written to; any existing
+   *        content at this path is removed first
+   * @param rows number of rows; handed to the reducer (via
+   *        EigencutsKeys.AFFINITY_DIMENSIONS) as the cardinality of every
+   *        row vector
+   * @param cols number of columns; currently unused
+   *        (NOTE(review): row-vector cardinality arguably should come from
+   *        cols; the two are equal for the square matrices built here)
+   * @throws IOException if the job cannot be configured or run
+   * @throws InterruptedException if interrupted while waiting for the job
+   * @throws ClassNotFoundException if a job class cannot be loaded
+   * @throws IllegalStateException if the MapReduce job does not succeed
+   */
+  public static void runJob(Path input, Path output, int rows, int cols)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    HadoopUtil.overwriteOutput(output);
+
+    Configuration conf = new Configuration();
+    conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, rows);
+    Job job = new Job(conf, "AffinityMatrixInputJob: " + input + " -> M/R -> " + output);
+
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(DistributedRowMatrix.MatrixEntryWritable.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(AffinityMatrixInputMapper.class);
+    job.setReducerClass(AffinityMatrixInputReducer.class);
+
+    FileInputFormat.addInputPath(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+
+    // Without this check a failed job leaves an empty or partial output
+    // directory that callers would go on to treat as the affinity matrix.
+    if (!job.waitForCompletion(true)) {
+      throw new IllegalStateException("AffinityMatrixInputJob failed: " + input + " -> " + output);
+    }
+  }
+
+  /**
+   * Convenience wrapper around {@link #runJob(Path, Path, int, int)} for a
+   * square matrix. It writes the rows to a "seqfiles-*" subdirectory of
+   * {@code output} and returns them wrapped in a configured
+   * DistributedRowMatrix.
+   *
+   * @param input path of the text input; each line must be "i,j,value"
+   * @param output parent directory for the generated SequenceFiles
+   * @param dimensions number of rows and of columns of the matrix
+   * @return a DistributedRowMatrix backed by the generated SequenceFiles,
+   *         whose temporary path is a "seqtmp-*" directory inside them
+   * @throws IOException if the job cannot be configured or run
+   * @throws InterruptedException if interrupted while waiting for the job
+   * @throws ClassNotFoundException if a job class cannot be loaded
+   * @throws IllegalStateException if the MapReduce job does not succeed
+   */
+  public static DistributedRowMatrix runJob(Path input, Path output, int dimensions)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    // the low 8 bits of nanoTime are a cheap, not collision-proof, suffix
+    Path seqFiles = new Path(output, "seqfiles-" + (System.nanoTime() & 0xFF));
+    AffinityMatrixInputJob.runJob(input, seqFiles, dimensions, dimensions);
+    DistributedRowMatrix matrix = new DistributedRowMatrix(seqFiles,
+        new Path(seqFiles, "seqtmp-" + (System.nanoTime() & 0xFF)),
+        dimensions, dimensions);
+    matrix.configure(new JobConf());
+    return matrix;
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputMapper.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputMapper.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,60 @@
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.spectral.kmeans.SpectralKMeansDriver;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+/**
+ * <p>Handles reading the files representing the affinity matrix. Since the affinity
+ * matrix is representative of a graph, each line in all the files should
+ * take the form:</p>
+ *
+ * <code>i,j,value</code>
+ *
+ * <p>where <code>i</code> and <code>j</code> are the <code>i</code>th and
+ * <code>j</code>th data points in the entire set, and <code>value</code>
+ * represents some measurement of their relative absolute magnitudes. This
+ * is, simply, a method for representing a graph textually.</p>
+ *
+ * <p>Whitespace around each field is ignored. Every line is emitted keyed by
+ * its row index <code>i</code>, with a matrix entry that carries the column
+ * <code>j</code> and the <code>value</code>.</p>
+ */
+public class AffinityMatrixInputMapper extends Mapper<LongWritable, Text, IntWritable, DistributedRowMatrix.MatrixEntryWritable> {
+
+  /**
+   * Parses one "i,j,value" line and emits (i, entry(col = j, val = value)).
+   *
+   * @param key input key; only printed in debug output (presumably the byte
+   *        offset supplied by the default text input format)
+   * @param value one line of text
+   * @param context MapReduce context the entry is written to
+   * @throws IOException if the line does not consist of exactly three
+   *         non-empty, numeric, comma-separated fields
+   */
+  @Override
+  protected void map(LongWritable key, Text value, Context context)
+    throws IOException, InterruptedException {
+
+    String [] elements = value.toString().split(",");
+    if (SpectralKMeansDriver.DEBUG) {
+      System.out.println("(DEBUG - MAP) Key[" + key.get() + "], " +
+                         "Value[" + value.toString() + "]");
+    }
+
+    // enforce well-formed textual representation of the graph
+    if (elements.length != 3) {
+      throw new IOException("Expected input of length 3, received " +
+                            elements.length + ". Please make sure you adhere to " +
+                            "the structure of (i,j,value) for representing a graph " +
+                            "in text.");
+    }
+    // Tolerate whitespace around fields (e.g. "1, 2, 0.5"), which
+    // Integer.parseInt would otherwise reject; a whitespace-only field is
+    // then caught by the empty-field check below.
+    for (int i = 0; i < elements.length; i++) {
+      elements[i] = elements[i].trim();
+    }
+    if (elements[0].length() == 0 || elements[1].length() == 0 ||
+        elements[2].length() == 0) {
+      throw new IOException("Found an element of 0 length. Please " +
+                            "be sure you adhere to the structure of (i,j,value) for " +
+                            "representing a graph in text.");
+    }
+
+    int rowIndex;
+    int colIndex;
+    double entryValue;
+    try {
+      rowIndex = Integer.parseInt(elements[0]);
+      colIndex = Integer.parseInt(elements[1]);
+      entryValue = Double.parseDouble(elements[2]);
+    } catch (NumberFormatException nfe) {
+      // report like the other malformed-input cases, keeping the original cause
+      IOException ioe = new IOException("Could not parse \"" + value + "\" as (i,j,value): "
+          + nfe.getMessage());
+      ioe.initCause(nfe);
+      throw ioe;
+    }
+
+    // The row (elements[0]) becomes the key to the Reducer; the column
+    // (elements[1]) and value are stored in the entry itself.
+    DistributedRowMatrix.MatrixEntryWritable toAdd =
+        new DistributedRowMatrix.MatrixEntryWritable();
+    toAdd.setRow(-1); // already set as the Reducer's key
+    toAdd.setCol(colIndex);
+    toAdd.setVal(entryValue);
+    context.write(new IntWritable(rowIndex), toAdd);
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputReducer.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputReducer.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,45 @@
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.clustering.spectral.eigencuts.EigencutsKeys;
+import org.apache.mahout.clustering.spectral.kmeans.SpectralKMeansDriver;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+/**
+ * Gathers all matrix entries emitted for one row into a single row vector.
+ * The row index arrives as the reduce key and is reused unchanged as the
+ * output key. Each entry's value is stored at that entry's column index, and
+ * the finished row is emitted as a sequential-access sparse vector.
+ */
+public class AffinityMatrixInputReducer extends
+    Reducer<IntWritable, DistributedRowMatrix.MatrixEntryWritable, IntWritable, VectorWritable> {
+
+  @Override
+  protected void reduce(IntWritable row,
+                        Iterable<DistributedRowMatrix.MatrixEntryWritable> values,
+                        Context context) throws IOException, InterruptedException {
+    // Vector cardinality comes from the job configuration; 100 is the
+    // initial-capacity hint for the sparse vector.
+    int cardinality = context.getConfiguration()
+        .getInt(EigencutsKeys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE);
+    RandomAccessSparseVector collected = new RandomAccessSparseVector(cardinality, 100);
+
+    for (DistributedRowMatrix.MatrixEntryWritable entry : values) {
+      int column = entry.getCol();
+      double weight = entry.getVal();
+      collected.setQuick(column, weight);
+      if (SpectralKMeansDriver.DEBUG) {
+        System.out.println("(DEBUG - REDUCE) Row[" + row.get() + "], "
+            + "Column[" + column + "], Value["
+            + weight + "]");
+      }
+    }
+
+    // convert to the sequential-access representation before emitting
+    context.write(row, new VectorWritable(new SequentialAccessSparseVector(collected)));
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/IntDoublePairWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/IntDoublePairWritable.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/IntDoublePairWritable.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/IntDoublePairWritable.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A Writable (int, double) pair: an integer key together with a double value.
+ *
+ * It plays the role of mahout.common.Pair specialized to primitive types, so
+ * neither component has to implement Writable itself. It can be read as a
+ * single vector element (index, value).
+ *
+ * Serialized form: the key via DataOutput.writeInt, then the value via
+ * DataOutput.writeDouble.
+ */
+public class IntDoublePairWritable implements Writable {
+
+  private int key;
+  private double value;
+
+  /** Creates a pair with key 0 and value 0.0; populate via setters or readFields. */
+  public IntDoublePairWritable() {
+    this(0, 0.0);
+  }
+
+  /** Creates a pair holding the given key and value. */
+  public IntDoublePairWritable(int k, double v) {
+    key = k;
+    value = v;
+  }
+
+  /** @return the integer key */
+  public int getKey() {
+    return key;
+  }
+
+  /** Replaces the integer key. */
+  public void setKey(int k) {
+    key = k;
+  }
+
+  /** @return the double value */
+  public double getValue() {
+    return value;
+  }
+
+  /** Replaces the double value. */
+  public void setValue(double v) {
+    value = v;
+  }
+
+  /** Writes the key, then the value. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(key);
+    out.writeDouble(value);
+  }
+
+  /** Reads the key, then the value, in the order {@link #write} emits them. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    key = in.readInt();
+    value = in.readDouble();
+  }
+
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.spectral.eigencuts.EigencutsKeys;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Given a matrix, this job returns a vector whose i_th element is the
+ * sum of all the elements in the i_th row of the original matrix.
+ */
+public final class MatrixDiagonalizeJob {
+
+  /**
+   * Runs the row-sum job and reads its single output vector back.
+   *
+   * @param affInput SequenceFile input of (IntWritable row index,
+   *        VectorWritable row) pairs
+   * @param dimensions cardinality of the returned vector
+   * @return vector whose i-th element is the sum of row i of the input
+   * @throws IOException if the job cannot be run or its output cannot be read
+   * @throws ClassNotFoundException if a job class cannot be loaded
+   * @throws InterruptedException if interrupted while waiting for the job
+   * @throws IllegalStateException if the MapReduce job does not succeed
+   */
+  public static Vector runJob(Path affInput, int dimensions)
+    throws IOException, ClassNotFoundException, InterruptedException {
+
+    // set up all the job tasks; output goes to a "diagonal" sibling of the
+    // input, and anything already there is removed first
+    Configuration conf = new Configuration();
+    Path diagOutput = new Path(affInput.getParent(), "diagonal");
+    HadoopUtil.overwriteOutput(diagOutput);
+    conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, dimensions);
+    Job job = new Job(conf, "MatrixDiagonalizeJob");
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapOutputKeyClass(NullWritable.class);
+    job.setMapOutputValueClass(IntDoublePairWritable.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(MatrixDiagonalizeMapper.class);
+    job.setReducerClass(MatrixDiagonalizeReducer.class);
+
+    FileInputFormat.addInputPath(job, affInput);
+    FileOutputFormat.setOutputPath(job, diagOutput);
+
+    // a failed job would otherwise leave us reading a missing or stale part file
+    if (!job.waitForCompletion(true)) {
+      throw new IllegalStateException("MatrixDiagonalizeJob failed: " + affInput + " -> " + diagOutput);
+    }
+
+    // Every map output shares the single NullWritable key, so one reduce call
+    // assembles the whole vector; this code assumes it lands in part-r-00000.
+    return VectorCache.load(NullWritable.get(), conf,
+                            new Path(diagOutput, "part-r-00000"));
+  }
+
+  /** Emits (row index, sum of the row's elements) for each input row. */
+  public static class MatrixDiagonalizeMapper
+    extends Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable> {
+
+    @Override
+    protected void map(IntWritable key, VectorWritable row, Context context)
+      throws IOException, InterruptedException {
+      // store the sum
+      IntDoublePairWritable store = new IntDoublePairWritable(key.get(), row.get().zSum());
+      context.write(NullWritable.get(), store);
+    }
+  }
+
+  /** Collects all (row index, row sum) pairs into one dense vector. */
+  public static class MatrixDiagonalizeReducer
+    extends Reducer<NullWritable, IntDoublePairWritable, NullWritable, VectorWritable> {
+
+    @Override
+    protected void reduce(NullWritable key, Iterable<IntDoublePairWritable> values,
+      Context context) throws IOException, InterruptedException {
+      // create the return vector, sized from the job configuration
+      Vector retval = new DenseVector(context.getConfiguration().getInt(
+          EigencutsKeys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE));
+      // put every row sum at its row index
+      for (IntDoublePairWritable e : values) {
+        retval.setQuick(e.getKey(), e.getValue());
+      }
+      // write it out
+      context.write(key, new VectorWritable(retval));
+    }
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/UnitVectorizerJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/UnitVectorizerJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/UnitVectorizerJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/UnitVectorizerJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * <p>Given a DistributedRowMatrix, this job normalizes each row to unit
+ * vector length. If the input is a matrix U, and the output is a matrix
+ * W, the job follows:</p>
+ *
+ * <p><code>w_ij = u_ij / sqrt(sum_j(u_ij * u_ij))</code></p>
+ *
+ * <p>Rows whose norm is zero cannot be normalized and are emitted unchanged.</p>
+ */
+public final class UnitVectorizerJob {
+
+  /**
+   * Runs the map-only normalization job.
+   *
+   * @param input SequenceFile input of (IntWritable, VectorWritable) rows
+   * @param output path the normalized rows are written to
+   * @throws IOException if the job cannot be configured or run
+   * @throws InterruptedException if interrupted while waiting for the job
+   * @throws ClassNotFoundException if a job class cannot be loaded
+   * @throws IllegalStateException if the MapReduce job does not succeed
+   */
+  public static void runJob(Path input, Path output)
+    throws IOException, InterruptedException, ClassNotFoundException {
+
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "UnitVectorizerJob");
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(UnitVectorizerMapper.class);
+    job.setNumReduceTasks(0); // map-only: each row is normalized independently
+
+    FileInputFormat.addInputPath(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+
+    if (!job.waitForCompletion(true)) {
+      throw new IllegalStateException("UnitVectorizerJob failed: " + input + " -> " + output);
+    }
+  }
+
+  /** Divides every element of a row by the row's Euclidean norm. */
+  public static class UnitVectorizerMapper
+    extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    @Override
+    protected void map(IntWritable row, VectorWritable vector, Context context)
+      throws IOException, InterruptedException {
+
+      Vector u = vector.get();
+      double norm = vectorNorm(u);
+      // An all-zero row has norm 0; dividing would turn every element into
+      // NaN, so such a row is passed through unchanged.
+      Vector w = norm == 0.0 ? u : u.assign(Functions.div(norm));
+      RandomAccessSparseVector out = new RandomAccessSparseVector(w);
+
+      // finally write the output
+      context.write(row, new VectorWritable(out));
+    }
+
+    /**
+     * Sums the squares of all elements together, then takes the square root
+     * of that sum.
+     *
+     * @param u vector to measure
+     * @return the Euclidean (L2) norm of {@code u}
+     */
+    private static double vectorNorm(Vector u) {
+      double sumOfSquares = 0.0;
+      for (Vector.Element e : u) {
+        double x = e.get();
+        sumOfSquares += x * x;
+      }
+      return Math.sqrt(sumOfSquares);
+    }
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * This class handles reading and writing vectors to the Hadoop
+ * distributed cache. Created as a result of Eigencuts' liberal use
+ * of such functionality, but available to any algorithm requiring it.
+ */
+public final class VectorCache {
+
+  /**
+   * Writes a single (key, vector) record to a SequenceFile at {@code output}
+   * and registers that file as the configuration's distributed-cache file.
+   *
+   * @param key SequenceFile key; its runtime class becomes the file's key
+   *        class, so the same key type must be used with {@link #load}
+   * @param vector Vector to save, to be wrapped as VectorWritable
+   * @param output path of the file to write (qualified against the default
+   *        FileSystem)
+   * @param conf configuration whose distributed-cache file list is replaced
+   *        by {@code output}
+   * @param overwritePath if true, any existing content at {@code output} is
+   *        removed first
+   * @param deleteOnExit if true, {@code output} is registered with
+   *        FileSystem.deleteOnExit
+   * @throws IOException if the file cannot be written
+   */
+  public static void save(Writable key, Vector vector, Path output, Configuration
+      conf, boolean overwritePath, boolean deleteOnExit) throws IOException {
+
+    FileSystem fs = FileSystem.get(conf);
+    output = fs.makeQualified(output);
+    if (overwritePath) {
+      HadoopUtil.overwriteOutput(output);
+    }
+
+    // set the cache
+    DistributedCache.setCacheFiles(new URI[] {output.toUri()}, conf);
+
+    // Take the key class from the key itself: a hard-coded IntWritable key
+    // class makes append() reject every other Writable key type.
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, output,
+        key.getClass(), VectorWritable.class);
+    try {
+      writer.append(key, new VectorWritable(vector));
+    } finally {
+      writer.close();
+    }
+
+    if (deleteOnExit) {
+      fs.deleteOnExit(output);
+    }
+  }
+
+  /**
+   * Calls the save() method, setting the cache to overwrite any previous
+   * Path and to delete the path after exiting.
+   *
+   * @param key SequenceFile key
+   * @param vector Vector to save
+   * @param output path of the file to write
+   * @param conf configuration to register the cache file in
+   * @throws IOException if the file cannot be written
+   */
+  public static void save(Writable key, Vector vector, Path output, Configuration conf)
+    throws IOException {
+    VectorCache.save(key, vector, output, conf, true, true);
+  }
+
+  /**
+   * Loads the vector with the specified key from the first file in the
+   * configuration's distributed cache. Returns null if no cache file is
+   * registered or the file holds no record; handling that is up to the caller.
+   *
+   * @param key key instance to read into; must match the file's key class
+   * @param conf configuration holding the distributed-cache file list
+   * @return the cached vector, or null if nothing is found
+   * @throws IOException if the file cannot be read
+   */
+  public static Vector load(Writable key, Configuration conf) throws IOException {
+    URI [] files = DistributedCache.getCacheFiles(conf);
+    if (files == null || files.length < 1) {
+      return null;
+    }
+    return VectorCache.load(key, conf, new Path(files[0].getPath()));
+  }
+
+  /**
+   * Loads the first Vector record from the specified SequenceFile path.
+   *
+   * @param key key instance to read into; must match the file's key class
+   * @param conf configuration used to obtain the FileSystem
+   * @param input path of the SequenceFile
+   * @return the first vector in the file, or null if the file is empty
+   * @throws IOException if the file cannot be read
+   */
+  public static Vector load(Writable key, Configuration conf, Path input)
+    throws IOException {
+
+    FileSystem fs = FileSystem.get(conf);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, input, conf);
+    try {
+      VectorWritable retval = new VectorWritable();
+      if (!reader.next(key, retval)) {
+        return null; // empty file: no record to return
+      }
+      return retval.get();
+    } finally {
+      reader.close();
+    }
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.spectral.eigencuts.EigencutsKeys;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+/**
+ * <p>This class handles the three-way multiplication of the digonal matrix
+ * and the Markov transition matrix inherent in the Eigencuts algorithm.
+ * The equation takes the form:</p>
+ *
+ * <code>W = D^(1/2) * M * D^(1/2)</code>
+ *
+ * <p>Since the diagonal matrix D has only n non-zero elements, it is represented
+ * as a dense vector in this job, rather than a full n-by-n matrix. This job
+ * performs the multiplications and returns the new DRM.
+ */
+public final class VectorMatrixMultiplicationJob {
+
+ /**
+ * Invokes the job.
+ * @param markovPath Path to the markov DRM's sequence files
+ * @param diag
+ * @param outputPath
+ * @return
+ */
+ public static DistributedRowMatrix runJob(Path markovPath, Vector diag,
+ Path outputPath) throws IOException, ClassNotFoundException, InterruptedException {
+
+ // set up the serialization of the diagonal vector
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ markovPath = fs.makeQualified(markovPath);
+ outputPath = fs.makeQualified(outputPath);
+ Path vectorOutputPath = new Path(outputPath.getParent(), "vector");
+ VectorCache.save(new IntWritable(EigencutsKeys.DIAGONAL_CACHE_INDEX), diag, vectorOutputPath, conf);
+
+ // set up the job itself
+ Job job = new Job(conf, "VectorMatrixMultiplication");
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(VectorWritable.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setMapperClass(VectorMatrixMultiplicationMapper.class);
+ job.setNumReduceTasks(0);
+
+ FileInputFormat.addInputPath(job, markovPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ job.waitForCompletion(true);
+
+ // build the resulting DRM from the results
+ return new DistributedRowMatrix(outputPath, new Path(outputPath, "tmp"),
+ diag.size(), diag.size());
+ }
+
+ public static class VectorMatrixMultiplicationMapper
+ extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+ private Vector diagonal;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ // read in the diagonal vector from the distributed cache
+ super.setup(context);
+ Configuration config = context.getConfiguration();
+ diagonal = VectorCache.load(new IntWritable(EigencutsKeys.DIAGONAL_CACHE_INDEX), config);
+ if (diagonal == null) {
+ throw new IOException("No vector loaded from cache!");
+ }
+ if (!(diagonal instanceof DenseVector)) {
+ diagonal = new DenseVector(diagonal);
+ }
+ }
+
+ @Override
+ protected void map(IntWritable key, VectorWritable row, Context ctx)
+ throws IOException, InterruptedException {
+
+ for (Vector.Element e : row.get()) {
+ double dii = Functions.SQRT.apply(diagonal.get(key.get()));
+ double djj = Functions.SQRT.apply(diagonal.get(e.index()));
+ double mij = e.get();
+ e.set(dii * mij * djj);
+ }
+ ctx.write(key, row);
+ }
+
+ /**
+ * Performs the setup of the Mapper. Used by unit tests.
+ * @param diag
+ */
+ void setup(Vector diag) {
+ this.diagonal = diag;
+ }
+ }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VertexWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VertexWritable.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VertexWritable.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VertexWritable.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Represents a vertex within the affinity graph for Eigencuts.
+ */
+public class VertexWritable implements Writable {
+
+ /** the row */
+ private int i;
+
+ /** the column */
+ private int j;
+
+ /** the value at this vertex */
+ private double value;
+
+ /** an extra type delimeter, can probably be null */
+ private String type;
+
+ public VertexWritable() {}
+
+ public VertexWritable(int i, int j, double v, String t) {
+ this.i = i;
+ this.j = j;
+ this.value = v;
+ this.type = t;
+ }
+
+ public int getRow() {
+ return i;
+ }
+
+ public void setRow(int i) {
+ this.i = i;
+ }
+
+ public int getCol() {
+ return j;
+ }
+
+ public void setCol(int j) {
+ this.j = j;
+ }
+
+ public double getValue() {
+ return value;
+ }
+
+ public void setValue(double v) {
+ this.value = v;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String t) {
+ this.type = t;
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ this.i = arg0.readInt();
+ this.j = arg0.readInt();
+ this.value = arg0.readDouble();
+ this.type = arg0.readUTF();
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ arg0.writeInt(i);
+ arg0.writeInt(j);
+ arg0.writeDouble(value);
+ arg0.writeUTF(type);
+ }
+
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsAffinityCutsJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsAffinityCutsJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsAffinityCutsJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsAffinityCutsJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.spectral.common.VertexWritable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * TODO: Need a really good description of what this job does...
+ *
+ */
+public final class EigencutsAffinityCutsJob {
+
+ static enum CUTSCOUNTER {NUM_CUTS}
+
+ /**
+ * Runs a single iteration of defining cluster boundaries, based on
+ * previous calculations and the formation of the "cut matrix".
+ *
+ * @param currentAffinity Path to the current affinity matrix.
+ * @param cutMatrix Path to the sensitivity matrix.
+ * @param nextAffinity Output path for the new affinity matrix.
+ * @param conf
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ public static long runjob(Path currentAffinity, Path cutMatrix, Path nextAffinity,
+ Configuration conf) throws IOException, ClassNotFoundException,
+ InterruptedException {
+
+ // these options allow us to differentiate between the two vectors
+ // in the mapper and reducer - we'll know from the working path
+ // which SequenceFile we're accessing
+ conf.set(EigencutsKeys.AFFINITY_PATH, currentAffinity.getName());
+ conf.set(EigencutsKeys.CUTMATRIX_PATH, cutMatrix.getName());
+
+ Job job = new Job(conf, "EigencutsAffinityCutsJob");
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(VertexWritable.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(VectorWritable.class);
+ job.setMapperClass(EigencutsAffinityCutsMapper.class);
+ job.setCombinerClass(EigencutsAffinityCutsCombiner.class);
+ job.setReducerClass(EigencutsAffinityCutsReducer.class);
+
+ //FileInputFormat.addInputPath(job, currentAffinity);
+ FileInputFormat.addInputPath(job, cutMatrix);
+ FileOutputFormat.setOutputPath(job, nextAffinity);
+
+ job.waitForCompletion(true);
+
+ return job.getCounters().findCounter(CUTSCOUNTER.NUM_CUTS).getValue();
+ }
+
+ public static class EigencutsAffinityCutsMapper
+ extends Mapper<IntWritable, VectorWritable, Text, VertexWritable> {
+
+ @Override
+ protected void map(IntWritable key, VectorWritable row, Context context)
+ throws IOException, InterruptedException {
+
+ // all this method does is construct a bunch of vertices, mapping those
+ // together which have the same *combination* of indices; for example,
+ // (1, 3) will have the same key as (3, 1) but a different key from (1, 1)
+ // and (3, 3) (which, incidentally, will also not be grouped together)
+ String type = context.getWorkingDirectory().getName();
+ Vector vector = row.get();
+ for (Vector.Element e : vector) {
+ String newkey = Math.max(key.get(), e.index()) + "_" + Math.min(key.get(), e.index());
+ context.write(new Text(newkey), new VertexWritable(key.get(), e.index(), e.get(), type));
+ }
+ }
+ }
+
+ public static class EigencutsAffinityCutsCombiner
+ extends Reducer<Text, VertexWritable, Text, VertexWritable> {
+
+ @Override
+ protected void reduce(Text t, Iterable<VertexWritable> vertices,
+ Context context) throws IOException, InterruptedException {
+ // there should be exactly 4 items in the iterable; two from the
+ // first Path source, and two from the second with matching (i, j) indices
+
+ // the idea here is that we want the two vertices of the "cut" matrix,
+ // and if either of them has a non-zero value, we want to:
+ //
+ // 1) zero out the two affinity vertices, and
+ // 2) add their former values to the (i, i) and (j, j) coordinates
+ //
+ // though obviously we want to perform these steps in reverse order
+ boolean zero = false;
+ int i = -1, j = -1;
+ double k = 0;
+ Configuration conf = context.getConfiguration();
+ int count = 0;
+ System.out.println("DEBUG: " + t.toString());
+ for (VertexWritable v : vertices) {
+ count++;
+ if (v.getType().equals(conf.get(EigencutsKeys.AFFINITY_PATH))) {
+ i = v.getRow();
+ j = v.getCol();
+ k = v.getValue();
+ } else if (v.getValue() != 0.0) {
+ zero = true;
+ }
+ }
+ // if there are only two vertices, we have a diagonal
+ // we want to preserve whatever is currently in the diagonal,
+ // since this is acting as a running sum of all other values
+ // that have been "cut" so far - simply return this element as is
+ if (count == 2) {
+ VertexWritable vw = new VertexWritable(i, j, k, "unimportant");
+ context.write(new Text(String.valueOf(i)), vw);
+ return;
+ }
+
+ // do we zero out the values?
+ VertexWritable out_i = new VertexWritable();
+ VertexWritable out_j = new VertexWritable();
+ if (zero) {
+ // increment the cut counter
+ context.getCounter(CUTSCOUNTER.NUM_CUTS).increment(1);
+
+ // we want the values to exist on the diagonal
+ out_i.setCol(i);
+ out_j.setCol(j);
+
+ // also, set the old values to zero
+ VertexWritable zero_i = new VertexWritable();
+ VertexWritable zero_j = new VertexWritable();
+ zero_i.setCol(j);
+ zero_i.setValue(0);
+ zero_j.setCol(i);
+ zero_j.setValue(0);
+ zero_i.setType("unimportant");
+ zero_j.setType("unimportant");
+ context.write(new Text(String.valueOf(i)), zero_i);
+ context.write(new Text(String.valueOf(j)), zero_j);
+ } else {
+ out_i.setCol(j);
+ out_j.setCol(i);
+ }
+
+ // set the values and write them
+ out_i.setValue(k);
+ out_j.setValue(k);
+ out_i.setType("unimportant");
+ out_j.setType("unimportant");
+ context.write(new Text(String.valueOf(i)), out_i);
+ context.write(new Text(String.valueOf(j)), out_j);
+ }
+ }
+
+ public static class EigencutsAffinityCutsReducer
+ extends Reducer<Text, VertexWritable, IntWritable, VectorWritable> {
+
+ @Override
+ protected void reduce(Text row, Iterable<VertexWritable> entries,
+ Context context) throws IOException, InterruptedException {
+ // now to assemble the vectors
+ RandomAccessSparseVector output = new RandomAccessSparseVector(
+ context.getConfiguration().getInt(EigencutsKeys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE), 100);
+ int rownum = Integer.parseInt(row.toString());
+ for (VertexWritable e : entries) {
+ // first, are we setting a diagonal?
+ if (e.getCol() == rownum) {
+ // add to what's already present
+ output.setQuick(e.getCol(), output.getQuick(e.getCol()) + e.getValue());
+ } else {
+ // simply set the value
+ output.setQuick(e.getCol(), e.getValue());
+ }
+ }
+ context.write(new IntWritable(rownum), new VectorWritable(output));
+ }
+ }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.spectral.common.AffinityMatrixInputJob;
import org.apache.mahout.clustering.spectral.common.MatrixDiagonalizeJob;
import org.apache.mahout.clustering.spectral.common.VectorMatrixMultiplicationJob;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
import org.apache.mahout.math.stats.OnlineSummarizer;
+
+public class EigencutsDriver extends AbstractJob {
+
+ public static final double EPSILON_DEFAULT = 0.25;
+ public static final double TAU_DEFAULT = -0.1;
+ public static final double OVERSHOOT_MULTIPLIER = 1.5;
+
+ // parameters of interest
+ /** number of dimensions in the square affinity matrix */
+ private int dimensions;
+ /** user-supplied minimum half-life threshold */
+ private double halflife;
+ /** user-supplied coefficient for setting minimum half-life threshold */
+ private double epsilon;
+ /** user-supplied threshold for cutting links in the affinity graph */
+ private double tau;
+
+ public static void main(String args[]) throws Exception {
+ ToolRunner.run(new EigencutsDriver(), args);
+ }
+
+ @Override
+ public int run(String[] arg0) throws Exception {
+
+ // set up command line arguments
+ Configuration conf = new Configuration();
+ addOption("input", "i", "Path to input affinity matrix data", true);
+ addOption("output", "o", "Output of clusterings", true);
+ addOption("half-life", "b", "Minimal half-life threshold", true);
+ addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
+ addOption("epsilon", "e", "Half-life threshold coefficient", Double.toString(EigencutsDriver.EPSILON_DEFAULT));
+ addOption("tau", "t", "Threshold for cutting affinities", Double.toString(EigencutsDriver.TAU_DEFAULT));
+ Map<String, String> parsedArgs = parseArguments(arg0);
+ if (parsedArgs == null) {
+ return 0;
+ }
+
+ // read in the command line values
+ Path input = new Path(parsedArgs.get("--input"));
+ Path output = new Path(parsedArgs.get("--output"));
+ dimensions = Integer.parseInt(parsedArgs.get("--dimensions"));
+ halflife = Integer.parseInt(parsedArgs.get("--half-life"));
+ epsilon = Double.parseDouble(parsedArgs.get("--epsilon"));
+ tau = Double.parseDouble(parsedArgs.get("--tau"));
+
+ // create a few new Paths for temp files and transformations
+ Path outputCalc = new Path(output, "calculations");
+ Path outputTmp = new Path(output, "temporary");
+
+ DistributedRowMatrix A = AffinityMatrixInputJob.runJob(input, outputCalc, dimensions);
+ Vector D = MatrixDiagonalizeJob.runJob(A.getRowPath(), dimensions);
+
+ long numCuts = 0;
+ do {
+ // first three steps are the same as spectral k-means:
+ // 1) calculate D from A
+ // 2) calculate L = D^-0.5 * A * D^-0.5
+ // 3) calculate eigenvectors of L
+
+ DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(A.getRowPath(),
+ D, new Path(outputCalc, "laplacian-" + (System.nanoTime() & 0xFF)));
+ L.configure(new JobConf(conf));
+
+ // eigendecomposition (step 3)
+ int overshoot = (int)((double)dimensions *
+ EigencutsDriver.OVERSHOOT_MULTIPLIER);
+ List<Double> eigenValues = new ArrayList<Double>(overshoot);
+ Matrix eigenVectors = new DenseMatrix(overshoot, dimensions);
+ DistributedRowMatrix U = performEigenDecomposition(L,
+ dimensions, overshoot, eigenValues, eigenVectors,
+ outputCalc);
+ U.configure(new JobConf(conf));
+ eigenValues = eigenValues.subList(0, dimensions);
+
+ // here's where things get interesting: steps 4, 5, and 6 are unique
+ // to this algorithm, and depending on the final output, steps 1-3
+ // may be repeated as well
+
+ // helper method, since apparently List and Vector objects don't play nicely
+ Vector evs = listToVector(eigenValues);
+
+ // calculate sensitivities (step 4 and step 5)
+ Path sensitivities = new Path(outputCalc, "sensitivities-" + (System.nanoTime() & 0xFF));
+ EigencutsSensitivityJob.runJob(evs, D, U.getRowPath(), halflife,
+ tau, median(D), epsilon, sensitivities);
+
+ // perform the cuts (step 6)
+ input = new Path(outputTmp, "nextAff-" + (System.nanoTime() & 0xFF));
+ numCuts = EigencutsAffinityCutsJob.runjob(A.getRowPath(), sensitivities, input, conf);
+
+ // how many cuts were made?
+ if (numCuts > 0) {
+ // recalculate A
+ A = new DistributedRowMatrix(input, new Path(outputTmp, Long.toString(System.nanoTime())),
+ dimensions, dimensions);
+ A.configure(new JobConf());
+ }
+ } while (numCuts > 0);
+
+ // TODO: output format???
+
+ return 0;
+ }
+
+ /**
+ * Does most of the heavy lifting in setting up Paths, configuring return
+ * values, and generally performing the tedious administrative tasks involved
+ * in an eigen-decomposition and running the verifier
+ * @param input
+ * @param numEigenVectors
+ * @param overshoot
+ * @param eigenValues
+ * @param eigenVectors
+ * @param tmp
+ * @return
+ */
+ public static DistributedRowMatrix performEigenDecomposition(
+ DistributedRowMatrix input, int numEigenVectors, int overshoot,
+ List<Double> eigenValues, Matrix eigenVectors, Path tmp)
+ throws IOException {
+ DistributedLanczosSolver solver = new DistributedLanczosSolver();
+ Path seqFiles = new Path(tmp, "eigendecomp-" + (System.nanoTime() & 0xFF));
+ solver.runJob(new Configuration(), input.getRowPath(), new Path(tmp,
+ "lanczos-" + (System.nanoTime() & 0xFF)), input.numRows(),
+ input.numCols(), true, overshoot, eigenVectors, eigenValues,
+ seqFiles.toString());
+
+ // now run the verifier to trim down the number of eigenvectors
+ EigenVerificationJob verifier = new EigenVerificationJob();
+ Path verifiedEigens = new Path(tmp, "verifiedeigens");
+ verifier.runJob(seqFiles, input.getRowPath(), verifiedEigens,
+ false, 1.0, 0.0, numEigenVectors);
+ Path cleanedEigens = verifier.getCleanedEigensPath();
+ return new DistributedRowMatrix(cleanedEigens, new Path(cleanedEigens,
+ "tmp"), numEigenVectors, input.numRows());
+ }
+
+ /**
+ * A quick and dirty hack to compute the median of a vector...
+ * @param v
+ * @return
+ */
+ private double median(Vector v) {
+ OnlineSummarizer med = new OnlineSummarizer();
+ if (v.size() < 100) {
+ return v.zSum() / v.size();
+ }
+ for (Vector.Element e : v) {
+ med.add(e.get());
+ }
+ return med.getMedian();
+ }
+
+ /**
+ * Iteratively loops through the list, converting it to a Vector of double
+ * primitives worthy of other Mahout operations
+ * @param list
+ * @return
+ */
+ private Vector listToVector(List<Double> list) {
+ Vector retval = new DenseVector(list.size());
+ int index = 0;
+ for (Double d : list) {
+ retval.setQuick(index++, d.doubleValue());
+ }
+ return retval;
+ }
+
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsKeys.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsKeys.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsKeys.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsKeys.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
/**
 * Configuration keys and SequenceFile indices shared by the Eigencuts jobs
 * (analogous to KMeansConfigKeys).
 */
public interface EigencutsKeys {

  /**
   * B_0: the user-specified minimum eigenflow half-life an
   * eigenvector/eigenvalue pair must reach to be considered. A larger B_0
   * means fewer clusters.
   */
  String BETA = "org.apache.mahout.clustering.spectral.beta";

  /**
   * Tau: the user-specified threshold for making cuts (setting edge affinities
   * to 0) after non-maximal suppression of the edge-weight sensitivities. A
   * larger tau means more edge cuts.
   */
  String TAU = "org.apache.mahout.clustering.spectral.tau";

  /** Normalization factor used when computing the cut threshold. */
  String DELTA = "org.apache.mahout.clustering.spectral.delta";

  /**
   * Epsilon: the user-specified coefficient that works together with the
   * minimum half-life ({@link #BETA}) to decide which eigenvector/eigenvalue
   * pairs are used. A larger epsilon means fewer pairs.
   */
  String EPSILON = "org.apache.mahout.clustering.spectral.epsilon";

  /**
   * Base HDFS location where one of the Eigencuts map/reduce jobs stores the
   * diagonal matrix (as a vector) and the list of eigenvalues.
   */
  String VECTOR_CACHE_BASE = "org.apache.mahout.clustering.spectral.eigencuts.vectorcache";

  /**
   * Dimensions of the raw affinity matrix input. The matrix is symmetric and
   * therefore square, so a single value describes both dimensions.
   */
  String AFFINITY_DIMENSIONS = "org.apache.mahout.clustering.spectral.eigencuts.affinitydimensions";

  /** Path of the SequenceFile holding the affinity matrix. */
  String AFFINITY_PATH = "org.apache.mahout.clustering.spectral.eigencuts.affinitypath";

  /** Path of the SequenceFile holding the cut matrix. */
  String CUTMATRIX_PATH = "org.apache.mahout.clustering.spectral.eigencuts.cutmatrixpath";

  /** SequenceFile index of the list of eigenvalues (interface fields are implicitly final). */
  int EIGENVALUES_CACHE_INDEX = 0;

  /** SequenceFile index of the diagonal matrix. */
  int DIAGONAL_CACHE_INDEX = 1;
}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.spectral.common.VectorCache;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * <p>Configures and runs the map/reduce job that computes the Eigencuts
+ * sensitivity matrix. Quite a few operations are bundled within its mapper,
+ * {@link EigencutsSensitivityMapper}.</p>
+ *
+ * <p>The input to this job is eight items:</p>
+ * <ol><li>&beta;<sub>0</sub>, which is a command-line parameter fed through the Configuration object</li>
+ * <li>diagonal matrix, a constant vector fed through the Hadoop cache</li>
+ * <li>list of eigenvalues, a constant vector fed through the Hadoop cache</li>
+ * <li>eigenvector, the input value to the mapper</li>
+ * <li>epsilon</li>
+ * <li>delta</li>
+ * <li>tau</li>
+ * <li>output, the Path to the output matrix of sensitivities</li></ol>
+ *
+ * <p>The first three items are constant and are used in all of the map
+ * tasks. The row index of each input eigenvector selects which eigenvalue
+ * from the list to use. The diagonal matrix and the eigenvector are both of
+ * equal length n and are iterated through twice within each map task, giving
+ * each task an O(n<sup>2</sup>) runtime. This is unavoidable.</p>
+ *
+ * <p>For each (i, j) combination of elements within the eigenvector, a complex
+ * equation is run that explicitly computes the sensitivity to perturbation of
+ * the flow of probability within the specific edge of the graph. The
+ * sensitivities then go through a non-maximal suppression step: within each
+ * row i only the most negative S_ij survives, and among the survivors that
+ * share a column j only the most negative is kept. Each eigenvector therefore
+ * contributes at most n sensitivities, at most one per row and one per
+ * column, each emitted under its row index i.</p>
+ *
+ * <p>The reducer assembles the surviving sensitivities into an n-by-n
+ * (possibly sparse) matrix, keeping only values below tau / delta and, when
+ * several eigenvectors contribute to the same (i, j) entry, the smallest of
+ * them.</p>
+ */
+public final class EigencutsSensitivityJob {
+
+  /**
+   * Initializes the configuration, saves the needed vectors to the
+   * distributed cache, and executes the job, blocking until it finishes.
+   *
+   * @param eigenvalues Vector of eigenvalues, indexed by eigenvector row
+   * @param diagonal Vector representing the diagonal matrix
+   * @param eigenvectors Path to the DRM of eigenvectors
+   * @param beta beta_0; the mapper skips an eigenvector whose half-life is
+   *        not greater than epsilon * beta_0
+   * @param tau numerator of the reducer's sensitivity threshold tau / delta
+   * @param delta denominator of the reducer's sensitivity threshold tau / delta
+   * @param epsilon multiplier applied to beta_0 in the mapper's half-life test
+   * @param output Path to the output matrix of sensitivities; its parent
+   *        directory also receives the cached "eigenvalues" and "diagonal" files
+   * @throws IOException if the cache files cannot be written or the job
+   *         cannot be submitted
+   * @throws ClassNotFoundException if job classes cannot be resolved
+   * @throws InterruptedException if interrupted while waiting for the job
+   * @throws IllegalStateException if the map/reduce job does not succeed
+   */
+  public static void runJob(Vector eigenvalues,
+                            Vector diagonal,
+                            Path eigenvectors,
+                            double beta,
+                            double tau,
+                            double delta,
+                            double epsilon,
+                            Path output)
+    throws IOException, ClassNotFoundException, InterruptedException {
+
+    // save the two vectors to the distributed cache
+    Configuration jobConfig = new Configuration();
+    Path eigenOutputPath = new Path(output.getParent(), "eigenvalues");
+    Path diagOutputPath = new Path(output.getParent(), "diagonal");
+    jobConfig.set(EigencutsKeys.VECTOR_CACHE_BASE, output.getParent().getName());
+    VectorCache.save(new IntWritable(EigencutsKeys.EIGENVALUES_CACHE_INDEX),
+        eigenvalues, eigenOutputPath, jobConfig);
+    VectorCache.save(new IntWritable(EigencutsKeys.DIAGONAL_CACHE_INDEX),
+        diagonal, diagOutputPath, jobConfig);
+
+    // set up the rest of the job
+    jobConfig.set(EigencutsKeys.BETA, Double.toString(beta));
+    jobConfig.set(EigencutsKeys.EPSILON, Double.toString(epsilon));
+    jobConfig.set(EigencutsKeys.DELTA, Double.toString(delta));
+    jobConfig.set(EigencutsKeys.TAU, Double.toString(tau));
+    // The reducer sizes each output row from this key and falls back to
+    // Integer.MAX_VALUE when it is absent. The diagonal has one entry per
+    // row/column of the affinity matrix, so its size is that dimension.
+    jobConfig.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, diagonal.size());
+
+    Job job = new Job(jobConfig, "EigencutsSensitivityJob");
+    // lets the framework locate the jar holding the mapper/reducer classes
+    job.setJarByClass(EigencutsSensitivityJob.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(EigencutsSensitivityNode.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(EigencutsSensitivityMapper.class);
+    job.setReducerClass(EigencutsSensitivityReducer.class);
+
+    FileInputFormat.addInputPath(job, eigenvectors);
+    FileOutputFormat.setOutputPath(job, output);
+
+    // fail loudly instead of letting callers read partial or missing output
+    if (!job.waitForCompletion(true)) {
+      throw new IllegalStateException("EigencutsSensitivityJob failed!");
+    }
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.spectral.common.VectorCache;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * Mapper for {@link EigencutsSensitivityJob}. Each map call receives one
+ * eigenvector, keyed by its row index, which selects the matching eigenvalue
+ * from the cached list. Eigenvectors whose eigenvalue fails the half-life
+ * threshold are skipped. For the rest, the sensitivity S_ij of every (i, j)
+ * pair of elements is computed. Non-maximal suppression then keeps at most
+ * one value per row and per column, and the survivors are emitted keyed by i.
+ */
+public class EigencutsSensitivityMapper extends
+    Mapper<IntWritable, VectorWritable, IntWritable, EigencutsSensitivityNode> {
+
+  private Vector eigenvalues;
+  private Vector diagonal;
+  private double beta0;
+  private double epsilon;
+
+  /**
+   * Reads beta0 and epsilon from the Configuration and loads the eigenvalue
+   * list and the diagonal from the vector cache. Vectors that are neither a
+   * SequentialAccessSparseVector nor a DenseVector are copied into a
+   * SequentialAccessSparseVector.
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration config = context.getConfiguration();
+    beta0 = Double.parseDouble(config.get(EigencutsKeys.BETA));
+    epsilon = Double.parseDouble(config.get(EigencutsKeys.EPSILON));
+
+    // read in the two vectors from the cache
+    eigenvalues = VectorCache.load(
+        new IntWritable(EigencutsKeys.EIGENVALUES_CACHE_INDEX), config);
+    diagonal = VectorCache.load(
+        new IntWritable(EigencutsKeys.DIAGONAL_CACHE_INDEX), config);
+    if (!(eigenvalues instanceof SequentialAccessSparseVector || eigenvalues instanceof DenseVector)) {
+      eigenvalues = new SequentialAccessSparseVector(eigenvalues);
+    }
+    if (!(diagonal instanceof SequentialAccessSparseVector || diagonal instanceof DenseVector)) {
+      diagonal = new SequentialAccessSparseVector(diagonal);
+    }
+  }
+
+  /**
+   * Computes the suppressed sensitivities for one eigenvector.
+   *
+   * @param row index of the eigenvector, used to look up its eigenvalue
+   * @param vw the eigenvector
+   * @param context receives one (i, node) pair per surviving sensitivity
+   */
+  @Override
+  protected void map(IntWritable row, VectorWritable vw, Context context)
+    throws IOException, InterruptedException {
+
+    // first, does this particular eigenvector even pass the required threshold?
+    double eigenvalue = Math.abs(eigenvalues.get(row.get()));
+    double betak = -Functions.LOGARITHM.apply(2) / Functions.LOGARITHM.apply(eigenvalue);
+    if (eigenvalue >= 1.0 || betak <= (epsilon * beta0)) {
+      // doesn't pass the threshold! quit
+      return;
+    }
+
+    // This factor depends only on the eigenvalue, so compute it once per
+    // eigenvector instead of once per (i, j) pair.
+    double scale = sensitivityScale(eigenvalue);
+
+    // Go through the vector, performing the calculations; comparing every
+    // pair of elements is inherently O(n^2).
+    Map<Integer, EigencutsSensitivityNode> columns = new HashMap<Integer, EigencutsSensitivityNode>();
+    Vector ev = vw.get();
+    int n = ev.size();
+    for (int i = 0; i < n; i++) {
+      double evI = ev.get(i);
+      double diagI = diagonal.get(i);
+      double minSij = Double.MAX_VALUE;
+      int minInd = -1;
+      for (int j = 0; j < n; j++) {
+        double sij = performSensitivityCalculation(scale, eigenvalue, evI,
+            ev.get(j), diagI, diagonal.get(j));
+
+        // perform non-maximal suppression
+        // is this the smallest value in the row?
+        if (sij < minSij) {
+          minSij = sij;
+          minInd = j;
+        }
+      }
+
+      if (minInd < 0) {
+        // No S_ij in this row compared below Double.MAX_VALUE (e.g. all NaN),
+        // so there is no valid (i, j) coordinate to emit.
+        continue;
+      }
+
+      // is this the smallest value in the column?
+      EigencutsSensitivityNode current = columns.get(minInd);
+      if (current == null || current.getSensitivity() > minSij) {
+        columns.put(minInd, new EigencutsSensitivityNode(i, minInd, minSij));
+      }
+    }
+
+    // write whatever values made it through
+    for (EigencutsSensitivityNode node : columns.values()) {
+      context.write(new IntWritable(node.getRow()), node);
+    }
+  }
+
+  /**
+   * Computes the eigenvalue-dependent factor of the sensitivity. It is the
+   * same for every (i, j) pair of an eigenvector:
+   *
+   * log(2) / (lambda_k * log(lambda_k) * log(lambda_k^beta0 / 2))
+   *
+   * @param eigenvalue the (absolute) eigenvalue lambda_k
+   * @return the factor multiplied into every pairwise sensitivity
+   */
+  private double sensitivityScale(double eigenvalue) {
+    return Functions.LOGARITHM.apply(2) / (
+        eigenvalue * Functions.LOGARITHM.apply(eigenvalue) *
+        Functions.LOGARITHM.apply(Functions.POW.apply(eigenvalue, beta0) / 2));
+  }
+
+  /**
+   * Helper method, performs the per-pair calculation:
+   *
+   * scale * [ -((u_i / sqrt(d_i)) - (u_j / sqrt(d_j)))^2
+   *           + (1 - lambda_k) * ((u_i^2 / d_i) + (u_j^2 / d_j)) ]
+   *
+   * Only the squared difference is negated; the (1 - lambda_k) term is added.
+   *
+   * @param scale the value of {@link #sensitivityScale(double)} for lambda_k
+   * @param eigenvalue the (absolute) eigenvalue lambda_k
+   * @param ev_i eigenvector element u_i
+   * @param ev_j eigenvector element u_j
+   * @param diag_i diagonal element d_i
+   * @param diag_j diagonal element d_j
+   * @return the sensitivity S_ij
+   */
+  private static double performSensitivityCalculation(double scale, double eigenvalue,
+      double ev_i, double ev_j, double diag_i, double diag_j) {
+
+    double secondhalf = -Functions.POW.apply(((ev_i /
+        Functions.SQRT.apply(diag_i)) - (ev_j /
+        Functions.SQRT.apply(diag_j))), 2) + (1 - eigenvalue) *
+        ((Functions.POW.apply(ev_i, 2) / diag_i) +
+        (Functions.POW.apply(ev_j, 2) / diag_j));
+
+    return scale * secondhalf;
+  }
+
+  /**
+   * Utility helper method, used for unit testing: installs the values that
+   * {@link #setup(Context)} would otherwise read from the job.
+   *
+   * @param beta0 beta_0 used in the half-life threshold and the sensitivity
+   * @param epsilon multiplier applied to beta0 in the half-life threshold
+   * @param eigenvalues list of eigenvalues, indexed by eigenvector row
+   * @param diagonal the diagonal matrix, stored as a vector
+   */
+  void setup(double beta0, double epsilon, Vector eigenvalues, Vector diagonal) {
+    this.beta0 = beta0;
+    this.epsilon = epsilon;
+    this.eigenvalues = eigenvalues;
+    this.diagonal = diagonal;
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityNode.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityNode.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityNode.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityNode.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class allows the storage of computed sensitivities in an
+ * unordered fashion, instead having each sensitivity track its
+ * own (i, j) coordinate. Thus these objects can be stored as elements
+ * in any list or, in particular, Writable array. It is also the map output
+ * value class of EigencutsSensitivityJob.
+ */
+public class EigencutsSensitivityNode implements Writable {
+
+  private int row;
+  private int column;
+  private double sensitivity;
+
+  /**
+   * No-argument constructor. Hadoop instantiates Writable values
+   * reflectively and then populates them with {@link #readFields(DataInput)},
+   * so a Writable used as a map output value must have one.
+   */
+  public EigencutsSensitivityNode() {
+  }
+
+  /**
+   * @param i row coordinate of the sensitivity
+   * @param j column coordinate of the sensitivity
+   * @param s the sensitivity value
+   */
+  public EigencutsSensitivityNode(int i, int j, double s) {
+    row = i;
+    column = j;
+    sensitivity = s;
+  }
+
+  /** Reads the row, column and sensitivity, in that order. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.row = in.readInt();
+    this.column = in.readInt();
+    this.sensitivity = in.readDouble();
+  }
+
+  /** Writes the row, column and sensitivity, in that order. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(row);
+    out.writeInt(column);
+    out.writeDouble(sensitivity);
+  }
+
+  /** @return the row coordinate i */
+  public int getRow() {
+    return row;
+  }
+
+  /** @return the column coordinate j */
+  public int getColumn() {
+    return column;
+  }
+
+  /** @return the sensitivity value S_ij */
+  public double getSensitivity() {
+    return sensitivity;
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityReducer.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityReducer.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * <p>Collapses every sensitivity emitted for one row into a single sparse
+ * row vector of the sensitivity matrix. When several values land on the same
+ * (i, j) coordinate, the smaller value wins.</p>
+ *
+ * <p>A sensitivity is recorded only when it is below tau / delta (both read
+ * from the job Configuration) and below the value already held for its
+ * column. Assuming unset sparse entries read as 0.0, this means only negative
+ * sensitivities are ever stored.</p>
+ */
+public class EigencutsSensitivityReducer extends
+    Reducer<IntWritable, EigencutsSensitivityNode, IntWritable, VectorWritable> {
+
+  @Override
+  protected void reduce(IntWritable key, Iterable<EigencutsSensitivityNode> arr,
+                        Context context) throws IOException, InterruptedException {
+    Configuration config = context.getConfiguration();
+    // Row length; Integer.MAX_VALUE when the job did not provide the dimension.
+    int cardinality = config.getInt(EigencutsKeys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE);
+    Vector sensitivities = new RandomAccessSparseVector(cardinality, 100);
+    double tau = Double.parseDouble(config.get(EigencutsKeys.TAU));
+    double delta = Double.parseDouble(config.get(EigencutsKeys.DELTA));
+    double threshold = tau / delta;
+
+    for (EigencutsSensitivityNode node : arr) {
+      double s = node.getSensitivity();
+      // Written as !(s < threshold) rather than s >= threshold so that NaN
+      // values (of s or of the threshold) are rejected.
+      if (!(s < threshold)) {
+        continue;
+      }
+      int col = node.getColumn();
+      if (s < sensitivities.getQuick(col)) {
+        sensitivities.setQuick(col, s);
+      }
+    }
+    context.write(key, new VectorWritable(sensitivities));
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.kmeans;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
+import org.apache.mahout.clustering.spectral.common.AffinityMatrixInputJob;
+import org.apache.mahout.clustering.spectral.common.MatrixDiagonalizeJob;
+import org.apache.mahout.clustering.spectral.common.UnitVectorizerJob;
+import org.apache.mahout.clustering.spectral.common.VectorMatrixMultiplicationJob;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
+import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
+
+/**
+ * Implementation of spectral k-means clustering. A raw affinity matrix A is
+ * converted to SequenceFiles and normalized as L = D^(-0.5) A D^(-0.5), using
+ * the diagonal matrix D computed by MatrixDiagonalizeJob. L is decomposed
+ * with the distributed Lanczos solver, and the verified eigenvectors are
+ * transposed and normalized to unit length. The rows of the result are then
+ * clustered with k-means, and each point's cluster id is printed.
+ */
+public class SpectralKMeansDriver extends AbstractJob {
+
+  public static final boolean DEBUG = false;
+
+  public static final double OVERSHOOT_MULTIPLIER = 2.0;
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new SpectralKMeansDriver(), args);
+  }
+
+  @Override
+  public int run(String[] arg0) throws Exception {
+    // set up command line options
+    Configuration conf = new Configuration();
+    addOption("input", "i", "Path to input affinity matrix data", true);
+    addOption("output", "o", "Output of clusterings", true);
+    addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
+    addOption("clusters", "k", "Number of clusters and top eigenvectors", true);
+    Map<String, String> parsedArgs = parseArguments(arg0);
+    if (parsedArgs == null) {
+      return 0;
+    }
+
+    // TODO: Need to be able to read all k-means parameters, though
+    // they will be optional parameters to the algorithm
+    // read the values of the command line
+    Path input = new Path(parsedArgs.get("--input"));
+    Path output = new Path(parsedArgs.get("--output"));
+    int numDims = Integer.parseInt(parsedArgs.get("--dimensions"));
+    int clusters = Integer.parseInt(parsedArgs.get("--clusters"));
+
+    // create a few new Paths for temp files and transformations
+    Path outputCalc = new Path(output, "calculations");
+    Path outputTmp = new Path(output, "temporary");
+
+    // Take in the raw CSV text file and split it ourselves,
+    // creating our own SequenceFiles for the matrices to read later
+    // (similar to the style of syntheticcontrol.canopy.InputMapper)
+    Path affSeqFiles = new Path(outputCalc, "seqfile-" + (System.nanoTime() & 0xFF));
+    AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
+
+    // Next step: construct the diagonal matrix D (represented as a vector)
+    // and calculate the normalized Laplacian of the form:
+    // L = D^(-0.5)AD^(-0.5)
+    Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
+    DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
+        new Path(outputCalc, "laplacian-" + (System.nanoTime() & 0xFF)));
+    L.configure(new JobConf(conf));
+
+    // Next step: perform eigen-decomposition using LanczosSolver
+    // since some of the eigen-output is spurious and will be eliminated
+    // upon verification, we have to aim to overshoot and then discard
+    // unnecessary vectors later
+    int overshoot = (int) ((double) clusters * OVERSHOOT_MULTIPLIER);
+    List<Double> eigenValues = new ArrayList<Double>(overshoot);
+    Matrix eigenVectors = new DenseMatrix(overshoot, numDims);
+    DistributedLanczosSolver solver = new DistributedLanczosSolver();
+    Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors-" + (System.nanoTime() & 0xFF));
+    solver.runJob(conf,
+                  L.getRowPath(),
+                  new Path(outputTmp, "lanczos-" + (System.nanoTime() & 0xFF)),
+                  L.numRows(),
+                  L.numCols(),
+                  true,
+                  overshoot,
+                  eigenVectors,
+                  eigenValues,
+                  lanczosSeqFiles.toString());
+
+    // perform a verification
+    EigenVerificationJob verifier = new EigenVerificationJob();
+    Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
+    verifier.runJob(lanczosSeqFiles, L.getRowPath(), verifiedEigensPath, true, 1.0, 0.0, clusters);
+    Path cleanedEigens = verifier.getCleanedEigensPath();
+    // W holds one verified eigenvector (of length numDims) per row
+    DistributedRowMatrix W = new DistributedRowMatrix(cleanedEigens, new Path(cleanedEigens, "tmp"), clusters, numDims);
+    W.configure(new JobConf(conf));
+    DistributedRowMatrix Wtrans = W.transpose();
+
+    // next step: normalize the rows of the transpose to unit length; the
+    // result keeps the transpose's shape of numDims rows by clusters columns
+    Path unitVectors = new Path(outputCalc, "unitvectors-" + (System.nanoTime() & 0xFF));
+    UnitVectorizerJob.runJob(Wtrans.getRowPath(), unitVectors);
+    DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new Path(unitVectors, "tmp"), numDims, clusters);
+    Wt.configure(new JobConf(conf));
+
+    // Finally, perform k-means clustering on the rows of Wt
+    // generate random initial clusters
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    Path initialclusters = RandomSeedGenerator.buildRandom(Wt.getRowPath(),
+                                                           new Path(output, Cluster.INITIAL_CLUSTERS_DIR),
+                                                           clusters,
+                                                           measure);
+    KMeansDriver.run(new Configuration(), Wt.getRowPath(), initialclusters, output, measure, 0.001, 10, true, false);
+
+    // Read through the cluster assignments. Every part file is read, in name
+    // order, so that no points are missed when k-means ran several map tasks.
+    Path clusteredPointsPath = new Path(output, "clusteredPoints");
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] parts = fs.globStatus(new Path(clusteredPointsPath, "part-*"));
+    if (parts == null || parts.length == 0) {
+      throw new IllegalStateException("No clustered points found in " + clusteredPointsPath);
+    }
+    Arrays.sort(parts);
+    // The key is the clusterId
+    IntWritable clusterId = new IntWritable(0);
+    // The value is the weighted vector
+    WeightedVectorWritable value = new WeightedVectorWritable();
+    int id = 0;
+    for (FileStatus part : parts) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, part.getPath(), conf);
+      try {
+        while (reader.next(clusterId, value)) {
+          System.out.println((id++) + ": " + clusterId.get());
+        }
+      } finally {
+        // release the file handle even if reading fails part-way
+        reader.close();
+      }
+    }
+
+    // TODO: output format???
+
+    return 0;
+  }
+}
Re: svn commit: r1003188 [1/2] - in /mahout/trunk/core/src: main/java/org/apache/mahout/clustering/spectral/
main/java/org/apache/mahout/clustering/spectral/common/ main/java/org/apache/mahout/clustering/spectral/eigencuts/
main/java/org/apache/mahout/clus...
Posted by Jeff Eastman <jd...@windwardsolutions.com>.
Woops! I saw the second email and not the first and jumped the gun.
Personally, I think there is value in getting more eyeballs on this code
to help with the output, find bugs, etc. I'm certainly willing to help
out. It has a lot of functionality in place and is much easier to handle
with small patches going forward than one whopper. We have had
work-in-progress commits before and some things in 0.4 will still be
experimental or unpolished.
Jeff