You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/07/09 09:09:58 UTC

svn commit: r792423 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/algebra/ src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/ src/test/org/apache/hama/

Author: edwardyoon
Date: Thu Jul  9 07:09:57 2009
New Revision: 792423

URL: http://svn.apache.org/viewvc?rev=792423&view=rev
Log:
Finds the eigenvalues and eigenvectors associated with the symmetric matrix A

Added:
    incubator/hama/trunk/src/java/org/apache/hama/algebra/JacobiEigenValue.java
    incubator/hama/trunk/src/java/org/apache/hama/io/Pair.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/DummyMapper.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=792423&r1=792422&r2=792423&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Jul  9 07:09:57 2009
@@ -3,7 +3,8 @@
 Trunk (unreleased changes)
 
   NEW FEATURES
-  
+    HAMA-185: Finds the eigenvalues and eigenvectors 
+              associated with the symmetric matrix A (samuel)
     HAMA-171: Find the maximum absolute row sum using MapReduce (edwardyoon)
     HAMA-174: Compute the transpose of a matrix (edwardyoon)
     HAMA-162: Add Graph using Sparse Matrix (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=792423&r1=792422&r2=792423&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Thu Jul  9 07:09:57 2009
@@ -51,6 +51,7 @@
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.algebra.JacobiEigenValue;
 import org.apache.hama.algebra.MatrixNormMapRed;
 import org.apache.hama.algebra.TransposeMap;
 import org.apache.hama.algebra.TransposeReduce;
@@ -146,6 +147,16 @@
       this.tableDesc.addFamily(new HColumnDescriptor(Bytes
           .toBytes(Constants.BLOCK), 1, CompressionType.NONE, false, false,
           Integer.MAX_VALUE, HConstants.FOREVER, false));
+      // the following families are used in JacobiEigenValue computation
+      this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+          .toBytes(JacobiEigenValue.EI), 1, CompressionType.NONE, false, false,
+          Integer.MAX_VALUE, HConstants.FOREVER, false));
+      this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+          .toBytes(JacobiEigenValue.EICOL), 10, CompressionType.NONE, false, false,
+          Integer.MAX_VALUE, HConstants.FOREVER, false));
+      this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+          .toBytes(JacobiEigenValue.EIVEC), 10, CompressionType.NONE, false, false,
+          Integer.MAX_VALUE, HConstants.FOREVER, false));
 
       LOG.info("Initializing the matrix storage.");
       this.admin.createTable(this.tableDesc);

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=792423&r1=792422&r2=792423&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Thu Jul  9 07:09:57 2009
@@ -33,26 +33,34 @@
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hama.algebra.BlockMultiplyMap;
 import org.apache.hama.algebra.BlockMultiplyReduce;
 import org.apache.hama.algebra.DenseMatrixVectorMultMap;
 import org.apache.hama.algebra.DenseMatrixVectorMultReduce;
+import org.apache.hama.algebra.JacobiEigenValue;
 import org.apache.hama.algebra.RowCyclicAdditionMap;
 import org.apache.hama.algebra.RowCyclicAdditionReduce;
 import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.Pair;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.mapred.CollectBlocksMapper;
+import org.apache.hama.mapred.DummyMapper;
 import org.apache.hama.mapred.RandomMatrixMap;
 import org.apache.hama.mapred.RandomMatrixReduce;
 import org.apache.hama.mapred.VectorInputFormat;
@@ -666,4 +674,243 @@
 
     JobManager.execute(jobConf);
   }
+  
+  /**
+   * Compute all the eigen values.
+   * Note: all the eigen values are collected in the "eival:value" column,
+   * and the eigen vector of a specified eigen value is collected in the
+   * "eivec:" column family in the same row.
+   * 
+   * TODO: we may need to expose the interface to access the eigen values
+   * and vectors
+   * 
+   * @param loops limit the loops of the computation
+   * @throws IOException
+   */
+  public void jacobiEigenValue(int loops) throws IOException {
+    JobConf jobConf = new JobConf(config);
+    
+    /******************************************************************
+     * Initialization
+     * 
+     * A M/R job is used for initialization(such as, preparing a matrx
+     * copy of the original in "eicol:" family.)
+     ******************************************************************/
+    // initialization
+    jobConf.setJobName("JacobiEigen initialization MR job" + getPath());
+    
+    jobConf.setMapperClass(JacobiEigenValue.InitMapper.class);
+    jobConf.setInputFormat(VectorInputFormat.class);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+    
+    FileInputFormat.addInputPaths(jobConf, getPath());
+    jobConf.set(JacobiEigenValue.MATRIX, getPath());
+    jobConf.setOutputFormat(NullOutputFormat.class);
+    jobConf.setMapOutputKeyClass(IntWritable.class);
+    jobConf.setMapOutputValueClass(MapWritable.class);
+    
+    JobManager.execute(jobConf);
+    
+    final FileSystem fs = FileSystem.get(jobConf);
+    Pair pivotPair = new Pair();
+    DoubleWritable pivotWritable = new DoubleWritable();
+    VectorUpdate vu ;
+    
+    // loop
+    int size = this.getRows();
+    int state = size;
+    int pivot_row, pivot_col;
+    double pivot;
+    double s, c, t, y;
+    
+    while(state != 0 && loops > 0) {
+      /******************************************************************
+       * Find the pivot and its index(pivot_row, pivot_col) 
+       * 
+       * A M/R job is used to scan all the "eival:ind" to get the max 
+       * absolute value of each row, and do a MAX aggregation of these
+       * max values to get the max value in the matrix.
+       ******************************************************************/
+      jobConf = new JobConf(config);
+      jobConf.setJobName("Find Pivot MR job" + getPath());
+      
+      jobConf.setNumReduceTasks(1);
+      
+      Path outDir = new Path(new Path(getType() + "_TMP_FindPivot_dir_" + System.currentTimeMillis()), "out");
+      if(fs.exists(outDir)) 
+        fs.delete(outDir, true);
+      
+      jobConf.setMapperClass(JacobiEigenValue.PivotMapper.class);
+      jobConf.setInputFormat(JacobiEigenValue.PivotInputFormat.class);
+      jobConf.set(JacobiEigenValue.PivotInputFormat.COLUMN_LIST, JacobiEigenValue.EIIND);
+      FileInputFormat.addInputPaths(jobConf, getPath());
+      jobConf.setMapOutputKeyClass(Pair.class);
+      jobConf.setMapOutputValueClass(DoubleWritable.class);
+      
+      jobConf.setOutputKeyClass(Pair.class);
+      jobConf.setOutputValueClass(DoubleWritable.class);
+      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+      FileOutputFormat.setOutputPath(jobConf, outDir);
+      
+      // update the out put dir of the job
+      outDir = FileOutputFormat.getOutputPath(jobConf);
+      
+      JobManager.execute(jobConf);
+
+      //read outputs
+      Path inFile = new Path(outDir, "part-00000");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+      try {
+        reader.next(pivotPair, pivotWritable);
+        pivot_row = pivotPair.getRow();
+        pivot_col = pivotPair.getColumn();
+        pivot = pivotWritable.get();
+      } finally {
+        reader.close();
+      }
+      fs.delete(outDir.getParent(), true);
+      
+      /******************************************************************
+       * Calculation
+       * 
+       * Compute the rotation parameters of next rotation.
+       ******************************************************************/
+      double e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_row), 
+          Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+      double e2 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_col), 
+          Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+      
+      y = (e2 - e1) / 2;
+      t = Math.abs(y) + Math.sqrt(pivot * pivot + y * y);
+      s = Math.sqrt(pivot * pivot + t * t);
+      c = t / s;
+      s = pivot / s;
+      t = (pivot * pivot) / t;
+      if(y < 0) {
+        s = -s;
+        t = -t;
+      }
+      
+      /******************************************************************
+       * Upate the pivot and the eigen values indexed by the pivot
+       ******************************************************************/
+      vu = new VectorUpdate(pivot_row);
+      vu.put(JacobiEigenValue.EICOL, pivot_col, 0);
+      table.commit(vu.getBatchUpdate());
+      
+      state = update(pivot_row, -t, state);
+      state = update(pivot_col, t, state);
+      
+      /******************************************************************
+       * Rotation the matrix
+       ******************************************************************/
+      // rotation
+      jobConf = new JobConf(config);
+      jobConf.setJobName("Rotation Matrix MR job" + getPath());
+      
+      jobConf.setInt(JacobiEigenValue.PIVOTROW, pivot_row);
+      jobConf.setInt(JacobiEigenValue.PIVOTCOL, pivot_col);
+      jobConf.set(JacobiEigenValue.PIVOTSIN, String.valueOf(s));
+      jobConf.set(JacobiEigenValue.PIVOTCOS, String.valueOf(c));
+      
+      jobConf.setMapperClass(DummyMapper.class);
+      jobConf.setInputFormat(JacobiEigenValue.RotationInputFormat.class);     
+      jobConf.set(JacobiEigenValue.RotationInputFormat.COLUMN_LIST, JacobiEigenValue.EIIND);
+      FileInputFormat.addInputPaths(jobConf, getPath());
+      jobConf.setMapOutputKeyClass(NullWritable.class);
+      jobConf.setMapOutputValueClass(NullWritable.class);
+      FileInputFormat.addInputPaths(jobConf, getPath());
+      jobConf.setOutputFormat(NullOutputFormat.class);
+      
+      JobManager.execute(jobConf);
+      
+      // rotate eigenvectors
+      LOG.info("rotating eigenvector");
+      for(int i = 0; i < size; i++) {
+        e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_row), 
+            Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue());
+        e2 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_col), 
+            Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue());
+        
+        vu = new VectorUpdate(pivot_row);
+        vu.put(JacobiEigenValue.EIVEC, i, c * e1 - s * e2);
+        table.commit(vu.getBatchUpdate());
+        
+        vu = new VectorUpdate(pivot_col);
+        vu.put(JacobiEigenValue.EIVEC, i, s * e1 + c * e2);
+        table.commit(vu.getBatchUpdate());
+      }
+      
+      LOG.info("update index...");
+      // update index array
+      maxind(pivot_row, size);
+      maxind(pivot_col, size);
+      
+      loops --;
+    }
+  }
+  
+  void maxind(int row, int size) throws IOException {
+    int m = row + 1;
+    if(row + 2 < size) {
+      double max = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row), 
+          Bytes.toBytes(JacobiEigenValue.EICOL + m)).getValue());
+      double val;
+      for(int i=row + 2; i<size; i++) {
+        val = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row), 
+            Bytes.toBytes(JacobiEigenValue.EICOL + i)).getValue());
+        if(Math.abs(val) > Math.abs(max)) {
+          m = i;
+          max = val;
+        }
+      }
+    }
+    
+    VectorUpdate vu = new VectorUpdate(row);
+    vu.put(JacobiEigenValue.EIIND, m);
+    table.commit(vu.getBatchUpdate());
+  }
+  
+  int update(int row, double value, int state) throws IOException {
+    double e = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row), 
+        Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+    int changed = BytesUtil.bytesToInt(table.get(BytesUtil.getRowIndex(row), 
+        Bytes.toBytes(JacobiEigenValue.EICHANGED)).getValue());
+    double y = e;
+    e += value;
+    
+    VectorUpdate vu = new VectorUpdate(row);
+    vu.put(JacobiEigenValue.EIVAL, e);
+    if(changed == 1 && (Math.abs(y - e) < .0000001)) { //y == e) {
+      changed = 0;
+      vu.put(JacobiEigenValue.EICHANGED, changed);
+      state --;
+    } else if(changed == 0 && (Math.abs(y - e) > .0000001)) {
+      changed = 1;
+      vu.put(JacobiEigenValue.EICHANGED, changed);
+      state ++;
+    } 
+    table.commit(vu.getBatchUpdate());
+    return state;
+  }
+  
+  // for test
+  boolean verifyEigenValue(double[] e, double[][] E) throws IOException {
+    boolean success = true;
+    double e1, ev;
+    for(int i=0; i<e.length; i++) {
+      e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i), 
+          Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+      success &= ((Math.abs(e1 - e[i]) < .0000001));
+      if(!success) return success;
+      
+      for(int j=0; j<E[i].length; j++) {
+        ev = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i), 
+            Bytes.toBytes(JacobiEigenValue.EIVEC + j)).getValue());
+        success &= ((Math.abs(ev - E[i][j]) < .0000001));
+        if(!success) return success;
+      }
+    }
+    return success;
+  }
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/JacobiEigenValue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/JacobiEigenValue.java?rev=792423&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/JacobiEigenValue.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/JacobiEigenValue.java Thu Jul  9 07:09:57 2009
@@ -0,0 +1,583 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.Pair;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.HTableInputFormatBase;
+import org.apache.hama.mapred.HTableRecordReaderBase;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * A catalog class collect all the m/r classes to compute the matrix's eigen
+ * values
+ */
+public class JacobiEigenValue {
+
+  /** a matrix copy of the original copy collected in "eicol" family * */
+  public static final String EICOL = "eicol:";
+  /** a column family collect all values and statuses used during computation * */
+  public static final String EI = "eival:";
+  /** a column collect all the eigen values * */
+  public static final String EIVAL = EI + "value";
+  /** a column identify whether the eigen values have been changed * */
+  public static final String EICHANGED = EI + "changed";
+  /** a column identify the index of the max absolute value each row * */
+  public static final String EIIND = EI + "ind";
+  /** a matrix collect all the eigen vectors * */
+  public static final String EIVEC = "eivec:";
+  public static final String MATRIX = "hama.jacobieigenvalue.matrix";
+  /** parameters for pivot * */
+  public static final String PIVOTROW = "hama.jacobi.pivot.row";
+  public static final String PIVOTCOL = "hama.jacobi.pivot.col";
+  public static final String PIVOTSIN = "hama.jacobi.pivot.sin";
+  public static final String PIVOTCOS = "hama.jacobi.pivot.cos";
+
+  static final Log LOG = LogFactory.getLog(JacobiEigenValue.class);
+
+  /**
+   * The matrix will be modified during computing eigen value. So a new matrix
+   * will be created to prevent the original matrix being modified. To reduce
+   * the network transfer, we copy the "column" family in the original matrix to
+   * a "eicol" family. All the following modification will be done over "eicol"
+   * family.
+   * 
+   * And the output Eigen Vector Arrays "eivec", and the output eigen value
+   * array "eival:value", and the temp status array "eival:changed", "eival:ind"
+   * will be created.
+   * 
+   * Also "eival:state" will record the state of the rotation state of a matrix
+   */
+  public static class InitMapper extends MapReduceBase implements
+      Mapper<IntWritable, MapWritable, NullWritable, NullWritable> {
+
+    HTable table;
+
+    @Override
+    public void configure(JobConf job) {
+      String tableName = job.get(MATRIX, "");
+      try {
+        table = new HTable(new HBaseConfiguration(job), tableName);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
+        throws IOException {
+      int row, col;
+      row = key.get();
+      VectorUpdate vu = new VectorUpdate(row);
+
+      double val;
+      double maxVal = Double.MIN_VALUE;
+      int maxInd = row + 1;
+
+      boolean init = true;
+      for (Map.Entry<Writable, Writable> e : value.entrySet()) {
+        val = ((DoubleEntry) e.getValue()).getValue();
+        col = ((IntWritable) e.getKey()).get();
+        // copy the original matrix to "EICOL" family
+        vu.put(JacobiEigenValue.EICOL, col, val);
+        // make the "EIVEC" a dialog matrix
+        vu.put(JacobiEigenValue.EIVEC, col, col == row ? 1 : 0);
+        if (col == row) {
+          vu.put(JacobiEigenValue.EIVAL, val);
+        }
+        // find the max index
+        if (col > row) {
+          if (init) {
+            maxInd = col;
+            maxVal = val;
+            init = false;
+          } else {
+            if (Math.abs(val) > Math.abs(maxVal)) {
+              maxVal = val;
+              maxInd = col;
+            }
+          }
+        }
+      }
+      // index array
+      vu.put(JacobiEigenValue.EIIND, maxInd);
+      // Changed Array set to be true during initialization
+      vu.put(JacobiEigenValue.EICHANGED, 1);
+
+      table.commit(vu.getBatchUpdate());
+    }
+
+  }
+
+  /**
+   * PivotInputFormat & PivotMapper & PivotReducer are used to find the pivot in
+   * a matrix
+   */
+  public static class PivotInputFormat extends HTableInputFormatBase implements
+      InputFormat<Pair, DoubleWritable>, JobConfigurable {
+
+    private PivotRecordReader tableRecordReader;
+
+    protected static class PivotRecordReader extends HTableRecordReaderBase
+        implements RecordReader<Pair, DoubleWritable> {
+
+      private int totalRows;
+      private int processedRows;
+      private int size;
+      boolean mocked = true;
+
+      @Override
+      public void init() throws IOException {
+        super.init();
+
+        Cell rows = null;
+        rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS);
+        size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+
+        if (endRow.length == 0) { // the last split, we don't know the end row
+          totalRows = 0; // so we just skip it.
+        } else {
+          if (startRow.length == 0) { // the first split, start row is 0
+            totalRows = BytesUtil.bytesToInt(endRow);
+          } else {
+            totalRows = BytesUtil.bytesToInt(endRow)
+                - BytesUtil.bytesToInt(startRow);
+          }
+        }
+        processedRows = 0;
+        LOG.info("Split (" + Bytes.toString(startRow) + ", "
+            + Bytes.toString(endRow) + ") -> " + totalRows);
+      }
+
+      /**
+       * @return Pair
+       * 
+       * @see org.apache.hadoop.mapred.RecordReader#createKey()
+       */
+      public Pair createKey() {
+        return new Pair();
+      }
+
+      /**
+       * @return DoubleWritable
+       * 
+       * @see org.apache.hadoop.mapred.RecordReader#createValue()
+       */
+      public DoubleWritable createValue() {
+        return new DoubleWritable();
+      }
+
+      /**
+       * @param key Pair as input key.
+       * @param value DoubleWritable as input value
+       * 
+       * Converts Scanner.next() to Pair, DoubleWritable
+       * 
+       * @return true if there was more data
+       * @throws IOException
+       */
+      public boolean next(Pair key, DoubleWritable value) throws IOException {
+        RowResult result;
+        try {
+          result = this.scanner.next();
+        } catch (UnknownScannerException e) {
+          LOG.debug("recovered from " + StringUtils.stringifyException(e));
+          restart(lastRow);
+          this.scanner.next(); // skip presumed already mapped row
+          result = this.scanner.next();
+        }
+
+        boolean hasMore = result != null && result.size() > 0;
+        if (hasMore) {
+          byte[] row = result.getRow();
+          int rowId = BytesUtil.bytesToInt(row);
+          if (rowId == size - 1) { // skip the last row
+            if (mocked) {
+              key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+              mocked = false;
+              return true;
+            } else {
+              return false;
+            }
+          }
+
+          byte[] col = result.get(EIIND).getValue();
+          int colId = BytesUtil.bytesToInt(col);
+          double val = 0;
+
+          // get (rowId, colId)'s value
+          Cell cell = htable.get(BytesUtil.getRowIndex(rowId), Bytes
+              .toBytes(EICOL + colId));
+          if (cell != null && cell.getValue() != null) {
+            val = BytesUtil.bytesToDouble(cell.getValue());
+          }
+
+          key.set(rowId, colId);
+          value.set(val);
+
+          lastRow = row;
+          processedRows++;
+        } else {
+          if (mocked) {
+            key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+            mocked = false;
+            return true;
+          } else {
+            return false;
+          }
+        }
+        return hasMore;
+      }
+
+      @Override
+      public float getProgress() {
+        if (totalRows <= 0) {
+          return 0;
+        } else {
+          return Math.min(1.0f, processedRows / (float) totalRows);
+        }
+      }
+
+    }
+
+    @Override
+    public RecordReader<Pair, DoubleWritable> getRecordReader(InputSplit split,
+        JobConf conf, Reporter reporter) throws IOException {
+      TableSplit tSplit = (TableSplit) split;
+      PivotRecordReader trr = this.tableRecordReader;
+      // if no table record reader was provided use default
+      if (trr == null) {
+        trr = new PivotRecordReader();
+      }
+      trr.setStartRow(tSplit.getStartRow());
+      trr.setEndRow(tSplit.getEndRow());
+      trr.setHTable(this.table);
+      trr.setInputColumns(this.inputColumns);
+      trr.setRowFilter(this.rowFilter);
+      trr.init();
+      return trr;
+    }
+
+    protected void setTableRecordReader(PivotRecordReader tableRecordReader) {
+      this.tableRecordReader = tableRecordReader;
+    }
+
+  }
+
+  // find the pivot of the matrix
+  public static class PivotMapper extends MapReduceBase implements
+      Mapper<Pair, DoubleWritable, Pair, DoubleWritable> {
+
+    private double max = 0;
+    private Pair pair = new Pair(0, 0);
+    private Pair dummyPair = new Pair(Integer.MAX_VALUE, Integer.MAX_VALUE);
+    private DoubleWritable dummyVal = new DoubleWritable(0.0);
+
+    @Override
+    public void map(Pair key, DoubleWritable value,
+        OutputCollector<Pair, DoubleWritable> collector, Reporter reporter)
+        throws IOException {
+      if (key.getRow() != Integer.MAX_VALUE) {
+        if (Math.abs(value.get()) > Math.abs(max)) {
+          pair.set(key.getRow(), key.getColumn());
+          max = value.get();
+        }
+      } else {
+        collector.collect(pair, new DoubleWritable(max));
+        collector.collect(dummyPair, dummyVal);
+      }
+    }
+
+  }
+
+  public static class PivotReducer extends MapReduceBase implements
+      Reducer<Pair, DoubleWritable, Pair, DoubleWritable> {
+
+    private double max = 0;
+    private Pair pair = new Pair(0, 0);
+
+    @Override
+    public void reduce(Pair key, Iterator<DoubleWritable> values,
+        OutputCollector<Pair, DoubleWritable> collector, Reporter reporter)
+        throws IOException {
+      double val;
+      if (key.getRow() != Integer.MAX_VALUE) {
+        val = values.next().get();
+        if (Math.abs(val) > Math.abs(max)) {
+          pair.set(key.getRow(), key.getColumn());
+          max = val;
+        }
+      } else {
+        collector.collect(pair, new DoubleWritable(max));
+      }
+    }
+
+  }
+
+  /**
+   * Tricky here! we rotation the matrix during we scan the matrix and update to
+   * the matrix so we just need a rotationrecordreader to scan the matrix and do
+   * the rotation the mapper&reducer just a dummy mapper
+   */
+  public static class RotationInputFormat extends HTableInputFormatBase
+      implements InputFormat<NullWritable, NullWritable>, JobConfigurable {
+
+    private RotationRecordReader tableRecordReader;
+
+    int pivot_row, pivot_col;
+    double pivot_cos, pivot_sin;
+
+    public void configure(JobConf job) {
+      super.configure(job);
+      pivot_row = job.getInt(PIVOTROW, -1);
+      pivot_col = job.getInt(PIVOTCOL, -1);
+      pivot_sin = Double.parseDouble(job.get(PIVOTSIN));
+      pivot_cos = Double.parseDouble(job.get(PIVOTCOS));
+    }
+
+    protected static class RotationRecordReader extends HTableRecordReaderBase
+        implements RecordReader<NullWritable, NullWritable> {
+
+      private int totalRows;
+      private int processedRows;
+      int startRowId, endRowId = -1;
+      int size;
+
+      int pivotrow, pivotcol;
+      byte[] prow, pcol;
+      double pivotcos, pivotsin;
+
+      public RotationRecordReader(int pr, int pc, double psin, double pcos) {
+        super();
+        pivotrow = pr;
+        pivotcol = pc;
+        pivotsin = psin;
+        pivotcos = pcos;
+        prow = Bytes.toBytes(pivotrow);
+        pcol = Bytes.toBytes(pivotcol);
+        LOG.info(prow);
+        LOG.info(pcol);
+      }
+
+      @Override
+      public void init() throws IOException {
+        super.init();
+
+        Cell rows = null;
+        rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS);
+        size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+
+        if (endRow.length == 0) { // the last split, we don't know the end row
+          totalRows = 0; // so we just skip it.
+          if (startRow.length == 0)
+            startRowId = 0;
+          else
+            startRowId = BytesUtil.bytesToInt(startRow);
+          endRowId = -1;
+        } else {
+          if (startRow.length == 0) { // the first split, start row is 0
+            totalRows = BytesUtil.bytesToInt(endRow);
+            startRowId = 0;
+            endRowId = totalRows;
+          } else {
+            startRowId = BytesUtil.bytesToInt(startRow);
+            endRowId = BytesUtil.bytesToInt(endRow);
+            totalRows = startRowId - endRowId;
+          }
+        }
+        processedRows = 0;
+        LOG
+            .info("Split (" + startRowId + ", " + endRowId + ") -> "
+                + totalRows);
+      }
+
+      /**
+       * @return NullWritable
+       * 
+       * @see org.apache.hadoop.mapred.RecordReader#createKey()
+       */
+      public NullWritable createKey() {
+        return NullWritable.get();
+      }
+
+      /**
+       * @return NullWritable
+       * 
+       * @see org.apache.hadoop.mapred.RecordReader#createValue()
+       */
+      public NullWritable createValue() {
+        return NullWritable.get();
+      }
+
+      /**
+       * @param key NullWritable as input key.
+       * @param value NullWritable as input value
+       * 
+       * Converts Scanner.next() to NullWritable, NullWritable
+       * 
+       * @return true if there was more data
+       * @throws IOException
+       */
+      public boolean next(NullWritable key, NullWritable value)
+          throws IOException {
+        RowResult result;
+        try {
+          result = this.scanner.next();
+        } catch (UnknownScannerException e) {
+          LOG.debug("recovered from " + StringUtils.stringifyException(e));
+          restart(lastRow);
+          this.scanner.next(); // skip presumed already mapped row
+          result = this.scanner.next();
+        }
+
+        double s1, s2;
+        VectorUpdate bu;
+        boolean hasMore = result != null && result.size() > 0;
+        if (hasMore) {
+          byte[] row = result.getRow();
+          int rowId = BytesUtil.bytesToInt(row);
+          if (rowId < pivotrow) {
+            s1 = BytesUtil.bytesToDouble(htable.get(
+                BytesUtil.getRowIndex(rowId),
+                Bytes.toBytes(JacobiEigenValue.EICOL + pivotrow)).getValue());
+            s2 = BytesUtil.bytesToDouble(htable.get(
+                BytesUtil.getRowIndex(rowId),
+                Bytes.toBytes(JacobiEigenValue.EICOL + pivotcol)).getValue());
+
+            bu = new VectorUpdate(rowId);
+            bu.put(EICOL, pivotrow, pivotcos * s1 - pivotsin * s2);
+            bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+
+            htable.commit(bu.getBatchUpdate());
+          } else if (rowId == pivotrow) {
+            return true;
+          } else if (rowId < pivotcol) {
+            s1 = BytesUtil.bytesToDouble(htable.get(
+                BytesUtil.getRowIndex(pivotrow), Bytes.toBytes(EICOL + rowId))
+                .getValue());
+            s2 = BytesUtil.bytesToDouble(htable.get(
+                BytesUtil.getRowIndex(rowId), Bytes.toBytes(EICOL + pivotcol))
+                .getValue());
+
+            bu = new VectorUpdate(rowId);
+            bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+            htable.commit(bu.getBatchUpdate());
+
+            bu = new VectorUpdate(pivotrow);
+            bu.put(EICOL, rowId, pivotcos * s1 - pivotsin * s2);
+            htable.commit(bu.getBatchUpdate());
+          } else if (rowId == pivotcol) {
+            for (int i = pivotcol + 1; i < size; i++) {
+              s1 = BytesUtil.bytesToDouble(htable.get(
+                  BytesUtil.getRowIndex(pivotrow), Bytes.toBytes(EICOL + i))
+                  .getValue());
+              s2 = BytesUtil.bytesToDouble(htable.get(
+                  BytesUtil.getRowIndex(pivotcol), Bytes.toBytes(EICOL + i))
+                  .getValue());
+
+              bu = new VectorUpdate(pivotcol);
+              bu.put(EICOL, i, pivotsin * s1 + pivotcos * s2);
+              htable.commit(bu.getBatchUpdate());
+
+              bu = new VectorUpdate(pivotrow);
+              bu.put(EICOL, i, pivotcos * s1 - pivotsin * s2);
+              htable.commit(bu.getBatchUpdate());
+            }
+          } else { // rowId > pivotcol
+            return false;
+          }
+
+          lastRow = row;
+          processedRows++;
+        }
+        return hasMore;
+      }
+
+      @Override
+      public float getProgress() {
+        if (totalRows <= 0) {
+          return 0;
+        } else {
+          return Math.min(1.0f, processedRows / (float) totalRows);
+        }
+      }
+
+    }
+
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      InputSplit[] splits = super.getSplits(job, numSplits);
+      List<InputSplit> newSplits = new ArrayList<InputSplit>();
+      for (InputSplit split : splits) {
+        TableSplit ts = (TableSplit) split;
+        byte[] row = ts.getStartRow();
+        if (row.length == 0) // the first split
+          newSplits.add(split);
+        else {
+          if (BytesUtil.bytesToInt(ts.getStartRow()) < pivot_col) {
+            newSplits.add(split);
+          }
+        }
+      }
+
+      return newSplits.toArray(new InputSplit[newSplits.size()]);
+    }
+
+    @Override
+    public RecordReader<NullWritable, NullWritable> getRecordReader(
+        InputSplit split, JobConf conf, Reporter reporter) throws IOException {
+      TableSplit tSplit = (TableSplit) split;
+      RotationRecordReader trr = this.tableRecordReader;
+      // if no table record reader was provided use default
+      if (trr == null) {
+        trr = new RotationRecordReader(pivot_row, pivot_col, pivot_sin,
+            pivot_cos);
+      }
+      trr.setStartRow(tSplit.getStartRow());
+      trr.setEndRow(tSplit.getEndRow());
+      trr.setHTable(this.table);
+      trr.setInputColumns(this.inputColumns);
+      trr.setRowFilter(this.rowFilter);
+      trr.init();
+      return trr;
+    }
+
+    protected void setTableRecordReader(RotationRecordReader tableRecordReader) {
+      this.tableRecordReader = tableRecordReader;
+    }
+
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/io/Pair.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/Pair.java?rev=792423&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/Pair.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/Pair.java Thu Jul  9 07:09:57 2009
@@ -0,0 +1,65 @@
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/** A Pair stands for (row, column) pair **/
+public class Pair implements WritableComparable<Pair> {
+
+  int row, col;
+  
+  public Pair() {}
+  
+  public Pair(int row_, int col_) {
+    set(row_, col_);
+  }
+  
+  public int getRow() { return row; }
+  public int getColumn() { return col; }
+  
+  public void setRow(int row_) { row = row_; }
+  public void setColumn(int col_) { col = col_; }
+  public void set(int row_, int col_) {
+    row = row_;
+    col = col_;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    row = in.readInt();
+    col = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(row);
+    out.writeInt(col);
+  }
+
+  @Override
+  public int compareTo(Pair p) {
+    return row == p.row ? col - p.col : row - p.row;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    Pair pair = (Pair)obj;
+    return compareTo(pair) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return row;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append('(').append(row).append(',').append(col).append(')');
+    return sb.toString();
+  }
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java?rev=792423&r1=792422&r2=792423&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java Thu Jul  9 07:09:57 2009
@@ -50,6 +50,20 @@
     this.batchUpdate.put(BytesUtil.getColumnIndex(j), BytesUtil
         .doubleToBytes(value));
   }
+  
+  /**
+   * Put the value in "cfName+j"
+   * @param cfName
+   * @param j
+   * @param value
+   */
+  public void put(String cfName, int j, double value) {
+    this.batchUpdate.put(Bytes.toBytes(cfName + j), Bytes.toBytes(value));
+  }
+  
+  public void put(String name, double value) {
+    this.batchUpdate.put(Bytes.toBytes(name), Bytes.toBytes(value));
+  }
 
   public void put(int j, String name) {
     this.batchUpdate.put(Bytes.toBytes((Constants.ATTRIBUTE + j)), Bytes

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/DummyMapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/DummyMapper.java?rev=792423&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/DummyMapper.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/DummyMapper.java Thu Jul  9 07:09:57 2009
@@ -0,0 +1,20 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/** Implements the dummy function, mapping inputs directly to outputs. */
+public class DummyMapper<K, V>
+    extends MapReduceBase implements Mapper<K, V, K, V> {
+
+  /** The dummy function. */
+  public void map(K key, V val,
+                  OutputCollector<K, V> output, Reporter reporter)
+    throws IOException {
+    // do nothing
+  }
+}
\ No newline at end of file

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=792423&r1=792422&r2=792423&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Thu Jul  9 07:09:57 2009
@@ -43,7 +43,7 @@
   private static Matrix m1;
   private static Matrix m2;
   private static Matrix m3;
-  private static Matrix m4;
+  private static Matrix m4, m5;
   private final static String aliase1 = "matrix_aliase_A";
   private final static String aliase2 = "matrix_aliase_B";
   private static HamaConfiguration conf;
@@ -64,6 +64,7 @@
         m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
         m3 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
         m4 = DenseMatrix.random(hCluster.getConf(), SIZE-2, SIZE-2);
+        m5 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
       }
 
       protected void tearDown() {
@@ -309,7 +310,106 @@
       i++;
     }
   }
-  
+
+  public void testJacobiEigenValue() throws IOException {
+    // copy Matrix m5 to the array
+    double[][] S = new double[SIZE][SIZE];
+    
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        S[i][j] = m5.get(i, j);
+      }
+    }
+    
+    // do m/r jacobi eigen value computation
+    DenseMatrix dm = (DenseMatrix)m5;
+    dm.jacobiEigenValue(3);
+    
+    // do jacobi egien value over S array
+    int i, j, k, l, m, state;
+    double s, c, t, p, y;
+    double e1, e2;
+    // index array
+    int[] ind = new int[SIZE];
+    boolean[] changed = new boolean[SIZE];
+    
+    // output
+    double[] e = new double[SIZE];
+    double[][] E = new double[SIZE][SIZE];
+    
+    // init e & E; ind & changed
+    for(i=0; i<SIZE; i++) {
+      for(j=0; j<SIZE; j++) {
+        E[i][j] = 0;
+      }
+      E[i][i] = 1;
+    }
+    
+    state = SIZE;
+    
+    for(i=0; i<SIZE; i++) {
+      ind[i] = maxind(S, i, SIZE); 
+      e[i] = S[i][i];
+      changed[i] = true;
+    }
+    
+    int loops = 3;
+    // next rotation
+    while(state != 0 && loops > 0) {
+      // find index(k, l) for pivot p
+      m = 0;
+      for(k = 1; k <= SIZE-2; k++) {
+        if(Math.abs(S[m][ind[m]]) < Math.abs(S[k][ind[k]])) {
+          m = k;
+        }
+      }
+      
+      k = m; l = ind[m]; p = S[k][l];
+      
+      // calculate c = cos, s = sin
+      y = (e[l] - e[k]) / 2;
+      t = Math.abs(y) + Math.sqrt(p * p + y * y);
+      s = Math.sqrt(p * p + t * t);
+      c = t / s;
+      s = p / s;
+      t = (p * p) / t;
+      if(y < 0) {
+        s = -s;
+        t = -t;
+      }
+      
+      S[k][l] = 0.0;
+      state = update(e, changed, k, -t, state);
+      state = update(e, changed, l, t, state);
+      
+      for(i = 0; i <= k-1; i++) 
+        rotate(S, i, k, i, l, c, s);
+      
+      for(i = l+1; i < SIZE; i++)
+        rotate(S, k, i, l, i, c, s);
+      
+      for(i = k+1; i <= l-1; i++)
+        rotate(S, k, i, i, l, c, s);
+      
+      // rotate eigenvectors
+      for(i = 0; i < SIZE; i++) {
+        e1 = E[k][i];
+        e2 = E[l][i];
+        
+        E[k][i] = c * e1 - s * e2;
+        E[l][i] = s * e1 + c * e2;
+      }
+      
+      ind[k] = maxind(S, k, SIZE);
+      ind[l] = maxind(S, l, SIZE);
+      
+      loops --;
+    }
+    
+    // verify the results
+    assertTrue(dm.verifyEigenValue(e, E));
+  }
+
   public void testEnsureForAddition() {
     try {
       m1.add(m4);
@@ -432,10 +532,41 @@
 
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
-        double gap = (c[i][j] - result.get(i, j));
-        assertTrue(gap < 0.000001 && gap > -0.000001);
+        assertTrue((Math.abs(c[i][j] - result.get(i, j)) < .0000001));
       }
     }
   }
+  
+ 
+  //index of largest off-diagonal element in row k
+  int maxind(double[][] S, int row, int size) {
+    int m = row + 1;
+    for(int i=row + 2; i<size; i++) {
+      if(Math.abs(S[row][i]) > Math.abs(S[row][m]))
+        m = i;
+    }
+    return m;
+  }
+  
+  int update(double[] e, boolean[] changed, int row, double value, int state) {
+    double y = e[row];
+    e[row] += value;
+    
+    if(changed[row] && y == e[row]) {
+      changed[row] = false;
+      return state - 1;
+    } else if(!changed[row] && y != e[row]) {
+      changed[row] = true;
+      return state + 1;
+    } else
+      return state;
+  }
+  
+  void rotate(double[][] S, int k, int l, int i, int j, double c, double s) {
+    double s1 = S[k][l], s2 = S[i][j];
+    S[k][l] = c * s1 - s * s2;
+    S[i][j] = s * s1 + c * s2;
+  }
+
 
 }