Posted to commits@mahout.apache.org by je...@apache.org on 2010/09/30 20:00:24 UTC

svn commit: r1003188 [1/2] - in /mahout/trunk/core/src: main/java/org/apache/mahout/clustering/spectral/ main/java/org/apache/mahout/clustering/spectral/common/ main/java/org/apache/mahout/clustering/spectral/eigencuts/ main/java/org/apache/mahout/clus...

Author: jeastman
Date: Thu Sep 30 18:00:23 2010
New Revision: 1003188

URL: http://svn.apache.org/viewvc?rev=1003188&view=rev
Log:
MAHOUT-363: committing latest patch from the JIRA

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/IntDoublePairWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/UnitVectorizerJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VertexWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsAffinityCutsJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsKeys.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityNode.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestAffinityMatrixInputJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestMatrixDiagonalizeJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestUnitVectorizerJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorCache.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorMatrixMultiplicationJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsAffinityCutsJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsSensitivityJob.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,76 @@
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.spectral.eigencuts.EigencutsKeys;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+public final class AffinityMatrixInputJob {
+
+  /**
+   * Initializes and executes the job of reading the documents containing
+   * the data of the affinity matrix in (x_i, x_j, value) format.
+   * 
+   * @param input path to the (i, j, value) text input
+   * @param output path where the matrix's sequence files are written
+   * @param rows number of rows in the affinity matrix
+   * @param cols number of columns in the affinity matrix
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  public static void runJob(Path input, Path output, int rows, int cols)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    HadoopUtil.overwriteOutput(output);
+
+    Configuration conf = new Configuration();
+    conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, rows);
+    Job job = new Job(conf, "AffinityMatrixInputJob: " + input + " -> M/R -> " + output);
+
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(DistributedRowMatrix.MatrixEntryWritable.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(AffinityMatrixInputMapper.class);   
+    job.setReducerClass(AffinityMatrixInputReducer.class);
+
+    FileInputFormat.addInputPath(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.waitForCompletion(true);
+  }
+
+  /**
+   * A convenience wrapper around the above method which handles the tedious
+   * tasks of setting and retrieving system Paths. Hands back a fully-populated
+   * and initialized DistributedRowMatrix.
+   * @param input path to the (i, j, value) text input
+   * @param output base path for the job's sequence files
+   * @param dimensions the square dimensions of the affinity matrix
+   * @return the affinity matrix as a DistributedRowMatrix
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  public static DistributedRowMatrix runJob(Path input, Path output, int dimensions)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    Path seqFiles = new Path(output, "seqfiles-" + (System.nanoTime() & 0xFF));
+    AffinityMatrixInputJob.runJob(input, seqFiles, dimensions, dimensions);
+    DistributedRowMatrix A = new DistributedRowMatrix(seqFiles, 
+        new Path(seqFiles, "seqtmp-" + (System.nanoTime() & 0xFF)), 
+        dimensions, dimensions);
+    A.configure(new JobConf());
+    return A;
+  }
+}
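
For reference, a minimal sketch of invoking this job from driver code; the paths and dimension below are hypothetical:

import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.spectral.common.AffinityMatrixInputJob;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;

public class AffinityInputExample {
  public static void main(String[] args) throws Exception {
    Path input = new Path("/user/mahout/affinity.txt");   // hypothetical input
    Path output = new Path("/user/mahout/affinity-out");  // hypothetical output
    int dimensions = 100;  // the affinity matrix is dimensions x dimensions
    // reads (i, j, value) text lines and hands back the assembled matrix
    DistributedRowMatrix a = AffinityMatrixInputJob.runJob(input, output, dimensions);
    System.out.println("rows = " + a.numRows() + ", cols = " + a.numCols());
  }
}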

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputMapper.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputMapper.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,60 @@
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.spectral.kmeans.SpectralKMeansDriver;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+/**
+ * <p>Handles reading the files representing the affinity matrix. Since the affinity
+ * matrix is representative of a graph, each line in all the files should
+ * take the form:</p>
+ * 
+ * <code>i,j,value</code>
+ * 
+ * <p>where <code>i</code> and <code>j</code> are the indices of the
+ * <code>i</code>th and <code>j</code>th data points in the entire set, and
+ * <code>value</code> is the affinity (edge weight) between them. This is,
+ * simply, a textual representation of a weighted graph.</p>
+ */
+public class AffinityMatrixInputMapper
+    extends Mapper<LongWritable, Text, IntWritable, DistributedRowMatrix.MatrixEntryWritable> {
+
+  @Override
+  protected void map(LongWritable key, Text value, Context context)
+    throws IOException, InterruptedException {
+
+    String[] elements = value.toString().split(",");
+    if (SpectralKMeansDriver.DEBUG) {
+      System.out.println("(DEBUG - MAP) Key[" + key.get() + "], Value[" + value + "]");
+    }
+
+    // enforce a well-formed textual representation of the graph
+    if (elements.length != 3) {
+      throw new IOException("Expected input of length 3, received " +
+          elements.length + ". Please make sure you adhere to " +
+          "the structure of (i,j,value) for representing a graph in text.");
+    } else if (elements[0].length() == 0 || elements[1].length() == 0 ||
+        elements[2].length() == 0) {
+      throw new IOException("Found an element of 0 length. Please " +
+          "be sure you adhere to the structure of (i,j,value) for " +
+          "representing a graph in text.");
+    }
+
+    // parse the line of text into a DistributedRowMatrix entry,
+    // making the row (elements[0]) the key to the Reducer, and
+    // setting the column (elements[1]) in the entry itself
+    DistributedRowMatrix.MatrixEntryWritable toAdd =
+        new DistributedRowMatrix.MatrixEntryWritable();
+    IntWritable row = new IntWritable(Integer.parseInt(elements[0]));
+    toAdd.setRow(-1); // already set as the Reducer's key
+    toAdd.setCol(Integer.parseInt(elements[1]));
+    toAdd.setVal(Double.parseDouble(elements[2]));
+    context.write(row, toAdd);
+  }
+}
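
As a concrete illustration of the input this mapper expects, a three-node weighted graph with symmetric affinities could be encoded as the following text (values hypothetical); each line becomes one MatrixEntryWritable keyed by its row index:

0,0,0.0
0,1,0.8
0,2,0.2
1,0,0.8
1,1,0.0
1,2,0.5
2,0,0.2
2,1,0.5
2,2,0.0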

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputReducer.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/AffinityMatrixInputReducer.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,45 @@
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.clustering.spectral.eigencuts.EigencutsKeys;
+import org.apache.mahout.clustering.spectral.kmeans.SpectralKMeansDriver;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+/**
+ * Tasked with taking each DistributedRowMatrix entry and collecting them
+ * into vectors corresponding to rows. The input and output keys are the same,
+ * corresponding to the row in the ensuing matrix. The matrix entries are
+ * entered into a vector according to the column to which they belong, and
+ * the vector is then given the key corresponding to its row.
+ */
+public class AffinityMatrixInputReducer
+    extends Reducer<IntWritable, DistributedRowMatrix.MatrixEntryWritable,
+        IntWritable, VectorWritable> {
+
+  @Override
+  protected void reduce(IntWritable row,
+      Iterable<DistributedRowMatrix.MatrixEntryWritable> values,
+      Context context) throws IOException, InterruptedException {
+    RandomAccessSparseVector out = new RandomAccessSparseVector(
+        context.getConfiguration().getInt(EigencutsKeys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE), 100);
+
+    for (DistributedRowMatrix.MatrixEntryWritable element : values) {
+      out.setQuick(element.getCol(), element.getVal());
+      if (SpectralKMeansDriver.DEBUG) {
+        System.out.println("(DEBUG - REDUCE) Row[" + row.get() + "], " +
+            "Column[" + element.getCol() + "], Value[" + element.getVal() + "]");
+      }
+    }
+    SequentialAccessSparseVector output = new SequentialAccessSparseVector(out);
+    context.write(row, new VectorWritable(output));
+  }
+}
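
Continuing the sample input shown after the mapper above: for key 0 this reducer receives the entries (col 0, 0.0), (col 1, 0.8), and (col 2, 0.2), and emits a single sparse vector for row 0 holding 0.8 at index 1 and 0.2 at index 2.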

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/IntDoublePairWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/IntDoublePairWritable.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/IntDoublePairWritable.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/IntDoublePairWritable.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class is a Writable implementation of the mahout.common.Pair
+ * generic class. Since the generic types would also themselves have to
+ * implement Writable, it made more sense to create a more specialized
+ * version of the class altogether.
+ * 
+ * In essence, this can be treated as a single Vector Element.
+ */
+public class IntDoublePairWritable implements Writable {
+  
+  private int key;
+  private double value;
+  
+  public IntDoublePairWritable() {}
+  
+  public IntDoublePairWritable(int k, double v) {
+    this.key = k;
+    this.value = v;
+  }
+  
+  public void setKey(int k) {
+    this.key = k;
+  }
+  
+  public void setValue(double v) {
+    this.value = v;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.key = in.readInt();
+    this.value = in.readDouble();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(key);
+    out.writeDouble(value);
+  }
+
+  public int getKey() {
+    return key;
+  }
+
+  public double getValue() {
+    return value;
+  }
+
+}
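
The Writable contract can be sanity-checked with a quick in-memory round trip; a self-contained sketch, not part of this commit:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.mahout.clustering.spectral.common.IntDoublePairWritable;

public class PairRoundTrip {
  public static void main(String[] args) throws Exception {
    IntDoublePairWritable original = new IntDoublePairWritable(7, 3.14);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));  // serializes key, then value

    IntDoublePairWritable copy = new IntDoublePairWritable();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy.getKey() + " -> " + copy.getValue());  // 7 -> 3.14
  }
}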

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/MatrixDiagonalizeJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.spectral.eigencuts.EigencutsKeys;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Given a matrix, this job returns a vector whose i_th element is the 
+ * sum of all the elements in the i_th row of the original matrix.
+ */
+public final class MatrixDiagonalizeJob {
+
+  public static Vector runJob(Path affInput, int dimensions) 
+    throws IOException, ClassNotFoundException, InterruptedException {
+    
+    // set up all the job tasks
+    Configuration conf = new Configuration();
+    Path diagOutput = new Path(affInput.getParent(), "diagonal");
+    HadoopUtil.overwriteOutput(diagOutput);
+    conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, dimensions);
+    Job job = new Job(conf, "MatrixDiagonalizeJob");
+    
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapOutputKeyClass(NullWritable.class);
+    job.setMapOutputValueClass(IntDoublePairWritable.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(MatrixDiagonalizeMapper.class);
+    job.setReducerClass(MatrixDiagonalizeReducer.class);
+    
+    FileInputFormat.addInputPath(job, affInput);
+    FileOutputFormat.setOutputPath(job, diagOutput);
+    
+    job.waitForCompletion(true);
+    
+    // read the results back from the path
+    return VectorCache.load(NullWritable.get(), conf, 
+        new Path(diagOutput, "part-r-00000"));
+  }
+  
+  public static class MatrixDiagonalizeMapper
+    extends Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable> {
+    
+    @Override
+    protected void map(IntWritable key, VectorWritable row, Context context) 
+      throws IOException, InterruptedException {
+      // store the sum
+      IntDoublePairWritable store = new IntDoublePairWritable(key.get(), row.get().zSum());
+      context.write(NullWritable.get(), store);
+    }
+  }
+  
+  public static class MatrixDiagonalizeReducer
+    extends Reducer<NullWritable, IntDoublePairWritable, NullWritable, VectorWritable> {
+    
+    @Override
+    protected void reduce(NullWritable key, Iterable<IntDoublePairWritable> values, 
+      Context context) throws IOException, InterruptedException {
+      // create the return vector
+      Vector retval = new DenseVector(context.getConfiguration().getInt(
+          EigencutsKeys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE));
+      // put everything in its correct spot
+      for (IntDoublePairWritable e : values) {
+        retval.setQuick(e.getKey(), e.getValue());
+      }
+      // write it out
+      context.write(key, new VectorWritable(retval));
+    }
+  }
+}
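
The map/reduce pipeline above amounts to computing row sums; on the small affinity matrix from the earlier example, the local equivalent is (a sketch for intuition only):

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class RowSumSketch {
  public static void main(String[] args) {
    double[][] affinity = { {0.0, 0.8, 0.2}, {0.8, 0.0, 0.5}, {0.2, 0.5, 0.0} };
    Vector diagonal = new DenseVector(affinity.length);
    for (int i = 0; i < affinity.length; i++) {
      double sum = 0.0;             // mirrors row.get().zSum() in the mapper
      for (double v : affinity[i]) {
        sum += v;
      }
      diagonal.setQuick(i, sum);    // mirrors the reducer's setQuick
    }
    System.out.println(diagonal);   // row sums: 1.0, 1.3, 0.7
  }
}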

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/UnitVectorizerJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/UnitVectorizerJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/UnitVectorizerJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/UnitVectorizerJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * <p>Given a DistributedRowMatrix, this job normalizes each row to unit
+ * vector length. If the input is a matrix U, and the output is a matrix
+ * W, the job follows:</p>
+ * 
+ * <p><code>w_ij = u_ij / sqrt(sum_j(u_ij * u_ij))</code></p>
+ */
+public final class UnitVectorizerJob {
+
+  public static void runJob(Path input, Path output) 
+    throws IOException, InterruptedException, ClassNotFoundException {
+    
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "UnitVectorizerJob");
+    
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(UnitVectorizerMapper.class);
+    job.setNumReduceTasks(0);
+    
+    FileInputFormat.addInputPath(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.waitForCompletion(true);
+  }
+  
+  public static class UnitVectorizerMapper
+    extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+    
+    @Override
+    protected void map(IntWritable row, VectorWritable vector, Context context) 
+      throws IOException, InterruptedException {
+      
+      // set up the return value and perform the computations
+      double norm = vectorNorm(vector.get());
+      Vector w = vector.get().assign(Functions.div(norm));
+      RandomAccessSparseVector out = new RandomAccessSparseVector(w);
+      
+      // finally write the output
+      context.write(row, new VectorWritable(out));
+    }
+    
+    /**
+     * Sums the squares of all elements together, then takes the square root
+     * of that sum.
+     * @param u the vector whose norm to compute
+     * @return the L2 (Euclidean) norm of u
+     */
+    private double vectorNorm(Vector u) {
+      double retval = 0.0;
+      for (Vector.Element e : u) {
+        retval += Functions.POW.apply(e.get(), 2);
+      }
+      return Functions.SQRT.apply(retval);
+    }
+  }
+}
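
For example, under the formula above the row (3, 4) has norm sqrt(3*3 + 4*4) = 5 and normalizes to (0.6, 0.8). A local sketch using the same Mahout primitives the mapper uses:

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.Functions;

public class UnitizeSketch {
  public static void main(String[] args) {
    Vector u = new DenseVector(new double[] {3.0, 4.0});
    double norm = Math.sqrt(u.dot(u));         // the same quantity vectorNorm computes
    Vector w = u.assign(Functions.div(norm));  // divides every element by the norm
    System.out.println(w);                     // (0.6, 0.8)
  }
}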

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorCache.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * This class handles reading and writing vectors to the Hadoop
+ * distributed cache. Created as a result of Eigencuts' liberal use
+ * of such functionality, but available to any algorithm requiring it.
+ */
+public final class VectorCache {
+
+  /**
+   * Saves a Vector to the given path and registers it in the distributed cache.
+   * 
+   * @param key SequenceFile key
+   * @param vector Vector to save, to be wrapped as VectorWritable
+   * @param output path of the SequenceFile to write
+   * @param conf the Configuration whose cache files are set
+   * @param overwritePath whether to overwrite any existing file at output
+   * @param deleteOnExit whether to delete the file when the JVM exits
+   */
+  public static void save(Writable key, Vector vector, Path output, Configuration
+      conf, boolean overwritePath, boolean deleteOnExit) throws IOException {
+    
+    FileSystem fs = FileSystem.get(conf);
+    output = fs.makeQualified(output);
+    if (overwritePath) {
+      HadoopUtil.overwriteOutput(output);
+    }
+
+    // set the cache
+    DistributedCache.setCacheFiles(new URI[] {output.toUri()}, conf);
+    
+    // set up the writer
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, output, 
+        IntWritable.class, VectorWritable.class);
+    writer.append(key, new VectorWritable(vector));
+    writer.close();
+    
+    if (deleteOnExit) {
+      fs.deleteOnExit(output);
+    }
+  }
+  
+  /**
+   * Calls the save() method, setting the cache to overwrite any previous
+   * Path and to delete the path after exiting
+   * @param key SequenceFile key
+   * @param vector Vector to save
+   * @param output path of the SequenceFile to write
+   * @param conf the Configuration whose cache files are set
+   */
+  public static void save(Writable key, Vector vector, Path output, Configuration conf)
+    throws IOException {
+    VectorCache.save(key, vector, output, conf, true, true);
+  }
+  
+  /**
+   * Loads the vector with the specified key from the cache. Returns null
+   * if nothing is found (up to the caller to handle this accordingly)
+   * 
+   * @param key SequenceFile key
+   * @param conf the Configuration holding the cache file URIs
+   * @return the loaded Vector, or null if the cache is empty
+   * @throws IOException
+   */
+  public static Vector load(Writable key, Configuration conf) throws IOException {
+    URI [] files = DistributedCache.getCacheFiles(conf);
+    if (files == null || files.length < 1) {
+      return null;
+    }
+    return VectorCache.load(key, conf, new Path(files[0].getPath()));
+  }
+  
+  /**
+   * Loads a Vector from the specified path
+   * 
+   * @param key SequenceFile key
+   * @param conf the Configuration for accessing the FileSystem
+   * @param input path of the SequenceFile to read
+   * @return the loaded Vector
+   */
+  public static Vector load(Writable key, Configuration conf, Path input) 
+    throws IOException {
+
+    FileSystem fs = FileSystem.get(conf);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, input, conf);
+    VectorWritable retval = new VectorWritable();
+    reader.next(key, retval);
+    reader.close();
+    return retval.get();
+  }
+}
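
A typical save-then-load cycle looks like the following sketch (the path is hypothetical; the key passed to load() must match the key used in save(), as DIAGONAL_CACHE_INDEX does in VectorMatrixMultiplicationJob below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.mahout.clustering.spectral.common.VectorCache;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class VectorCacheExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/user/mahout/cache/diagonal");  // hypothetical path
    Vector diag = new DenseVector(new double[] {1.0, 1.3, 0.7});

    // writes the vector and registers the path in the distributed cache
    VectorCache.save(new IntWritable(0), diag, path, conf);

    // inside a mapper's setup(), the same key pulls the vector back out
    Vector loaded = VectorCache.load(new IntWritable(0), conf);
    System.out.println(loaded);
  }
}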

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VectorMatrixMultiplicationJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.spectral.eigencuts.EigencutsKeys;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+/**
+ * <p>This class handles the three-way multiplication of the diagonal matrix
+ * and the Markov transition matrix inherent in the Eigencuts algorithm.
+ * The equation takes the form:</p>
+ * 
+ * <code>W = D^(1/2) * M * D^(1/2)</code>
+ * 
+ * <p>Since the diagonal matrix D has only n non-zero elements, it is represented
+ * as a dense vector in this job, rather than a full n-by-n matrix. This job
+ * performs the multiplications and returns the new DRM.</p>
+ */
+public final class VectorMatrixMultiplicationJob {
+
+  /**
+   * Invokes the job.
+   * @param markovPath Path to the markov DRM's sequence files
+   * @param diag the diagonal vector D
+   * @param outputPath path where the scaled matrix W is written
+   * @return the scaled matrix W as a DistributedRowMatrix
+   */
+  public static DistributedRowMatrix runJob(Path markovPath, Vector diag, 
+      Path outputPath) throws IOException, ClassNotFoundException, InterruptedException {
+    
+    // set up the serialization of the diagonal vector
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    markovPath = fs.makeQualified(markovPath);
+    outputPath = fs.makeQualified(outputPath);
+    Path vectorOutputPath = new Path(outputPath.getParent(), "vector");
+    VectorCache.save(new IntWritable(EigencutsKeys.DIAGONAL_CACHE_INDEX), diag, vectorOutputPath, conf);
+    
+    // set up the job itself
+    Job job = new Job(conf, "VectorMatrixMultiplication");
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(VectorMatrixMultiplicationMapper.class);
+    job.setNumReduceTasks(0);
+    
+    FileInputFormat.addInputPath(job, markovPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    job.waitForCompletion(true);
+    
+    // build the resulting DRM from the results
+    return new DistributedRowMatrix(outputPath, new Path(outputPath, "tmp"), 
+        diag.size(), diag.size());
+  }
+  
+  public static class VectorMatrixMultiplicationMapper
+    extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+    
+    private Vector diagonal;
+    
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      // read in the diagonal vector from the distributed cache
+      super.setup(context);
+      Configuration config = context.getConfiguration();
+      diagonal = VectorCache.load(new IntWritable(EigencutsKeys.DIAGONAL_CACHE_INDEX), config);
+      if (diagonal == null) {
+        throw new IOException("No vector loaded from cache!");
+      }
+      if (!(diagonal instanceof DenseVector)) {
+        diagonal = new DenseVector(diagonal);
+      }
+    }
+    
+    @Override
+    protected void map(IntWritable key, VectorWritable row, Context ctx) 
+      throws IOException, InterruptedException {
+      
+      for (Vector.Element e : row.get()) {
+        double dii = Functions.SQRT.apply(diagonal.get(key.get()));
+        double djj = Functions.SQRT.apply(diagonal.get(e.index()));
+        double mij = e.get();
+        e.set(dii * mij * djj);
+      }
+      ctx.write(key, row);
+    }
+    
+    /**
+     * Performs the setup of the Mapper. Used by unit tests.
+     * @param diag the diagonal vector, injected directly instead of
+     *        being read from the distributed cache
+     */
+    void setup(Vector diag) {
+      this.diagonal = diag;
+    }
+  }
+}
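
To make the mapper's arithmetic concrete, take hypothetical diagonal entries d_00 = 4.0 and d_11 = 9.0 and a Markov entry m_01 = 0.5:

w_01 = sqrt(d_00) * m_01 * sqrt(d_11)
     = 2.0 * 0.5 * 3.0
     = 3.0

Each element is scaled by the square roots of the diagonal entries for its row and column, which is exactly the loop body in map() above.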

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VertexWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VertexWritable.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VertexWritable.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/common/VertexWritable.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Represents a vertex within the affinity graph for Eigencuts.
+ */
+public class VertexWritable implements Writable {
+  
+  /** the row */
+  private int i;
+  
+  /** the column */
+  private int j;
+  
+  /** the value at this vertex */
+  private double value;
+  
+  /** an extra type delimiter, can probably be null */
+  private String type;
+  
+  public VertexWritable() {}
+
+  public VertexWritable(int i, int j, double v, String t) {
+    this.i = i;
+    this.j = j;
+    this.value = v;
+    this.type = t;
+  }
+  
+  public int getRow() {
+    return i;
+  }
+  
+  public void setRow(int i) {
+    this.i = i;
+  }
+  
+  public int getCol() {
+    return j;
+  }
+  
+  public void setCol(int j) { 
+    this.j = j;
+  }
+  
+  public double getValue() {
+    return value;
+  }
+  
+  public void setValue(double v) {
+    this.value = v;
+  }
+  
+  public String getType() {
+    return type;
+  }
+  
+  public void setType(String t) {
+    this.type = t;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.i = in.readInt();
+    this.j = in.readInt();
+    this.value = in.readDouble();
+    this.type = in.readUTF();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(i);
+    out.writeInt(j);
+    out.writeDouble(value);
+    out.writeUTF(type);
+  }
+
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsAffinityCutsJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsAffinityCutsJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsAffinityCutsJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsAffinityCutsJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.spectral.common.VertexWritable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Given the current affinity matrix and the matrix of cut sensitivities,
+ * this job "cuts" affinity links: wherever a sensitivity is non-zero, the
+ * affinity entries at (i, j) and (j, i) are zeroed out and their former
+ * values are added to the diagonal entries (i, i) and (j, j), producing
+ * the affinity matrix for the next iteration.
+ */
+public final class EigencutsAffinityCutsJob {
+  
+  static enum CUTSCOUNTER {NUM_CUTS}
+
+  /**
+   * Runs a single iteration of defining cluster boundaries, based on
+   * previous calculations and the formation of the "cut matrix".
+   * 
+   * @param currentAffinity Path to the current affinity matrix.
+   * @param cutMatrix Path to the sensitivity matrix.
+   * @param nextAffinity Output path for the new affinity matrix.
+   * @param conf the Configuration for the job
+   * @return the number of cuts made in this iteration
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  public static long runjob(Path currentAffinity, Path cutMatrix, Path nextAffinity,
+      Configuration conf) throws IOException, ClassNotFoundException, 
+      InterruptedException {
+    
+    // these options allow us to differentiate between the two vectors
+    // in the mapper and reducer - we'll know from the working path
+    // which SequenceFile we're accessing
+    conf.set(EigencutsKeys.AFFINITY_PATH, currentAffinity.getName());
+    conf.set(EigencutsKeys.CUTMATRIX_PATH, cutMatrix.getName());
+    
+    Job job = new Job(conf, "EigencutsAffinityCutsJob");
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(VertexWritable.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setMapperClass(EigencutsAffinityCutsMapper.class);
+    job.setCombinerClass(EigencutsAffinityCutsCombiner.class);
+    job.setReducerClass(EigencutsAffinityCutsReducer.class);
+    
+    //FileInputFormat.addInputPath(job, currentAffinity);
+    FileInputFormat.addInputPath(job, cutMatrix);
+    FileOutputFormat.setOutputPath(job, nextAffinity);
+    
+    job.waitForCompletion(true);
+    
+    return job.getCounters().findCounter(CUTSCOUNTER.NUM_CUTS).getValue();
+  }
+  
+  public static class EigencutsAffinityCutsMapper
+    extends Mapper<IntWritable, VectorWritable, Text, VertexWritable> {
+    
+    @Override
+    protected void map(IntWritable key, VectorWritable row, Context context) 
+      throws IOException, InterruptedException {
+      
+      // all this method does is construct a bunch of vertices, mapping those
+      // together which have the same *combination* of indices; for example,
+      // (1, 3) will have the same key as (3, 1) but a different key from (1, 1)
+      // and (3, 3) (which, incidentally, will also not be grouped together)
+      String type = context.getWorkingDirectory().getName();
+      Vector vector = row.get();
+      for (Vector.Element e : vector) {
+        String newkey = Math.max(key.get(), e.index()) + "_" + Math.min(key.get(), e.index());
+        context.write(new Text(newkey), new VertexWritable(key.get(), e.index(), e.get(), type));
+      }
+    }
+  }
+  
+  public static class EigencutsAffinityCutsCombiner
+    extends Reducer<Text, VertexWritable, Text, VertexWritable> {
+    
+    @Override
+    protected void reduce(Text t, Iterable<VertexWritable> vertices, 
+        Context context) throws IOException, InterruptedException {
+      // there should be exactly 4 items in the iterable; two from the
+      // first Path source, and two from the second with matching (i, j) indices
+      
+      // the idea here is that we want the two vertices of the "cut" matrix,
+      // and if either of them has a non-zero value, we want to:
+      //
+      // 1) zero out the two affinity vertices, and 
+      // 2) add their former values to the (i, i) and (j, j) coordinates
+      //
+      // though obviously we want to perform these steps in reverse order
+      boolean zero = false;
+      int i = -1, j = -1;
+      double k = 0;
+      Configuration conf = context.getConfiguration();
+      int count = 0;
+      System.out.println("DEBUG: " + t.toString());
+      for (VertexWritable v : vertices) {
+        count++;
+        if (v.getType().equals(conf.get(EigencutsKeys.AFFINITY_PATH))) {
+          i = v.getRow();
+          j = v.getCol();
+          k = v.getValue();
+        } else if (v.getValue() != 0.0) {
+          zero = true;
+        }
+      }
+      // if there are only two vertices, we have a diagonal
+      // we want to preserve whatever is currently in the diagonal,
+      // since this is acting as a running sum of all other values
+      // that have been "cut" so far - simply return this element as is
+      if (count == 2) {
+        VertexWritable vw = new VertexWritable(i, j, k, "unimportant");
+        context.write(new Text(String.valueOf(i)), vw);
+        return;
+      }
+      
+      // do we zero out the values?
+      VertexWritable out_i = new VertexWritable();
+      VertexWritable out_j = new VertexWritable();
+      if (zero) {
+        // increment the cut counter
+        context.getCounter(CUTSCOUNTER.NUM_CUTS).increment(1);
+        
+        // we want the values to exist on the diagonal
+        out_i.setCol(i);
+        out_j.setCol(j);
+        
+        // also, set the old values to zero
+        VertexWritable zero_i = new VertexWritable();
+        VertexWritable zero_j = new VertexWritable();
+        zero_i.setCol(j);
+        zero_i.setValue(0);
+        zero_j.setCol(i);
+        zero_j.setValue(0);
+        zero_i.setType("unimportant");
+        zero_j.setType("unimportant");
+        context.write(new Text(String.valueOf(i)), zero_i);
+        context.write(new Text(String.valueOf(j)), zero_j);
+      } else {
+        out_i.setCol(j);
+        out_j.setCol(i);
+      }
+      
+      // set the values and write them
+      out_i.setValue(k);
+      out_j.setValue(k);
+      out_i.setType("unimportant");
+      out_j.setType("unimportant");
+      context.write(new Text(String.valueOf(i)), out_i);
+      context.write(new Text(String.valueOf(j)), out_j);
+    }
+  }
+  
+  public static class EigencutsAffinityCutsReducer 
+    extends Reducer<Text, VertexWritable, IntWritable, VectorWritable> {
+    
+    @Override
+    protected void reduce(Text row, Iterable<VertexWritable> entries, 
+        Context context) throws IOException, InterruptedException {
+      // now to assemble the vectors
+      RandomAccessSparseVector output = new RandomAccessSparseVector(
+          context.getConfiguration().getInt(EigencutsKeys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE), 100);
+      int rownum = Integer.parseInt(row.toString());
+      for (VertexWritable e : entries) {
+        // first, are we setting a diagonal?
+        if (e.getCol() == rownum) {
+          // add to what's already present
+          output.setQuick(e.getCol(), output.getQuick(e.getCol()) + e.getValue());
+        } else {
+          // simply set the value
+          output.setQuick(e.getCol(), e.getValue());
+        }
+      }
+      context.write(new IntWritable(rownum), new VectorWritable(output));
+    }
+  }
+}
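
A small worked example of the mapper's grouping scheme: the entries at (1, 3) and (3, 1), from both the affinity and cut matrices, all collapse to the key "3_1", so the combiner sees the matching vertices together, while the diagonals (1, 1) and (3, 3) keep their own keys. The sketch below mirrors the mapper's key construction exactly:

public class CutKeySketch {
  public static void main(String[] args) {
    int row = 1;
    int col = 3;
    // same expression as in EigencutsAffinityCutsMapper.map()
    String key = Math.max(row, col) + "_" + Math.min(row, col);
    System.out.println(key);  // prints 3_1
  }
}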

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsDriver.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.spectral.common.AffinityMatrixInputJob;
+import org.apache.mahout.clustering.spectral.common.MatrixDiagonalizeJob;
+import org.apache.mahout.clustering.spectral.common.VectorMatrixMultiplicationJob;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
+import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
+import org.apache.mahout.math.stats.OnlineSummarizer;
+
+public class EigencutsDriver extends AbstractJob {
+  
+  public static final double EPSILON_DEFAULT = 0.25;
+  public static final double TAU_DEFAULT = -0.1;
+  public static final double OVERSHOOT_MULTIPLIER = 1.5;
+  
+  // parameters of interest
+  /** number of dimensions in the square affinity matrix */
+  private int dimensions;
+  /** user-supplied minimum half-life threshold */
+  private double halflife;
+  /** user-supplied coefficient for setting minimum half-life threshold */
+  private double epsilon;
+  /** user-supplied threshold for cutting links in the affinity graph */
+  private double tau;
+
+  public static void main(String args[]) throws Exception {
+    ToolRunner.run(new EigencutsDriver(), args);
+  }
+
+  @Override
+  public int run(String[] arg0) throws Exception {
+    
+    // set up command line arguments
+    Configuration conf = new Configuration();
+    addOption("input", "i", "Path to input affinity matrix data", true);
+    addOption("output", "o", "Output of clusterings", true);
+    addOption("half-life", "b", "Minimal half-life threshold", true);
+    addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
+    addOption("epsilon", "e", "Half-life threshold coefficient", Double.toString(EigencutsDriver.EPSILON_DEFAULT));
+    addOption("tau", "t", "Threshold for cutting affinities", Double.toString(EigencutsDriver.TAU_DEFAULT));
+    Map<String, String> parsedArgs = parseArguments(arg0);
+    if (parsedArgs == null) {
+      return 0;
+    }
+    
+    // read in the command line values
+    Path input = new Path(parsedArgs.get("--input"));
+    Path output = new Path(parsedArgs.get("--output"));
+    dimensions = Integer.parseInt(parsedArgs.get("--dimensions"));
+    halflife = Double.parseDouble(parsedArgs.get("--half-life"));
+    epsilon = Double.parseDouble(parsedArgs.get("--epsilon"));
+    tau = Double.parseDouble(parsedArgs.get("--tau"));
+    
+    // create a few new Paths for temp files and transformations
+    Path outputCalc = new Path(output, "calculations");
+    Path outputTmp = new Path(output, "temporary");
+    
+    DistributedRowMatrix A = AffinityMatrixInputJob.runJob(input, outputCalc, dimensions);
+    Vector D = MatrixDiagonalizeJob.runJob(A.getRowPath(), dimensions);
+    
+    long numCuts = 0;
+    do {
+      // first three steps are the same as spectral k-means:
+      // 1) calculate D from A
+      // 2) calculate L = D^-0.5 * A * D^-0.5
+      // 3) calculate eigenvectors of L
+
+      DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(A.getRowPath(), 
+        D, new Path(outputCalc, "laplacian-" + (System.nanoTime() & 0xFF)));
+      L.configure(new JobConf(conf));
+    
+      // eigendecomposition (step 3)
+      int overshoot = (int)((double)dimensions * 
+        EigencutsDriver.OVERSHOOT_MULTIPLIER);
+      List<Double> eigenValues = new ArrayList<Double>(overshoot);
+      Matrix eigenVectors = new DenseMatrix(overshoot, dimensions);
+      DistributedRowMatrix U = performEigenDecomposition(L, 
+        dimensions, overshoot, eigenValues, eigenVectors, 
+        outputCalc);
+      U.configure(new JobConf(conf));
+      eigenValues = eigenValues.subList(0, dimensions);
+    
+      // here's where things get interesting: steps 4, 5, and 6 are unique
+      // to this algorithm, and depending on the final output, steps 1-3
+      // may be repeated as well
+    
+      // helper method, since apparently List and Vector objects don't play nicely
+      Vector evs = listToVector(eigenValues);
+    
+      // calculate sensitivities (step 4 and step 5)
+      Path sensitivities = new Path(outputCalc, "sensitivities-" + (System.nanoTime() & 0xFF));
+      EigencutsSensitivityJob.runJob(evs, D, U.getRowPath(), halflife, 
+        tau, median(D), epsilon, sensitivities);
+    
+      // perform the cuts (step 6)
+      input = new Path(outputTmp, "nextAff-" + (System.nanoTime() & 0xFF));
+      numCuts = EigencutsAffinityCutsJob.runjob(A.getRowPath(), sensitivities, input, conf);
+    
+      // how many cuts were made?
+      if (numCuts > 0) {
+        // recalculate A
+        A = new DistributedRowMatrix(input, new Path(outputTmp, Long.toString(System.nanoTime())),
+            dimensions, dimensions);
+        A.configure(new JobConf());
+      }
+    } while (numCuts > 0);
+    
+    // TODO: output format???
+    
+    return 0;
+  }
+  
+  /**
+   * Does most of the heavy lifting in setting up Paths, configuring return
+   * values, and generally performing the tedious administrative tasks involved
+   * in an eigen-decomposition and running the verifier
+   * @param input the matrix to decompose
+   * @param numEigenVectors the number of eigenvectors to keep after verification
+   * @param overshoot the (larger) number of eigenvectors Lanczos should compute
+   * @param eigenValues output list of eigenvalues
+   * @param eigenVectors output matrix of eigenvectors
+   * @param tmp base path for temporary files
+   * @return the verified eigenvectors as a DistributedRowMatrix
+   */
+  public static DistributedRowMatrix performEigenDecomposition(
+      DistributedRowMatrix input, int numEigenVectors, int overshoot, 
+      List<Double> eigenValues, Matrix eigenVectors, Path tmp) 
+      throws IOException {
+    DistributedLanczosSolver solver = new DistributedLanczosSolver();
+    Path seqFiles = new Path(tmp, "eigendecomp-" + (System.nanoTime() & 0xFF));
+    solver.runJob(new Configuration(), input.getRowPath(), new Path(tmp, 
+        "lanczos-" + (System.nanoTime() & 0xFF)), input.numRows(), 
+        input.numCols(), true, overshoot, eigenVectors, eigenValues, 
+        seqFiles.toString());
+    
+    // now run the verifier to trim down the number of eigenvectors
+    EigenVerificationJob verifier = new EigenVerificationJob();
+    Path verifiedEigens = new Path(tmp, "verifiedeigens");
+    verifier.runJob(seqFiles, input.getRowPath(), verifiedEigens, 
+        false, 1.0, 0.0, numEigenVectors);
+    Path cleanedEigens = verifier.getCleanedEigensPath();
+    return new DistributedRowMatrix(cleanedEigens, new Path(cleanedEigens, 
+        "tmp"), numEigenVectors, input.numRows());
+  }
+  
+  /**
+   * A quick and dirty hack to compute the median of a vector. Note that
+   * for vectors of fewer than 100 elements this falls back to the mean.
+   * @param v the vector whose median to estimate
+   * @return the approximate median (or, for small vectors, the mean)
+   */
+  private double median(Vector v) {
+    OnlineSummarizer med = new OnlineSummarizer();
+    if (v.size() < 100) {
+      return v.zSum() / v.size();
+    }
+    for (Vector.Element e : v) {
+      med.add(e.get());
+    }
+    return med.getMedian();
+  }
+  
+  /**
+   * Iteratively loops through the list, converting it to a Vector of double
+   * primitives suitable for other Mahout operations
+   * @param list the eigenvalues to convert
+   * @return a DenseVector containing the list's elements, in order
+   */
+  private Vector listToVector(List<Double> list) {
+    Vector retval = new DenseVector(list.size());
+    int index = 0;
+    for (Double d : list) {
+      retval.setQuick(index++, d.doubleValue());
+    }
+    return retval;
+  }
+
+}
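
For reviewers tracing the decomposition step, here is a minimal sketch of how
performEigenDecomposition composes with the surrounding driver loop (assuming
the enclosing class is EigencutsDriver, per the file list). The names
laplacian, dims, tmp, and conf are hypothetical stand-ins, not part of the
patch:

    // Sketch only: invoking the decomposition helper, assuming 'laplacian'
    // is a configured DistributedRowMatrix, 'dims' the number of eigenvectors
    // to keep, 'tmp' a scratch Path, and 'conf' a Configuration.
    int overshoot = (int) (dims * 2.0);
    List<Double> eigenValues = new ArrayList<Double>(overshoot);
    Matrix eigenVectors = new DenseMatrix(overshoot, dims);
    DistributedRowMatrix U = EigencutsDriver.performEigenDecomposition(
        laplacian, dims, overshoot, eigenValues, eigenVectors, tmp);
    U.configure(new JobConf(conf));
    // the solver overshoots, so only the first 'dims' eigenvalues are kept
    eigenValues = eigenValues.subList(0, dims);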

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsKeys.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsKeys.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsKeys.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsKeys.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+/**
+ * Configuration keys for the Eigencuts algorithm (analogous to KMeansConfigKeys)
+ */
+public interface EigencutsKeys {
+
+	/**
+	 * B_0, or the user-specified minimum eigenflow half-life threshold
+	 * for an eigenvector/eigenvalue pair to be considered. Increasing
+	 * B_0 equates to fewer clusters
+	 */
+	String BETA = "org.apache.mahout.clustering.spectral.beta";
+	
+	/**
+	 * Tau, or the user-specified threshold for making cuts (setting edge
+	 * affinities to 0) after performing non-maximal suppression on edge weight
+	 * sensitivities. Increasing tau equates to more edge cuts
+	 */
+	String TAU = "org.apache.mahout.clustering.spectral.tau";
+	
+	/**
+	 * The normalization factor for computing the cut threshold
+	 */
+	String DELTA = "org.apache.mahout.clustering.spectral.delta";
+	
+	/**
+	 * Epsilon, or the user-specified coefficient that works in tandem with
+	 * BETA (the minimum half-life threshold) to determine which
+	 * eigenvector/eigenvalue pairs to use. Increasing epsilon equates to
+	 * fewer eigenvector/eigenvalue pairs
+	 */
+	String EPSILON = "org.apache.mahout.clustering.spectral.epsilon";
+	
+	/**
+	 * Base path to the location on HDFS where the diagonal matrix (a vector)
+	 * and the list of eigenvalues will be stored for one of the map/reduce 
+	 * jobs in Eigencuts.
+	 */
+	String VECTOR_CACHE_BASE = "org.apache.mahout.clustering.spectral.eigencuts.vectorcache";
+	
+	/**
+	 * Refers to the dimensions of the raw affinity matrix input. Since this
+	 * matrix is symmetrical, it is a square matrix, hence all its dimensions
+	 * are equal.
+	 */
+	String AFFINITY_DIMENSIONS = "org.apache.mahout.clustering.spectral.eigencuts.affinitydimensions";
+	
+	/**
+	 * Refers to the Path to the SequenceFile representing the affinity matrix
+	 */
+	String AFFINITY_PATH = "org.apache.mahout.clustering.spectral.eigencuts.affinitypath";
+	
+	/**
+	 * Refers to the Path to the SequenceFile representing the cut matrix
+	 */
+	String CUTMATRIX_PATH = "org.apache.mahout.clustering.spectral.eigencuts.cutmatrixpath";
+	
+	/**
+	 * The SequenceFile index for the list of eigenvalues.
+	 */
+	int EIGENVALUES_CACHE_INDEX = 0;
+	
+	/**
+	 * The SequenceFile index for the diagonal matrix.
+	 */
+	int DIAGONAL_CACHE_INDEX = 1;
+}
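
To see how these keys travel between the driver and the map/reduce tasks, here
is a minimal sketch of the producer/consumer pattern the jobs in this package
use (the numeric values are illustrative only):

    // Driver side: stash parameters in the job Configuration.
    Configuration conf = new Configuration();
    conf.set(EigencutsKeys.BETA, Double.toString(3.0));
    conf.set(EigencutsKeys.TAU, Double.toString(-0.1));

    // Task side, e.g. in Mapper.setup(): read them back out.
    double beta0 = Double.parseDouble(conf.get(EigencutsKeys.BETA));
    double tau = Double.parseDouble(conf.get(EigencutsKeys.TAU));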

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityJob.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityJob.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.spectral.common.VectorCache;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * <p>There are quite a few operations bundled within this mapper. Gather 'round
+ * and listen, all of ye.</p>
+ * 
+ * <p>The input to this job is eight items:</p>
+ * <ol><li>B<sub>0</sub>, which is a command-line parameter fed through the Configuration object</li>
+ * <li>diagonal matrix, a constant vector fed through the Hadoop cache</li>
+ * <li>list of eigenvalues, a constant vector fed through the Hadoop cache</li>
+ * <li>eigenvector, the input value to the mapper</li>
+ * <li>epsilon</li>
+ * <li>delta</li>
+ * <li>tau</li>
+ * <li>output, the Path to the output matrix of sensitivities</li></ol>
+ * 
+ * <p>The first three items are constant and are used in all of the map
+ * tasks. The row index indicates which eigenvalue from the list to use, and
+ * also serves as the output identifier. The diagonal matrix and the 
+ * eigenvector are both of equal length and are iterated through twice
+ * within each map task, unfortunately giving each task a runtime of 
+ * n<sup>2</sup>. This is unavoidable.</p>
+ * 
+ * <p>For each (i, j) combination of elements within the eigenvector, a complex
+ * equation is run that explicitly computes the sensitivity to perturbation of 
+ * the flow of probability within the specific edge of the graph. Each
+ * sensitivity, as it is computed, is simultaneously applied to a non-maximal
+ * suppression step: for a given sensitivity S_ij, it must be suppressed if
+ * any other S_in or S_mj has a more negative value. Thus, only the most
+ * negative S_ij within its row i or its column j is stored in the return
+ * array, leading to an output (per eigenvector!) with maximum length n, 
+ * minimum length 1.</p>
+ * 
+ * <p>Overall, this creates an n-by-n (possibly sparse) matrix with a maximum
+ * of n^2 non-zero elements, minimum of n non-zero elements.</p>
+ */
+public final class EigencutsSensitivityJob {
+
+  /**
+   * Initializes the configuration tasks, loads the needed data into
+   * the HDFS cache, and executes the job.
+   * 
+   * @param eigenvalues Vector of eigenvalues
+   * @param diagonal Vector representing the diagonal matrix
+   * @param eigenvectors Path to the DRM of eigenvectors
+   * @param beta B_0, the minimum eigenflow half-life threshold
+   * @param tau the threshold for making cuts
+   * @param delta the normalization factor for the cut threshold
+   * @param epsilon the coefficient applied to B_0 when selecting
+   *                eigenvector/eigenvalue pairs
+   * @param output Path to the output matrix (will have between n and full-rank
+   *                non-zero elements)
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  public static void runJob(Vector eigenvalues, Vector diagonal,
+      Path eigenvectors, double beta, double tau, double delta,
+      double epsilon, Path output)
+      throws IOException, ClassNotFoundException, InterruptedException {
+    
+    // save the two vectors to the distributed cache
+    Configuration jobConfig = new Configuration();
+    Path eigenOutputPath = new Path(output.getParent(), "eigenvalues");
+    Path diagOutputPath = new Path(output.getParent(), "diagonal");
+    jobConfig.set(EigencutsKeys.VECTOR_CACHE_BASE, output.getParent().getName());
+    VectorCache.save(new IntWritable(EigencutsKeys.EIGENVALUES_CACHE_INDEX), 
+        eigenvalues, eigenOutputPath, jobConfig);
+    VectorCache.save(new IntWritable(EigencutsKeys.DIAGONAL_CACHE_INDEX), 
+        diagonal, diagOutputPath, jobConfig);
+    
+    // set up the rest of the job
+    jobConfig.set(EigencutsKeys.BETA, Double.toString(beta));
+    jobConfig.set(EigencutsKeys.EPSILON, Double.toString(epsilon));
+    jobConfig.set(EigencutsKeys.DELTA, Double.toString(delta));
+    jobConfig.set(EigencutsKeys.TAU, Double.toString(tau));
+    
+    Job job = new Job(jobConfig, "EigencutsSensitivityJob");
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(EigencutsSensitivityNode.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(EigencutsSensitivityMapper.class);
+    job.setReducerClass(EigencutsSensitivityReducer.class);
+    
+    FileInputFormat.addInputPath(job, eigenvectors);
+    FileOutputFormat.setOutputPath(job, output);
+    
+    job.waitForCompletion(true);
+  }  
+}
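
The non-maximal suppression described in the class comment is easy to check on
paper. Here is a self-contained toy sketch (made-up values, and only the row
half of the suppression; the mapper also suppresses within columns) that keeps
the most negative S_ij per row:

    public class SuppressionSketch {
      public static void main(String[] args) {
        double[][] s = {
          { -0.5, -0.1,  0.2 },
          {  0.3, -0.9, -0.4 },
          { -0.2,  0.0, -0.7 }
        };
        for (int i = 0; i < s.length; i++) {
          int minInd = 0;
          for (int j = 1; j < s[i].length; j++) {
            if (s[i][j] < s[i][minInd]) {
              minInd = j;
            }
          }
          // only the most negative sensitivity in row i survives
          System.out.println("keep S(" + i + "," + minInd + ") = " + s[i][minInd]);
        }
      }
    }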

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityMapper.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.spectral.common.VectorCache;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+public class EigencutsSensitivityMapper extends
+    Mapper<IntWritable, VectorWritable, IntWritable, EigencutsSensitivityNode> {
+
+  private Vector eigenvalues;
+  private Vector diagonal;
+  private double beta0;
+  private double epsilon;
+  
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration config = context.getConfiguration();
+    beta0 = Double.parseDouble(config.get(EigencutsKeys.BETA));
+    epsilon = Double.parseDouble(config.get(EigencutsKeys.EPSILON));
+    
+    // read in the two vectors from the cache
+    eigenvalues = VectorCache.load(
+        new IntWritable(EigencutsKeys.EIGENVALUES_CACHE_INDEX), config);
+    diagonal = VectorCache.load(
+        new IntWritable(EigencutsKeys.DIAGONAL_CACHE_INDEX), config);
+    if (!(eigenvalues instanceof SequentialAccessSparseVector || eigenvalues instanceof DenseVector)) {
+      eigenvalues = new SequentialAccessSparseVector(eigenvalues);
+    }
+    if (!(diagonal instanceof SequentialAccessSparseVector || diagonal instanceof DenseVector)) {
+      diagonal = new SequentialAccessSparseVector(diagonal);
+    }
+  }
+  
+  @Override
+  protected void map(IntWritable row, VectorWritable vw, Context context) 
+    throws IOException, InterruptedException {
+    
+    // first, does this particular eigenvector even pass the required threshold?
+    double eigenvalue = Math.abs(eigenvalues.get(row.get()));
+    double betak = -Functions.LOGARITHM.apply(2) / Functions.LOGARITHM.apply(eigenvalue);
+    if (eigenvalue >= 1.0 || betak <= (epsilon * beta0)) {
+      // doesn't pass the threshold! quit
+      return;
+    }
+    
+    // go through the vector, performing the calculations
+    // sadly, no way to get around n^2 computations      
+    Map<Integer, EigencutsSensitivityNode> columns = new HashMap<Integer, EigencutsSensitivityNode>();
+    Vector ev = vw.get();
+    for (int i = 0; i < ev.size(); i++) {
+      double minS_ij = Double.MAX_VALUE;
+      int minInd = -1;
+      for (int j = 0; j < ev.size(); j++) {          
+        double S_ij = performSensitivityCalculation(eigenvalue, ev.get(i), 
+            ev.get(j), diagonal.get(i), diagonal.get(j));
+        
+        // perform non-maximal suppression
+        // is this the smallest value in the row?
+        if (S_ij < minS_ij) {
+          minS_ij = S_ij;
+          minInd = j;
+        }
+      }
+      
+      // is this the smallest value in the column?
+      Integer column = minInd;
+      EigencutsSensitivityNode value = new EigencutsSensitivityNode(i, minInd, minS_ij);
+      if (!columns.containsKey(column) || columns.get(column).getSensitivity() > minS_ij) {
+        columns.put(column, value);
+      }
+    }
+    
+    // write whatever values made it through
+    for (EigencutsSensitivityNode e : columns.values()) {
+      context.write(new IntWritable(e.getRow()), e);
+    }
+  }
+  
+  /**
+   * Helper method, performs the actual sensitivity calculation:
+   *
+   * (log(2) / (lambda_k * log(lambda_k) * log(lambda_k^beta0 / 2))) *
+   * [ -(u_i / sqrt(d_i) - u_j / sqrt(d_j))^2 +
+   *   (1 - lambda_k) * (u_i^2 / d_i + u_j^2 / d_j) ]
+   *
+   * @param eigenvalue the eigenvalue lambda_k
+   * @param ev_i element i of the eigenvector
+   * @param ev_j element j of the eigenvector
+   * @param diag_i element i of the diagonal matrix
+   * @param diag_j element j of the diagonal matrix
+   * @return the sensitivity S_ij of edge (i, j)
+   */
+  private double performSensitivityCalculation(double eigenvalue, double
+      ev_i, double ev_j, double diag_i, double diag_j) {
+    
+    double firsthalf = Functions.LOGARITHM.apply(2) / (
+        eigenvalue * Functions.LOGARITHM.apply(eigenvalue) * 
+        Functions.LOGARITHM.apply(Functions.POW.apply(eigenvalue, beta0) / 2));
+    
+    double secondhalf = -Functions.POW.apply(((ev_i / 
+        Functions.SQRT.apply(diag_i)) - (ev_j / 
+        Functions.SQRT.apply(diag_j))), 2) + (1 - eigenvalue) * 
+        ((Functions.POW.apply(ev_i, 2) / diag_i) + 
+        (Functions.POW.apply(ev_j, 2) / diag_j));
+    
+    return firsthalf * secondhalf;
+  }
+  
+  /**
+   * Utility helper method, used by unit tests to inject state without a
+   * running Hadoop job.
+   * @param beta0 B_0, the minimum half-life threshold
+   * @param epsilon the half-life coefficient
+   * @param eigenvalues the vector of eigenvalues
+   * @param diagonal the diagonal matrix, as a vector
+   */
+  void setup(double beta0, double epsilon, Vector eigenvalues, Vector diagonal) {
+    this.beta0 = beta0;
+    this.epsilon = epsilon;
+    this.eigenvalues = eigenvalues;
+    this.diagonal = diagonal;
+  }
+}
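
For anyone verifying the sensitivity math by hand, the helper above reduces to
the following plain java.lang.Math expression. The inputs here are made up, and
lambda must lie strictly between 0 and 1 for the logarithms to be defined:

    // Sketch: performSensitivityCalculation restated with java.lang.Math.
    double lambda = 0.8, beta0 = 3.0;   // eigenvalue and B_0
    double ui = 0.4, uj = 0.6;          // eigenvector elements i and j
    double di = 2.0, dj = 3.0;          // diagonal elements i and j
    double firstHalf = Math.log(2) /
        (lambda * Math.log(lambda) * Math.log(Math.pow(lambda, beta0) / 2));
    double secondHalf = -Math.pow(ui / Math.sqrt(di) - uj / Math.sqrt(dj), 2)
        + (1 - lambda) * (ui * ui / di + uj * uj / dj);
    double sensitivity = firstHalf * secondHalf;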

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityNode.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityNode.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityNode.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityNode.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class allows the storage of computed sensitivities in an
+ * unordered fashion, with each sensitivity instead tracking its
+ * own (i, j) coordinate. Thus these objects can be stored as elements
+ * in any list or, in particular, Writable array.
+ */
+public class EigencutsSensitivityNode implements Writable {
+  
+  private int row;
+  private int column;
+  private double sensitivity;
+  
+  public EigencutsSensitivityNode() {
+    // no-arg constructor, required so Hadoop can instantiate this Writable
+    // reflectively when deserializing it from map output
+  }
+
+  public EigencutsSensitivityNode(int i, int j, double s) {
+    row = i;
+    column = j;
+    sensitivity = s;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.row = in.readInt();
+    this.column = in.readInt();
+    this.sensitivity = in.readDouble();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(row);
+    out.writeInt(column);
+    out.writeDouble(sensitivity);
+  }
+
+  public int getRow() {
+    return row;
+  }
+
+  public int getColumn() {
+    return column;
+  }
+
+  public double getSensitivity() {
+    return sensitivity;
+  }
+}
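
Custom Writables like this one are easy to unit test with an in-memory round
trip. Here is a minimal sketch using only standard java.io stream classes
(imports assumed: ByteArrayInputStream, ByteArrayOutputStream,
DataInputStream, DataOutputStream):

    // Serialize a node and read it back through the Writable interface.
    EigencutsSensitivityNode out = new EigencutsSensitivityNode(2, 5, -0.42);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    out.write(new DataOutputStream(bytes));

    EigencutsSensitivityNode in = new EigencutsSensitivityNode();
    in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    // in.getRow() == 2, in.getColumn() == 5, in.getSensitivity() == -0.42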

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityReducer.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/eigencuts/EigencutsSensitivityReducer.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.eigencuts;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * <p>The point of this class is to take all the arrays of sensitivities
+ * and convert them to a single matrix. Since there may be many values
+ * that, according to their (i, j) coordinates, overlap in the matrix,
+ * the "winner" will be determined by whichever value is smaller.</p> 
+ */
+public class EigencutsSensitivityReducer extends
+    Reducer<IntWritable, EigencutsSensitivityNode, IntWritable, VectorWritable> {
+
+  @Override
+  protected void reduce(IntWritable key, Iterable<EigencutsSensitivityNode> arr, 
+      Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    Vector v = new RandomAccessSparseVector(
+        conf.getInt(EigencutsKeys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE), 100);
+    double threshold = Double.parseDouble(conf.get(EigencutsKeys.TAU)) / 
+      Double.parseDouble(conf.get(EigencutsKeys.DELTA));
+    
+    for (EigencutsSensitivityNode n : arr) {
+      if (n.getSensitivity() < threshold && n.getSensitivity() < v.getQuick(n.getColumn())) {
+        v.setQuick(n.getColumn(), n.getSensitivity());
+      }
+    }
+    context.write(key, new VectorWritable(v));
+  }
+}
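
The "smaller value wins" merge above amounts to a conditional overwrite into a
sparse vector, which returns 0.0 for untouched cells. A short sketch with
hypothetical values (the threshold stands in for tau / delta):

    Vector v = new RandomAccessSparseVector(10);
    double threshold = -0.05;
    double[][] pairs = { {3, -0.2}, {3, -0.6}, {7, -0.01} };
    for (double[] p : pairs) {
      int col = (int) p[0];
      double s = p[1];
      if (s < threshold && s < v.getQuick(col)) {
        v.setQuick(col, s);
      }
    }
    // v ends with -0.6 at index 3; the -0.01 at index 7 missed the threshold.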

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java?rev=1003188&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java Thu Sep 30 18:00:23 2010
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.kmeans;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
+import org.apache.mahout.clustering.spectral.common.AffinityMatrixInputJob;
+import org.apache.mahout.clustering.spectral.common.MatrixDiagonalizeJob;
+import org.apache.mahout.clustering.spectral.common.UnitVectorizerJob;
+import org.apache.mahout.clustering.spectral.common.VectorMatrixMultiplicationJob;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
+import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
+
+/**
+ * Implementation of the spectral k-means clustering algorithm.
+ */
+public class SpectralKMeansDriver extends AbstractJob {
+
+  public static final boolean DEBUG = false;
+
+  public static final double OVERSHOOT_MULTIPLIER = 2.0;
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new SpectralKMeansDriver(), args);
+  }
+
+  @Override
+  public int run(String[] arg0) throws Exception {
+    // set up command line options
+    Configuration conf = new Configuration();
+    addOption("input", "i", "Path to input affinity matrix data", true);
+    addOption("output", "o", "Output of clusterings", true);
+    addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
+    addOption("clusters", "k", "Number of clusters and top eigenvectors", true);
+    Map<String, String> parsedArgs = parseArguments(arg0);
+    if (parsedArgs == null) {
+      return 0;
+    }
+
+    // TODO: Need to be able to read all k-means parameters, though
+    // they will be optional parameters to the algorithm
+    // read the values of the command line
+    Path input = new Path(parsedArgs.get("--input"));
+    Path output = new Path(parsedArgs.get("--output"));
+    int numDims = Integer.parseInt(parsedArgs.get("--dimensions"));
+    int clusters = Integer.parseInt(parsedArgs.get("--clusters"));
+
+    // create a few new Paths for temp files and transformations
+    Path outputCalc = new Path(output, "calculations");
+    Path outputTmp = new Path(output, "temporary");
+
+    // Take in the raw CSV text file and split it ourselves,
+    // creating our own SequenceFiles for the matrices to read later 
+    // (similar to the style of syntheticcontrol.canopy.InputMapper)
+    Path affSeqFiles = new Path(outputCalc, "seqfile-" + (System.nanoTime() & 0xFF));
+    AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
+
+    // Next step: construct the affinity matrix using the newly-created
+    // sequence files
+    DistributedRowMatrix A = new DistributedRowMatrix(affSeqFiles,
+                                                      new Path(outputTmp, "afftmp-" + (System.nanoTime() & 0xFF)),
+                                                      numDims,
+                                                      numDims);
+    JobConf depConf = new JobConf(conf);
+    A.configure(depConf);
+
+    // Next step: construct the diagonal matrix D (represented as a vector)
+    // and calculate the normalized Laplacian of the form:
+    // L = D^(-0.5)AD^(-0.5)
+    Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
+    DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D, new Path(outputCalc, "laplacian-"
+        + (System.nanoTime() & 0xFF)));
+    L.configure(new JobConf(conf));
+
+    // Next step: perform eigen-decomposition using LanczosSolver
+    // since some of the eigen-output is spurious and will be eliminated
+    // upon verification, we have to aim to overshoot and then discard
+    // unnecessary vectors later
+    int overshoot = (int) (clusters * OVERSHOOT_MULTIPLIER);
+    List<Double> eigenValues = new ArrayList<Double>(overshoot);
+    Matrix eigenVectors = new DenseMatrix(overshoot, numDims);
+    DistributedLanczosSolver solver = new DistributedLanczosSolver();
+    Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors-" + (System.nanoTime() & 0xFF));
+    solver.runJob(conf,
+                  L.getRowPath(),
+                  new Path(outputTmp, "lanczos-" + (System.nanoTime() & 0xFF)),
+                  L.numRows(),
+                  L.numCols(),
+                  true,
+                  overshoot,
+                  eigenVectors,
+                  eigenValues,
+                  lanczosSeqFiles.toString());
+
+    // perform a verification
+    EigenVerificationJob verifier = new EigenVerificationJob();
+    Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
+    verifier.runJob(lanczosSeqFiles, L.getRowPath(), verifiedEigensPath, true, 1.0, 0.0, clusters);
+    Path cleanedEigens = verifier.getCleanedEigensPath();
+    DistributedRowMatrix W = new DistributedRowMatrix(cleanedEigens, new Path(cleanedEigens, "tmp"), clusters, numDims);
+    W.configure(new JobConf());
+    DistributedRowMatrix Wtrans = W.transpose();
+
+    // next step: normalize the rows of Wt to unit length
+    Path unitVectors = new Path(outputCalc, "unitvectors-" + (System.nanoTime() & 0xFF));
+    UnitVectorizerJob.runJob(Wtrans.getRowPath(), unitVectors);
+    DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new Path(unitVectors, "tmp"), clusters, numDims);
+    Wt.configure(new JobConf());
+
+    // Finally, perform k-means clustering on the rows of L (or W)
+    // generate random initial clusters
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    Path initialclusters = RandomSeedGenerator.buildRandom(Wt.getRowPath(),
+                                                           new Path(output, Cluster.INITIAL_CLUSTERS_DIR),
+                                                           clusters,
+                                                           measure);
+    KMeansDriver.run(new Configuration(), Wt.getRowPath(), initialclusters, output, measure, 0.001, 10, true, false);
+
+    // Read through the cluster assignments
+    Path clusteredPointsPath = new Path(output, "clusteredPoints");
+    FileSystem fs = FileSystem.get(conf);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(clusteredPointsPath, "part-m-00000"), conf);
+    // The key is the clusterId
+    IntWritable clusterId = new IntWritable(0);
+    // The value is the weighted vector
+    WeightedVectorWritable value = new WeightedVectorWritable();
+
+    int id = 0;
+    while (reader.next(clusterId, value)) {
+      System.out.println((id++) + ": " + clusterId.get());
+      clusterId = new IntWritable(0);
+      value = new WeightedVectorWritable();
+    }
+    reader.close();
+
+    // TODO: output format???
+
+    return 0;
+  }
+}
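
As a concrete check on the L = D^(-0.5) A D^(-0.5) step the driver delegates
to VectorMatrixMultiplicationJob, here is a tiny plain-Java sketch on a
made-up 3x3 symmetric affinity matrix, where D holds the row sums:

    double[][] a = {
      { 0.0, 1.0, 0.5 },
      { 1.0, 0.0, 0.2 },
      { 0.5, 0.2, 0.0 }
    };
    int n = a.length;
    double[] d = new double[n];
    for (int i = 0; i < n; i++) {
      for (int j = 0; j < n; j++) {
        d[i] += a[i][j];    // degree of vertex i
      }
    }
    double[][] l = new double[n][n];
    for (int i = 0; i < n; i++) {
      for (int j = 0; j < n; j++) {
        l[i][j] = a[i][j] / Math.sqrt(d[i] * d[j]);
      }
    }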



Re: svn commit: r1003188 [1/2] - in /mahout/trunk/core/src: main/java/org/apache/mahout/clustering/spectral/ main/java/org/apache/mahout/clustering/spectral/common/ main/java/org/apache/mahout/clustering/spectral/eigencuts/ main/java/org/apache/mahout/clus...

Posted by Jeff Eastman <jd...@windwardsolutions.com>.
  Woops! I saw the second email and not the first and jumped the gun. 
Personally, I think there is value in getting more eyeballs on this code 
to help with the output, find bugs, etc. I'm certainly willing to help 
out. It has a lot of functionality in place and is much easier to handle 
with small patches going forward than one whopper. We have had 
work-in-progress commits before and some things in 0.4 will still be 
experimental or unpolished.

Jeff