Posted to commits@hama.apache.org by ed...@apache.org on 2009/10/29 07:13:52 UTC

svn commit: r830849 - in /incubator/hama/trunk/src: java/org/apache/hama/ java/org/apache/hama/graph/ java/org/apache/hama/io/ java/org/apache/hama/mapred/ java/org/apache/hama/mapreduce/ java/org/apache/hama/matrix/ java/org/apache/hama/matrix/algebra...

Author: edwardyoon
Date: Thu Oct 29 06:13:51 2009
New Revision: 830849

URL: http://svn.apache.org/viewvc?rev=830849&view=rev
Log:
Replacement of the old-API JacobiEigenValue Map/Reduce with classes based on org.apache.hadoop.mapreduce (JacobiInitMap, PivotMap, PivotInputFormat, RotationInputFormat, DummyMapper)

Added:
    incubator/hama/trunk/src/java/org/apache/hama/mapreduce/DummyMapper.java
    incubator/hama/trunk/src/java/org/apache/hama/mapreduce/PivotInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapreduce/RotationInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapreduce/package.html
      - copied unchanged from r829698, incubator/hama/trunk/src/java/org/apache/hama/mapred/package.html
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/PivotMap.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java
Modified:
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java
    incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
    incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java
    incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java
    incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java
    incubator/hama/trunk/src/test/org/apache/hama/util/TestNumeric.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Thu Oct 29 06:13:51 2009
@@ -58,9 +58,6 @@
   public final static String ALIASEFAMILY = "aliase";
   
   /** Default columnFamily name */
-  @Deprecated
-  public final static String COLUMN = "column:";
-
   public static byte[] COLUMNFAMILY = Bytes.toBytes("column");
   
   /** Temporary random matrices name prefix */
@@ -85,10 +82,31 @@
   public static final int DEFAULT_TRY_TIMES = 10000000;
   
   /** block data column */
-  @Deprecated
-  public static final String BLOCK = "block:";
-  
-  public static final String BLOCK_FAMILY = "block";
+  public static final String BLOCK = "block";
   
   public static final Text ROWCOUNT= new Text("row");
+
+  /**
+   * EigenValue Constants
+   */
+  /** a matrix copy of the original, collected in the "eicol" family */
+  public static final String EICOL = "eicol";
+
+  /** a column family collecting all values and statuses used during computation */
+  public static final String EI = "eival";
+  public static final String EIVAL = "value";
+  public static final String EICHANGED = "changed";
+
+  /** a column identifying the index of the max absolute value in each row */
+  public static final String EIIND = "ind";
+
+  /** a matrix collecting all the eigenvectors */
+  public static final String EIVEC = "eivec";
+  public static final String MATRIX = "hama.jacobieigenvalue.matrix";
+
+  /** parameters for the pivot rotation */
+  public static final String PIVOTROW = "hama.jacobi.pivot.row";
+  public static final String PIVOTCOL = "hama.jacobi.pivot.col";
+  public static final String PIVOTSIN = "hama.jacobi.pivot.sin";
+  public static final String PIVOTCOS = "hama.jacobi.pivot.cos";
 }
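
(Illustrative sketch, not part of this commit: how the pivot parameters above travel through a job configuration, mirroring the driver and RotationInputFormat.setConf further down. Variable names are hypothetical.)

    // driver side: store the rotation parameters computed for this iteration
    job.getConfiguration().setInt(Constants.PIVOTROW, pivotRow);
    job.getConfiguration().setInt(Constants.PIVOTCOL, pivotCol);
    job.getConfiguration().set(Constants.PIVOTSIN, String.valueOf(sin));
    job.getConfiguration().set(Constants.PIVOTCOS, String.valueOf(cos));

    // input-format side: read them back (see RotationInputFormat.setConf)
    int row = conf.getInt(Constants.PIVOTROW, -1);
    int col = conf.getInt(Constants.PIVOTCOL, -1);
    double sinValue = Double.parseDouble(conf.get(Constants.PIVOTSIN));
    double cosValue = Double.parseDouble(conf.get(Constants.PIVOTCOS));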

Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java Thu Oct 29 06:13:51 2009
@@ -121,7 +121,7 @@
     StringBuilder result = new StringBuilder();
 
     try {
-      scan = table.getScanner(new String[] { Constants.COLUMN }, "");
+      scan = table.getScanner(new String[] { "column:" }, "");
       Iterator<RowResult> it = scan.iterator();
       while (it.hasNext()) {
         RowResult rs = it.next();

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java Thu Oct 29 06:13:51 2009
@@ -24,7 +24,6 @@
 import java.util.Map.Entry;
 
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
@@ -33,58 +32,29 @@
 import org.apache.hama.util.BytesUtil;
 
 public class VectorUpdate {
-  private BatchUpdate batchUpdate;
   private Put put;
 
   public VectorUpdate(int i) {
-    this.batchUpdate = new BatchUpdate(BytesUtil.getRowIndex(i));
     this.put = new Put(BytesUtil.getRowIndex(i));
   }
 
   public VectorUpdate(String row) {
-    this.batchUpdate = new BatchUpdate(row);
     this.put = new Put(Bytes.toBytes(row));
   }
 
   public VectorUpdate(byte[] row) {
-    this.batchUpdate = new BatchUpdate(row);
     this.put = new Put(row);
   }
 
   public void put(int j, double value) {
-    this.batchUpdate.put(BytesUtil.getColumnIndex(j), BytesUtil
-        .doubleToBytes(value));
     this.put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String.valueOf(j)),
         BytesUtil.doubleToBytes(value));
   }
 
-  /**
-   * Put the value in "cfName+j"
-   * 
-   * @param cfName
-   * @param j
-   * @param value
-   */
   public void put(String cfName, int j, double value) {
-    this.batchUpdate.put(Bytes.toBytes(cfName + j), Bytes.toBytes(value));
     this.put.add(Bytes.toBytes(cfName), Bytes.toBytes(String.valueOf(j)), Bytes.toBytes(value));
   }
 
-  public void put(String name, double value) {
-    this.batchUpdate.put(Bytes.toBytes(name), Bytes.toBytes(value));
-  }
-
-  @Deprecated
-  public void put(int j, String name) {
-    this.batchUpdate.put(Bytes
-        .toBytes((Bytes.toString(Constants.ATTRIBUTE) + j)), Bytes
-        .toBytes(name));
-  }
-
-  public void put(String j, String val) {
-    this.batchUpdate.put(j, Bytes.toBytes(val));
-  }
-
   public void put(String column, String qualifier, String val) {
     this.put.add(Bytes.toBytes(column), Bytes.toBytes(qualifier), Bytes
         .toBytes(val));
@@ -94,14 +64,6 @@
     this.put.add(Bytes.toBytes(column), Bytes.toBytes(qualifier), Bytes
         .toBytes(val));
   }
-  
-  public void put(String row, int val) {
-    this.batchUpdate.put(row, BytesUtil.intToBytes(val));
-  }
-
-  public BatchUpdate getBatchUpdate() {
-    return this.batchUpdate;
-  }
 
   public void putAll(Map<Integer, Double> buffer) {
     for (Map.Entry<Integer, Double> f : buffer.entrySet()) {
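
(A minimal usage sketch, not part of this commit, of the now Put-only VectorUpdate; "table" is an assumed HTable handle.)

    VectorUpdate vu = new VectorUpdate(0);    // row index 0
    vu.put(7, 0.5);                           // qualifier "7" in the "column" family
    vu.put(Constants.EICOL, 7, 0.5);          // same entry in the "eicol" family
    table.put(vu.getPut());                   // flush the underlying Put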

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java Thu Oct 29 06:13:51 2009
@@ -68,7 +68,7 @@
         int seq = (x * mBlockNum) + key.getColumn() + r;
         BlockID bkID = new BlockID(key.getRow(), x, seq);
         Put put = new Put(bkID.getBytes());
-        put.add(Bytes.toBytes(Constants.BLOCK_FAMILY), 
+        put.add(Bytes.toBytes(Constants.BLOCK), 
             Bytes.toBytes("a"), 
             subMatrix.getBytes());
         context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
@@ -79,7 +79,7 @@
             + key.getRow();
         BlockID bkID = new BlockID(x, key.getColumn(), seq);
         Put put = new Put(bkID.getBytes());
-        put.add(Bytes.toBytes(Constants.BLOCK_FAMILY), 
+        put.add(Bytes.toBytes(Constants.BLOCK), 
             Bytes.toBytes("b"), 
             subMatrix.getBytes());
         context.write(new ImmutableBytesWritable(bkID.getBytes()), put);

Added: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/DummyMapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/DummyMapper.java?rev=830849&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/DummyMapper.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/DummyMapper.java Thu Oct 29 06:13:51 2009
@@ -0,0 +1,15 @@
+package org.apache.hama.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Mapper;
+
+/** A mapper that produces no output; it pairs with NullOutputFormat. */
+public class DummyMapper<K, V> extends Mapper<K, V, K, V> {
+  /** The dummy map function. */
+  @Override
+  protected void map(K key, V val, Context context)
+      throws IOException, InterruptedException {
+    // do nothing
+  }
+}
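
(Design note: in the rewritten jacobiEigenValue driver below, the rotation job's real work happens as a side effect inside RotationInputFormat's record reader; DummyMapper just gives that job a no-op map phase in front of NullOutputFormat.)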

Added: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/PivotInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/PivotInputFormat.java?rev=830849&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/PivotInputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/PivotInputFormat.java Thu Oct 29 06:13:51 2009
@@ -0,0 +1,286 @@
+package org.apache.hama.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.io.Pair;
+import org.apache.hama.util.BytesUtil;
+
+public class PivotInputFormat extends InputFormat<Pair, DoubleWritable>
+    implements Configurable {
+  final static Log LOG = LogFactory.getLog(PivotInputFormat.class);
+
+  /** Job parameter that specifies the input table. */
+  public static final String INPUT_TABLE = "hama.mapreduce.inputtable";
+  /** Job parameter that specifies the scan, as a base64-encoded string. */
+  public static final String SCAN = "hama.mapreduce.scan";
+  
+  /** The configuration. */
+  private Configuration conf = null;
+  
+  /** Holds the details for the internal scanner. */
+  private Scan scan = null;
+  /** The table to scan. */
+  private HTable table = null;
+  /** The reader scanning the table, can be a custom one. */
+  private PivotRecordReader pivotRecordReader = null;
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    if (table == null) {
+      throw new IOException("No table was provided.");
+    }
+    byte[][] startKeys = table.getStartKeys();
+    if (startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region.");
+    }
+    int realNumSplits = startKeys.length;
+    InputSplit[] splits = new InputSplit[realNumSplits];
+    int middle = startKeys.length / realNumSplits;
+    int startPos = 0;
+    for (int i = 0; i < realNumSplits; i++) {
+      int lastPos = startPos + middle;
+      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      String regionLocation = table.getRegionLocation(startKeys[startPos])
+          .getServerAddress().getHostname();
+      splits[i] = new TableSplit(this.table.getTableName(),
+          startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]
+              : HConstants.EMPTY_START_ROW, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
+      startPos = lastPos;
+    }
+    return Arrays.asList(splits);
+  }
+
+  protected static class PivotRecordReader extends
+      RecordReader<Pair, DoubleWritable> {
+    private int totalRows;
+    private int processedRows;
+    private int size;
+    boolean mocked = true;
+
+    private ResultScanner scanner = null;
+    private Scan scan = null;
+    private HTable htable = null;
+    private byte[] lastRow = null;
+    private Pair key = null;
+    private DoubleWritable value = null;
+
+    @Override
+    public void close() {
+      this.scanner.close();
+    }
+
+    public void setScan(Scan scan) {
+      this.scan = scan;
+    }
+
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    public void init() throws IOException {
+      restart(scan.getStartRow());
+    }
+
+    public void restart(byte[] firstRow) throws IOException {
+      Scan newScan = new Scan(scan);
+      newScan.setStartRow(firstRow);
+      this.scanner = this.htable.getScanner(newScan);
+    }
+
+    @Override
+    public Pair getCurrentKey() throws IOException, InterruptedException {
+      return key;
+    }
+
+    @Override
+    public DoubleWritable getCurrentValue() throws IOException,
+        InterruptedException {
+      return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      if (totalRows <= 0) {
+        return 0;
+      } else {
+        return Math.min(1.0f, processedRows / (float) totalRows);
+      }
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (key == null)
+        key = new Pair();
+      if (value == null)
+        value = new DoubleWritable();
+
+      Result vv;
+      try {
+        vv = this.scanner.next();
+      } catch (IOException e) {
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));
+        restart(lastRow);
+        scanner.next(); // skip presumed already mapped row
+        vv = scanner.next();
+      }
+
+      boolean hasMore = vv != null && vv.size() > 0;
+      if (hasMore) {
+
+        byte[] row = vv.getRow();
+
+        int rowId = BytesUtil.bytesToInt(row);
+        if (rowId == size - 1) { // skip the last row
+          if (mocked) {
+            key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+            mocked = false;
+            return true;
+          } else {
+            return false;
+          }
+        }
+
+        byte[] col = vv.getValue(Bytes
+            .toBytes(Constants.EI), Bytes
+            .toBytes(Constants.EIIND));
+        int colId = BytesUtil.bytesToInt(col);
+        double val = 0;
+
+        Get get = new Get(BytesUtil.getRowIndex(rowId));
+        byte[] cell = htable.get(get).getValue(
+            Bytes.toBytes(Constants.EICOL),
+            Bytes.toBytes(String.valueOf(colId)));
+        if (cell != null) {
+          val = BytesUtil.bytesToDouble(cell);
+        }
+
+        key.set(rowId, colId);
+        value.set(val);
+
+        lastRow = row;
+      } else {
+        if (mocked) {
+          key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+          mocked = false;
+          return true;
+        } else {
+          return false;
+        }
+      }
+
+      return hasMore;
+    }
+  }
+
+  @Override
+  public RecordReader<Pair, DoubleWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    PivotRecordReader trr = this.pivotRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new PivotRecordReader();
+    }
+    Scan sc = new Scan(this.scan);
+    sc.setStartRow(tSplit.getStartRow());
+    sc.setStopRow(tSplit.getEndRow());
+    trr.setScan(sc);
+    trr.setHTable(table);
+    trr.init();
+    return trr;
+  }
+
+  protected HTable getHTable() {
+    return this.table;
+  }
+
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  public Scan getScan() {
+    if (this.scan == null)
+      this.scan = new Scan();
+    return scan;
+  }
+
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  protected void setTableRecordReader(PivotRecordReader pivotRecordReader) {
+    this.pivotRecordReader = pivotRecordReader;
+  }
+  
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+    String tableName = conf.get(INPUT_TABLE);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(conf), tableName));
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+    Scan scan = null;
+    try {
+      scan = convertStringToScan(conf.get(SCAN));
+    } catch (IOException e) {
+      LOG.error("An error occurred.", e);
+    }
+    setScan(scan);
+  }
+  
+  public static String convertScanToString(Scan scan) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();  
+    DataOutputStream dos = new DataOutputStream(out);
+    scan.write(dos);
+    return Base64.encodeBytes(out.toByteArray());
+  }
+  
+  public static Scan convertStringToScan(String base64) throws IOException {
+    ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
+    DataInputStream dis = new DataInputStream(bis);
+    Scan scan = new Scan();
+    scan.readFields(dis);
+    return scan;
+  }
+  
+}
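
(For context, a condensed sketch of how DenseMatrix.jacobiEigenValue below wires this input format into a job; "config" and "matrixPath" stand in for the matrix configuration and table name.)

    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(Constants.EI));

    Job job = new Job(config, "Find Pivot MR job");
    job.setInputFormatClass(PivotInputFormat.class);
    job.setMapperClass(PivotMap.class);
    job.setMapOutputKeyClass(Pair.class);
    job.setMapOutputValueClass(DoubleWritable.class);
    job.getConfiguration().set(PivotInputFormat.INPUT_TABLE, matrixPath);
    job.getConfiguration().set(PivotInputFormat.SCAN,
        PivotInputFormat.convertScanToString(scan));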

Added: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/RotationInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/RotationInputFormat.java?rev=830849&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/RotationInputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/RotationInputFormat.java Thu Oct 29 06:13:51 2009
@@ -0,0 +1,346 @@
+package org.apache.hama.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.util.BytesUtil;
+
+public class RotationInputFormat extends
+    InputFormat<NullWritable, NullWritable> implements Configurable {
+  final static Log LOG = LogFactory.getLog(RotationInputFormat.class);
+  /** Job parameter that specifies the input table. */
+  public static final String INPUT_TABLE = "hama.mapreduce.inputtable";
+  /** Job parameter that specifies the scan, as a base64-encoded string. */
+  public static final String SCAN = "hama.mapreduce.scan";
+
+  /** The configuration. */
+  private Configuration conf = null;
+
+  /** Holds the details for the internal scanner. */
+  private Scan scan = null;
+  /** The table to scan. */
+  private HTable table = null;
+  /** The reader scanning the table, can be a custom one. */
+  private RotationRecordReader rotationRecordReader;
+
+  int pivot_row, pivot_col;
+  double pivot_cos, pivot_sin;
+
+  protected static class RotationRecordReader extends
+      RecordReader<NullWritable, NullWritable> {
+    private ResultScanner scanner = null;
+    private Scan scan = null;
+    private HTable htable = null;
+    private byte[] lastRow = null;
+
+    private int totalRows;
+    private int processedRows;
+    int startRowId, endRowId = -1;
+    int size;
+
+    int pivotrow, pivotcol;
+    byte[] prow, pcol;
+    double pivotcos, pivotsin;
+
+    public RotationRecordReader(int pr, int pc, double psin, double pcos) {
+      super();
+      pivotrow = pr;
+      pivotcol = pc;
+      pivotsin = psin;
+      pivotcos = pcos;
+      prow = Bytes.toBytes(pivotrow);
+      pcol = Bytes.toBytes(pivotcol);
+      LOG.info(prow);
+      LOG.info(pcol);
+    }
+
+    public void setScan(Scan scan) {
+      this.scan = scan;
+    }
+
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    public void init() throws IOException {
+      restart(scan.getStartRow());
+      byte[] startRow = scan.getStartRow();
+      byte[] endRow = scan.getStopRow();
+
+      Get get = new Get(Bytes.toBytes(Constants.METADATA));
+      get.addFamily(Constants.ATTRIBUTE);
+      byte[] result = htable.get(get).getValue(Constants.ATTRIBUTE,
+          Bytes.toBytes("rows"));
+
+      size = (result != null) ? BytesUtil.bytesToInt(result) : 0;
+
+      if (endRow.length == 0) { // the last split, we don't know the end row
+        totalRows = 0; // so we just skip it.
+        if (startRow.length == 0)
+          startRowId = 0;
+        else
+          startRowId = BytesUtil.bytesToInt(startRow);
+        endRowId = -1;
+      } else {
+        if (startRow.length == 0) { // the first split, start row is 0
+          totalRows = BytesUtil.bytesToInt(endRow);
+          startRowId = 0;
+          endRowId = totalRows;
+        } else {
+          startRowId = BytesUtil.bytesToInt(startRow);
+          endRowId = BytesUtil.bytesToInt(endRow);
+          totalRows = endRowId - startRowId;
+        }
+      }
+      processedRows = 0;
+      LOG.info("Split (" + startRowId + ", " + endRowId + ") -> " + totalRows);
+    }
+
+    public void restart(byte[] firstRow) throws IOException {
+      Scan newScan = new Scan(scan);
+      newScan.setStartRow(firstRow);
+      this.scanner = this.htable.getScanner(newScan);
+    }
+
+    @Override
+    public void close() throws IOException {
+      this.scanner.close();
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException,
+        InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public NullWritable getCurrentValue() throws IOException,
+        InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      if (totalRows <= 0) {
+        return 0;
+      } else {
+        return Math.min(1.0f, processedRows / (float) totalRows);
+      }
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      Result vv;
+      try {
+        vv = this.scanner.next();
+      } catch (IOException e) {
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));
+        restart(lastRow);
+        scanner.next(); // skip presumed already mapped row
+        vv = scanner.next();
+      }
+
+      double s1, s2;
+      VectorUpdate bu;
+      boolean hasMore = vv != null && vv.size() > 0;
+      if (hasMore) {
+        byte[] row = vv.getRow();
+        int rowId = BytesUtil.bytesToInt(row);
+        if (rowId < pivotrow) {
+          Get get = new Get(BytesUtil.getRowIndex(rowId));
+          s1 = BytesUtil.bytesToDouble(htable.get(get).getValue(
+              Bytes.toBytes(Constants.EICOL),
+              Bytes.toBytes(String.valueOf(pivotrow))));
+          s2 = BytesUtil.bytesToDouble(htable.get(get).getValue(
+              Bytes.toBytes(Constants.EICOL),
+              Bytes.toBytes(String.valueOf(pivotcol))));
+
+          bu = new VectorUpdate(rowId);
+          bu.put(Constants.EICOL, pivotrow, pivotcos * s1
+              - pivotsin * s2);
+          bu.put(Constants.EICOL, pivotcol, pivotsin * s1
+              + pivotcos * s2);
+
+          htable.put(bu.getPut());
+        } else if (rowId == pivotrow) {
+          return true;
+        } else if (rowId < pivotcol) {
+          Get get = new Get(BytesUtil.getRowIndex(pivotrow));
+          s1 = BytesUtil.bytesToDouble(htable.get(get).getValue(
+              Bytes.toBytes(Constants.EICOL),
+              Bytes.toBytes(String.valueOf(rowId))));
+          get = new Get(BytesUtil.getRowIndex(rowId));
+
+          s2 = BytesUtil.bytesToDouble(htable.get(get).getValue(
+              Bytes.toBytes(Constants.EICOL),
+              Bytes.toBytes(String.valueOf(pivotcol))));
+
+          bu = new VectorUpdate(rowId);
+          bu.put(Constants.EICOL, pivotcol, pivotsin * s1
+              + pivotcos * s2);
+          htable.put(bu.getPut());
+
+          bu = new VectorUpdate(pivotrow);
+          bu.put(Constants.EICOL, rowId, pivotcos * s1 - pivotsin
+              * s2);
+          htable.put(bu.getPut());
+
+        } else if (rowId == pivotcol) {
+          for (int i = pivotcol + 1; i < size; i++) {
+            Get get = new Get(BytesUtil.getRowIndex(pivotrow));
+
+            s1 = BytesUtil.bytesToDouble(htable.get(get).getValue(
+                Bytes.toBytes(Constants.EICOL),
+                Bytes.toBytes(String.valueOf(i))));
+
+            get = new Get(BytesUtil.getRowIndex(pivotcol));
+            s2 = BytesUtil.bytesToDouble(htable.get(get).getValue(
+                Bytes.toBytes(Constants.EICOL),
+                Bytes.toBytes(String.valueOf(i))));
+
+            bu = new VectorUpdate(pivotcol);
+            bu.put(Constants.EICOL, i, pivotsin * s1 + pivotcos
+                * s2);
+            htable.put(bu.getPut());
+
+            bu = new VectorUpdate(pivotrow);
+            bu.put(Constants.EICOL, i, pivotcos * s1 - pivotsin
+                * s2);
+            htable.put(bu.getPut());
+          }
+        } else { // rowId > pivotcol
+          return false;
+        }
+
+        lastRow = row;
+      }
+      return hasMore;
+    }
+
+  }
+
+  @Override
+  public RecordReader<NullWritable, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    TableSplit tSplit = (TableSplit) split;
+    RotationRecordReader trr = this.rotationRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new RotationRecordReader(pivot_row, pivot_col, pivot_sin, pivot_cos);
+    }
+    Scan sc = new Scan(this.scan);
+    sc.setStartRow(tSplit.getStartRow());
+    sc.setStopRow(tSplit.getEndRow());
+    trr.setScan(sc);
+    trr.setHTable(table);
+    trr.init();
+    return trr;
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    if (table == null) {
+      throw new IOException("No table was provided.");
+    }
+    byte[][] startKeys = table.getStartKeys();
+    if (startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region.");
+    }
+    int realNumSplits = startKeys.length;
+    InputSplit[] splits = new InputSplit[realNumSplits];
+    int middle = startKeys.length / realNumSplits;
+    int startPos = 0;
+    for (int i = 0; i < realNumSplits; i++) {
+      int lastPos = startPos + middle;
+      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      String regionLocation = table.getRegionLocation(startKeys[startPos])
+          .getServerAddress().getHostname();
+      splits[i] = new TableSplit(this.table.getTableName(),
+          startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]
+              : HConstants.EMPTY_START_ROW, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
+      startPos = lastPos;
+    }
+    return Arrays.asList(splits);
+  }
+
+  protected HTable getHTable() {
+    return this.table;
+  }
+
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  public Scan getScan() {
+    if (this.scan == null)
+      this.scan = new Scan();
+    return scan;
+  }
+
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  protected void setTableRecordReader(RotationRecordReader rotationRecordReader) {
+    this.rotationRecordReader = rotationRecordReader;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    pivot_row = conf.getInt(Constants.PIVOTROW, -1);
+    pivot_col = conf.getInt(Constants.PIVOTCOL, -1);
+    pivot_sin = Double.parseDouble(conf.get(Constants.PIVOTSIN));
+    pivot_cos = Double.parseDouble(conf.get(Constants.PIVOTCOS));
+
+    this.conf = conf;
+    String tableName = conf.get(INPUT_TABLE);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(conf), tableName));
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+    Scan scan = null;
+    try {
+      scan = PivotInputFormat.convertStringToScan(conf.get(SCAN));
+    } catch (IOException e) {
+      LOG.error("An error occurred.", e);
+    }
+    setScan(scan);
+  }
+
+}
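
(For reference, the update RotationRecordReader applies while scanning is a standard Givens rotation; condensed from the branches above, with s1 and s2 the two stored values of an affected pair.)

    // values written back through VectorUpdate, per affected pair (s1, s2):
    double first  = pivotcos * s1 - pivotsin * s2;
    double second = pivotsin * s1 + pivotcos * s2;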

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java Thu Oct 29 06:13:51 2009
@@ -54,7 +54,6 @@
 import org.apache.hama.HamaAdminImpl;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.matrix.algebra.JacobiEigenValue;
 import org.apache.hama.matrix.algebra.MatrixNormMapReduce;
 import org.apache.hama.matrix.algebra.TransposeMap;
 import org.apache.hama.matrix.algebra.TransposeReduce;
@@ -137,14 +136,14 @@
 
      // It's temporary data.
       this.tableDesc.addFamily(new HColumnDescriptor(Bytes
-          .toBytes(Constants.BLOCK_FAMILY)));
+          .toBytes(Constants.BLOCK)));
       // the following families are used in JacobiEigenValue computation
       this.tableDesc.addFamily(new HColumnDescriptor(Bytes
-          .toBytes(JacobiEigenValue.EI_COLUMNFAMILY)));
+          .toBytes(Constants.EI)));
       this.tableDesc.addFamily(new HColumnDescriptor(Bytes
-          .toBytes(JacobiEigenValue.EICOL)));
+          .toBytes(Constants.EICOL)));
       this.tableDesc.addFamily(new HColumnDescriptor(Bytes
-          .toBytes(JacobiEigenValue.EIVEC)));
+          .toBytes(Constants.EIVEC)));
 
       LOG.info("Initializing the matrix storage.");
       this.admin.createTable(this.tableDesc);
@@ -460,10 +459,10 @@
     scan.addFamily(Constants.COLUMNFAMILY);
     scan.addFamily(Constants.ATTRIBUTE);
     scan.addFamily(Bytes.toBytes(Constants.ALIASEFAMILY));
-    scan.addFamily(Bytes.toBytes(Constants.BLOCK_FAMILY));
-    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
-    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY));
-    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY));
+    scan.addFamily(Bytes.toBytes(Constants.BLOCK));
+    scan.addFamily(Bytes.toBytes(Constants.EI));
+    scan.addFamily(Bytes.toBytes(Constants.EICOL));
+    scan.addFamily(Bytes.toBytes(Constants.EIVEC));
 
     org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(B
         .getPath(), scan, ScanMapper.class, ImmutableBytesWritable.class,
@@ -489,10 +488,10 @@
     scan.addFamily(Constants.COLUMNFAMILY);
     scan.addFamily(Constants.ATTRIBUTE);
     scan.addFamily(Bytes.toBytes(Constants.ALIASEFAMILY));
-    scan.addFamily(Bytes.toBytes(Constants.BLOCK_FAMILY));
-    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
-    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY));
-    scan.addFamily(Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY));
+    scan.addFamily(Bytes.toBytes(Constants.BLOCK));
+    scan.addFamily(Bytes.toBytes(Constants.EI));
+    scan.addFamily(Bytes.toBytes(Constants.EICOL));
+    scan.addFamily(Bytes.toBytes(Constants.EIVEC));
     Float f = new Float(alpha);
     job.getConfiguration().setFloat("set.alpha", f);
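
(Net effect of the two scan hunks above: full-table scans now cover the families "column", "attribute", "aliase", "block", "eival", "eicol", and "eivec", matching the families declared when the matrix table is created.)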
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java Thu Oct 29 06:13:51 2009
@@ -31,10 +31,12 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -46,32 +48,31 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.io.BlockID;
 import org.apache.hama.io.DoubleEntry;
 import org.apache.hama.io.Pair;
 import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.mapred.DummyMapper;
-import org.apache.hama.mapred.VectorInputFormat;
 import org.apache.hama.mapreduce.CollectBlocksMapper;
+import org.apache.hama.mapreduce.DummyMapper;
+import org.apache.hama.mapreduce.PivotInputFormat;
 import org.apache.hama.mapreduce.RandomMatrixMapper;
 import org.apache.hama.mapreduce.RandomMatrixReducer;
+import org.apache.hama.mapreduce.RotationInputFormat;
 import org.apache.hama.matrix.algebra.BlockMultMap;
 import org.apache.hama.matrix.algebra.BlockMultReduce;
 import org.apache.hama.matrix.algebra.DenseMatrixVectorMultMap;
 import org.apache.hama.matrix.algebra.DenseMatrixVectorMultReduce;
-import org.apache.hama.matrix.algebra.JacobiEigenValue;
+import org.apache.hama.matrix.algebra.JacobiInitMap;
 import org.apache.hama.matrix.algebra.MatrixAdditionMap;
 import org.apache.hama.matrix.algebra.MatrixAdditionReduce;
+import org.apache.hama.matrix.algebra.PivotMap;
 import org.apache.hama.util.BytesUtil;
 import org.apache.hama.util.RandomVariable;
 
@@ -600,7 +601,7 @@
     String collectionTable = "collect_" + RandomVariable.randMatrixPath();
     HTableDescriptor desc = new HTableDescriptor(collectionTable);
     desc
-        .addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK_FAMILY)));
+        .addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK)));
     this.admin.createTable(desc);
     LOG.info("Collect Blocks");
 
@@ -613,7 +614,7 @@
     Job job = new Job(config, "multiplication MR job : " + result.getPath());
 
     Scan scan = new Scan();
-    scan.addFamily(Bytes.toBytes(Constants.BLOCK_FAMILY));
+    scan.addFamily(Bytes.toBytes(Constants.BLOCK));
 
     TableMapReduceUtil.initTableMapperJob(collectionTable, scan,
         BlockMultMap.class, BlockID.class, BytesWritable.class, job);
@@ -773,30 +774,30 @@
    * @throws IOException
    */
   public void jacobiEigenValue(int loops) throws IOException {
-    JobConf jobConf = new JobConf(config);
-
-    /***************************************************************************
-     * Initialization
-     * 
-     * A M/R job is used for initialization(such as, preparing a matrx copy of
-     * the original in "eicol:" family.)
-     **************************************************************************/
+    /*
+     * Initialization: an M/R job prepares a matrix copy of the original in
+     * the "eicol" family.
+     */
     // initialization
-    jobConf.setJobName("JacobiEigen initialization MR job" + getPath());
+    Job job = new Job(config, "JacobiEigen initialization MR job" + getPath());
 
-    jobConf.setMapperClass(JacobiEigenValue.InitMapper.class);
-    jobConf.setInputFormat(VectorInputFormat.class);
-    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+    Scan scan = new Scan();
+    scan.addFamily(Constants.COLUMNFAMILY);
 
-    org.apache.hadoop.mapred.FileInputFormat.addInputPaths(jobConf, getPath());
-    jobConf.set(JacobiEigenValue.MATRIX, getPath());
-    jobConf.setOutputFormat(NullOutputFormat.class);
-    jobConf.setMapOutputKeyClass(IntWritable.class);
-    jobConf.setMapOutputValueClass(MapWritable.class);
+    TableMapReduceUtil.initTableMapperJob(getPath(), scan, JacobiInitMap.class,
+        ImmutableBytesWritable.class, Put.class, job);
+    TableMapReduceUtil.initTableReducerJob(getPath(),
+        IdentityTableReducer.class, job);
 
-    JobClient.runJob(jobConf);
+    try {
+      job.waitForCompletion(true);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
 
-    final FileSystem fs = FileSystem.get(jobConf);
+    final FileSystem fs = FileSystem.get(config);
     Pair pivotPair = new Pair();
     DoubleWritable pivotWritable = new DoubleWritable();
     VectorUpdate vu;
@@ -809,45 +810,46 @@
     double s, c, t, y;
 
     while (state != 0 && loops > 0) {
-      /*************************************************************************
-       * Find the pivot and its index(pivot_row, pivot_col)
-       * 
-       * A M/R job is used to scan all the "eival:ind" to get the max absolute
-       * value of each row, and do a MAX aggregation of these max values to get
-       * the max value in the matrix.
-       ************************************************************************/
-      jobConf = new JobConf(config);
-      jobConf.setJobName("Find Pivot MR job" + getPath());
-
-      jobConf.setNumReduceTasks(1);
-
+      /*
+       * Find the pivot and its index (pivot_row, pivot_col). An M/R job scans
+       * all the "eival:ind" cells to get the max absolute value of each row,
+       * then does a MAX aggregation of these per-row maxima to get the max
+       * value in the matrix.
+       */
       Path outDir = new Path(new Path(getType() + "_TMP_FindPivot_dir_"
           + System.currentTimeMillis()), "out");
       if (fs.exists(outDir))
         fs.delete(outDir, true);
 
-      jobConf.setMapperClass(JacobiEigenValue.PivotMapper.class);
-      jobConf.setInputFormat(JacobiEigenValue.PivotInputFormat.class);
-      jobConf.set(JacobiEigenValue.PivotInputFormat.COLUMN_LIST,
-          JacobiEigenValue.EIIND);
-      org.apache.hadoop.mapred.FileInputFormat
-          .addInputPaths(jobConf, getPath());
-      jobConf.setMapOutputKeyClass(Pair.class);
-      jobConf.setMapOutputValueClass(DoubleWritable.class);
-
-      jobConf.setOutputKeyClass(Pair.class);
-      jobConf.setOutputValueClass(DoubleWritable.class);
-      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-      FileOutputFormat.setOutputPath(jobConf, outDir);
+      job = new Job(config, "Find Pivot MR job" + getPath());
 
-      // update the out put dir of the job
-      outDir = FileOutputFormat.getOutputPath(jobConf);
+      scan = new Scan();
+      scan.addFamily(Bytes.toBytes(Constants.EI));
 
-      JobClient.runJob(jobConf);
+      job.setInputFormatClass(PivotInputFormat.class);
+      job.setMapOutputKeyClass(Pair.class);
+      job.setMapOutputValueClass(DoubleWritable.class);
+      job.setMapperClass(PivotMap.class);
+      job.getConfiguration().set(PivotInputFormat.INPUT_TABLE, getPath());
+      job.getConfiguration().set(PivotInputFormat.SCAN,
+          PivotInputFormat.convertScanToString(scan));
+
+      job.setOutputKeyClass(Pair.class);
+      job.setOutputValueClass(DoubleWritable.class);
+      job.setOutputFormatClass(SequenceFileOutputFormat.class);
+      SequenceFileOutputFormat.setOutputPath(job, outDir);
+
+      try {
+        job.waitForCompletion(true);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (ClassNotFoundException e) {
+        e.printStackTrace();
+      }
 
       // read outputs
-      Path inFile = new Path(outDir, "part-00000");
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+      Path inFile = new Path(outDir, "part-r-00000");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, config);
       try {
         reader.next(pivotPair, pivotWritable);
         pivot_row = pivotPair.getRow();
@@ -864,18 +866,18 @@
        * Compute the rotation parameters of next rotation.
        ************************************************************************/
       Get get = new Get(BytesUtil.getRowIndex(pivot_row));
-      get.addFamily(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
+      get.addFamily(Bytes.toBytes(Constants.EI));
       Result r = table.get(get);
       double e1 = BytesUtil.bytesToDouble(r.getValue(Bytes
-          .toBytes(JacobiEigenValue.EI_COLUMNFAMILY), Bytes
-          .toBytes(JacobiEigenValue.EI_VAL)));
+          .toBytes(Constants.EI), Bytes
+          .toBytes(Constants.EIVAL)));
 
       get = new Get(BytesUtil.getRowIndex(pivot_col));
-      get.addFamily(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
+      get.addFamily(Bytes.toBytes(Constants.EI));
       r = table.get(get);
       double e2 = BytesUtil.bytesToDouble(r.getValue(Bytes
-          .toBytes(JacobiEigenValue.EI_COLUMNFAMILY), Bytes
-          .toBytes(JacobiEigenValue.EI_VAL)));
+          .toBytes(Constants.EI), Bytes
+          .toBytes(Constants.EIVAL)));
 
       y = (e2 - e1) / 2;
       t = Math.abs(y) + Math.sqrt(pivot * pivot + y * y);
@@ -888,61 +890,65 @@
         t = -t;
       }
 
-      /*************************************************************************
+      /*
       * Update the pivot and the eigenvalues indexed by the pivot
-       ************************************************************************/
+       */
       vu = new VectorUpdate(pivot_row);
-      vu.put(JacobiEigenValue.EICOL_FAMILY, pivot_col, 0);
+      vu.put(Constants.EICOL, pivot_col, 0);
       table.put(vu.getPut());
 
       state = update(pivot_row, -t, state);
       state = update(pivot_col, t, state);
 
-      /*************************************************************************
+      /*
       * Rotate the matrix
-       ************************************************************************/
-      // rotation
-      jobConf = new JobConf(config);
-      jobConf.setJobName("Rotation Matrix MR job" + getPath());
-
-      jobConf.setInt(JacobiEigenValue.PIVOTROW, pivot_row);
-      jobConf.setInt(JacobiEigenValue.PIVOTCOL, pivot_col);
-      jobConf.set(JacobiEigenValue.PIVOTSIN, String.valueOf(s));
-      jobConf.set(JacobiEigenValue.PIVOTCOS, String.valueOf(c));
-
-      jobConf.setMapperClass(DummyMapper.class);
-      jobConf.setInputFormat(JacobiEigenValue.RotationInputFormat.class);
-      jobConf.set(JacobiEigenValue.RotationInputFormat.COLUMN_LIST,
-          JacobiEigenValue.EIIND);
-      org.apache.hadoop.mapred.FileInputFormat
-          .addInputPaths(jobConf, getPath());
-      jobConf.setMapOutputKeyClass(NullWritable.class);
-      jobConf.setMapOutputValueClass(NullWritable.class);
-      org.apache.hadoop.mapred.FileInputFormat
-          .addInputPaths(jobConf, getPath());
-      jobConf.setOutputFormat(NullOutputFormat.class);
+       */
+      job = new Job(config, "Rotation Matrix MR job" + getPath());
 
-      JobClient.runJob(jobConf);
+      scan = new Scan();
+      scan.addFamily(Bytes.toBytes(Constants.EI));
+
+      job.getConfiguration().setInt(Constants.PIVOTROW, pivot_row);
+      job.getConfiguration().setInt(Constants.PIVOTCOL, pivot_col);
+      job.getConfiguration().set(Constants.PIVOTSIN, String.valueOf(s));
+      job.getConfiguration().set(Constants.PIVOTCOS, String.valueOf(c));
+
+      job.setInputFormatClass(RotationInputFormat.class);
+      job.setMapOutputKeyClass(NullWritable.class);
+      job.setMapOutputValueClass(NullWritable.class);
+      job.setMapperClass(DummyMapper.class);
+      job.getConfiguration().set(RotationInputFormat.INPUT_TABLE, getPath());
+      job.getConfiguration().set(RotationInputFormat.SCAN,
+          PivotInputFormat.convertScanToString(scan));
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      try {
+        job.waitForCompletion(true);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (ClassNotFoundException e) {
+        e.printStackTrace();
+      }
 
       // rotate eigenvectors
       LOG.info("rotating eigenvector");
       for (int i = 0; i < size; i++) {
         get = new Get(BytesUtil.getRowIndex(pivot_row));
         e1 = BytesUtil.bytesToDouble(table.get(get).getValue(
-            Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
+            Bytes.toBytes(Constants.EIVEC),
             Bytes.toBytes(String.valueOf(i))));
 
         get = new Get(BytesUtil.getRowIndex(pivot_col));
         e2 = BytesUtil.bytesToDouble(table.get(get).getValue(
-            Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
+            Bytes.toBytes(Constants.EIVEC),
             Bytes.toBytes(String.valueOf(i))));
 
         vu = new VectorUpdate(pivot_row);
-        vu.put(JacobiEigenValue.EIVEC_FAMILY, i, c * e1 - s * e2);
+        vu.put(Constants.EIVEC, i, c * e1 - s * e2);
         table.put(vu.getPut());
 
         vu = new VectorUpdate(pivot_col);
-        vu.put(JacobiEigenValue.EIVEC_FAMILY, i, s * e1 + c * e2);
+        vu.put(Constants.EIVEC, i, s * e1 + c * e2);
         table.put(vu.getPut());
       }
 
@@ -962,13 +968,13 @@
       get = new Get(BytesUtil.getRowIndex(row));
 
       double max = BytesUtil.bytesToDouble(table.get(get).getValue(
-          Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY),
+          Bytes.toBytes(Constants.EICOL),
           Bytes.toBytes(String.valueOf(m))));
       double val;
       for (int i = row + 2; i < size; i++) {
         get = new Get(BytesUtil.getRowIndex(row));
         val = BytesUtil.bytesToDouble(table.get(get).getValue(
-            Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY),
+            Bytes.toBytes(Constants.EICOL),
             Bytes.toBytes(String.valueOf(i))));
         if (Math.abs(val) > Math.abs(max)) {
           m = i;
@@ -978,34 +984,34 @@
     }
 
     VectorUpdate vu = new VectorUpdate(row);
-    vu.put(JacobiEigenValue.EI_COLUMNFAMILY, "ind", String.valueOf(m));
+    vu.put(Constants.EI, "ind", String.valueOf(m));
     table.put(vu.getPut());
   }
 
   int update(int row, double value, int state) throws IOException {
     Get get = new Get(BytesUtil.getRowIndex(row));
     double e = BytesUtil.bytesToDouble(table.get(get).getValue(
-        Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
-        Bytes.toBytes(JacobiEigenValue.EI_VAL)));
+        Bytes.toBytes(Constants.EI),
+        Bytes.toBytes(Constants.EIVAL)));
     int changed = BytesUtil.bytesToInt(table.get(get).getValue(
-        Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
+        Bytes.toBytes(Constants.EI),
         Bytes.toBytes("changed")));
     double y = e;
     e += value;
 
     VectorUpdate vu = new VectorUpdate(row);
-    vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EI_VAL, e);
+    vu.put(Constants.EI, Constants.EIVAL, e);
 
     if (changed == 1 && (Math.abs(y - e) < .0000001)) { // y == e) {
       changed = 0;
-      vu.put(JacobiEigenValue.EI_COLUMNFAMILY,
-          JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
+      vu.put(Constants.EI,
+          Constants.EICHANGED, String.valueOf(changed));
 
       state--;
     } else if (changed == 0 && (Math.abs(y - e) > .0000001)) {
       changed = 1;
-      vu.put(JacobiEigenValue.EI_COLUMNFAMILY,
-          JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
+      vu.put(Constants.EI,
+          Constants.EICHANGED, String.valueOf(changed));
 
       state++;
     }
@@ -1021,8 +1027,8 @@
     for (int i = 0; i < e.length; i++) {
       get = new Get(BytesUtil.getRowIndex(i));
       e1 = BytesUtil.bytesToDouble(table.get(get).getValue(
-          Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
-          Bytes.toBytes(JacobiEigenValue.EI_VAL)));
+          Bytes.toBytes(Constants.EI),
+          Bytes.toBytes(Constants.EIVAL)));
       success &= ((Math.abs(e1 - e[i]) < .0000001));
       if (!success)
         return success;
@@ -1030,7 +1036,7 @@
       for (int j = 0; j < E[i].length; j++) {
         get = new Get(BytesUtil.getRowIndex(i));
         ev = BytesUtil.bytesToDouble(table.get(get).getValue(
-            Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
+            Bytes.toBytes(Constants.EIVEC),
             Bytes.toBytes(String.valueOf(j))));
         success &= ((Math.abs(ev - E[i][j]) < .0000001));
         if (!success)
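
(In summary, the rewritten jacobiEigenValue driver now runs entirely on the new mapreduce API: a JacobiInitMap table job copies "column" into "eicol" and seeds "eival"/"eivec"; each iteration then runs a PivotInputFormat/PivotMap job whose SequenceFile output (part-r-00000) yields the pivot, computes the rotation parameters client-side, runs a RotationInputFormat job with DummyMapper and NullOutputFormat to rotate the matrix in place, and finally rotates the eigenvectors row by row.)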

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java Thu Oct 29 06:13:51 2009
@@ -12,7 +12,7 @@
 import org.apache.hama.matrix.SubMatrix;
 
 public class BlockMultMap extends TableMapper<BlockID, BytesWritable> {
-  private byte[] COLUMN = Bytes.toBytes(Constants.BLOCK_FAMILY);
+  private byte[] COLUMN = Bytes.toBytes(Constants.BLOCK);
   
   public void map(ImmutableBytesWritable key, Result value, Context context) 
   throws IOException, InterruptedException {

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java?rev=830849&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java Thu Oct 29 06:13:51 2009
@@ -0,0 +1,75 @@
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hama.Constants;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * The matrix is modified while computing eigenvalues, so a new copy is
+ * created to keep the original matrix intact. To reduce network transfer,
+ * we copy the "column" family of the original matrix into an "eicol"
+ * family; all subsequent modifications are done on the "eicol" family.
+ * 
+ * The output eigenvector array "eivec", the output eigenvalue array
+ * "eival:value", and the temporary status arrays "eival:changed" and
+ * "eival:ind" are also created.
+ * 
+ * "eival:state" records the rotation state of the matrix during the
+ * computation.
+ */
+public class JacobiInitMap extends TableMapper<ImmutableBytesWritable, Put> {
+
+  public void map(ImmutableBytesWritable key, Result value, Context context)
+      throws IOException, InterruptedException {
+    int row, col;
+    row = BytesUtil.getRowIndex(key.get());
+    VectorUpdate vu = new VectorUpdate(row);
+
+    double val;
+    double maxVal = Double.MIN_VALUE;
+    int maxInd = row + 1;
+
+    boolean init = true;
+
+    NavigableMap<byte[], byte[]> map = value
+        .getFamilyMap(Constants.COLUMNFAMILY);
+    for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
+      val = BytesUtil.bytesToDouble(e.getValue());
+      col = BytesUtil.bytesToInt(e.getKey());
+      // copy the original matrix to "EICOL" family
+      vu.put(Constants.EICOL, col, val);
+      // make the "EIVEC" an identity (diagonal) matrix
+      vu.put(Constants.EIVEC, col, col == row ? 1 : 0);
+      if (col == row) {
+        vu.put(Constants.EI, Constants.EIVAL, val);
+      }
+      // find the max index
+      if (col > row) {
+        if (init) {
+          maxInd = col;
+          maxVal = val;
+          init = false;
+        } else {
+          if (Math.abs(val) > Math.abs(maxVal)) {
+            maxVal = val;
+            maxInd = col;
+          }
+        }
+      }
+    }
+
+    // index array
+    vu.put(Constants.EI, Constants.EIIND, String.valueOf(maxInd));
+    // Changed Array set to be true during initialization
+    vu.put(Constants.EI, Constants.EICHANGED, String.valueOf(1));
+    context.write(key, vu.getPut());
+  }
+}
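
(Worked example, illustrative only: for row 0 of the 2x2 matrix [[2, 1], [1, 2]], the map above emits one Put carrying eicol:0 = 2.0, eicol:1 = 1.0, eivec:0 = 1.0, eivec:1 = 0.0, eival:value = 2.0, eival:ind = "1", and eival:changed = "1"; the off-diagonal maximum of row 0 sits in column 1.)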

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java Thu Oct 29 06:13:51 2009
@@ -14,6 +14,7 @@
 import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
 
+/** A catalog class collecting all the MR classes that compute the matrix's norm */
 public class MatrixNormMapReduce {
   public final static IntWritable nKey = new IntWritable(-1);
 

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/PivotMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/PivotMap.java?rev=830849&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/PivotMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/PivotMap.java Thu Oct 29 06:13:51 2009
@@ -0,0 +1,29 @@
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hama.io.Pair;
+
+public class PivotMap extends
+    Mapper<Pair, DoubleWritable, Pair, DoubleWritable> {
+  private double max = 0;
+  private Pair pair = new Pair(0, 0);
+  private Pair dummyPair = new Pair(Integer.MAX_VALUE, Integer.MAX_VALUE);
+  private DoubleWritable dummyVal = new DoubleWritable(0.0);
+
+  public void map(Pair key, DoubleWritable value, Context context)
+      throws IOException, InterruptedException {
+    if (key.getRow() != Integer.MAX_VALUE) {
+      if (Math.abs(value.get()) > Math.abs(max)) {
+        pair.set(key.getRow(), key.getColumn());
+        max = value.get();
+      }
+    } else {
+      context.write(pair, new DoubleWritable(max));
+      context.write(dummyPair, dummyVal);
+    }
+  }
+
+}
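
(Protocol note: PivotRecordReader above appends a sentinel pair (Integer.MAX_VALUE, Integer.MAX_VALUE) as the last record of each split, so this map flushes its running maximum exactly once, when the sentinel arrives; the driver then reads the first record of part-r-00000 back as the pivot in DenseMatrix.jacobiEigenValue below.)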

Modified: incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java Thu Oct 29 06:13:51 2009
@@ -130,16 +130,6 @@
         .substring(cKey.indexOf(":") + 1, cKey.length()));
   }
 
-  /**
-   * Gets the column index
-   * 
-   * @param integer
-   * @return the converted value
-   */
-  public static byte[] getColumnIndex(int integer) {
-    return Bytes.toBytes(Constants.COLUMN + String.valueOf(integer));
-  }
-
   public static int getBlockIndex(byte[] key) {
     String cKey = new String(key);
     return Integer.parseInt(cKey

Modified: incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java Thu Oct 29 06:13:51 2009
@@ -26,9 +26,9 @@
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.matrix.algebra.JacobiEigenValue;
 import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
 
@@ -80,12 +80,12 @@
 
     for (int x = 0; x < 2; x++) {
       Get get = new Get(BytesUtil.getRowIndex(x));
-      get.addColumn(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
+      get.addColumn(Bytes.toBytes(Constants.EI));
       Result r = table.get(get);
 
       double eigenvalue = BytesUtil.bytesToDouble(r.getValue(Bytes
-          .toBytes(JacobiEigenValue.EI_COLUMNFAMILY), Bytes
-          .toBytes(JacobiEigenValue.EI_VAL)));
+          .toBytes(Constants.EI), Bytes
+          .toBytes(Constants.EIVAL)));
       assertTrue(Math.abs(eigenvalues[x] - eigenvalue) < .0000001);
       assertTrue(Math.abs(Math.pow(eigenvalue, 0.5) - singularvalues[x]) < .0000001);
     }

Modified: incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java Thu Oct 29 06:13:51 2009
@@ -27,6 +27,8 @@
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.util.BytesUtil;
 
@@ -67,7 +69,7 @@
 
     HTable table = m1.getHTable();
     Get get = new Get(BytesUtil.getRowIndex(0));
-    get.addColumn(BytesUtil.getColumnIndex(1));
+    get.addColumn(Constants.COLUMNFAMILY, Bytes.toBytes(1));
     Result r = table.get(get);
     assertTrue(r.getCellValue() == null);
     

Modified: incubator/hama/trunk/src/test/org/apache/hama/util/TestNumeric.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/util/TestNumeric.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/util/TestNumeric.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/util/TestNumeric.java Thu Oct 29 06:13:51 2009
@@ -42,13 +42,4 @@
     assertEquals(BytesUtil.bytesToDouble(BytesUtil.doubleToBytes(TEST_DOUBLE)),
         TEST_DOUBLE);
   }
-
-  /**
-   * Get the column index from hbase.
-   */
-  public void testGetColumnIndex() {
-    byte[] result = BytesUtil.getColumnIndex(3);
-    assertEquals(Bytes.toString(result), Constants.COLUMN
-        + BytesUtil.getColumnIndex(result));
-  }
 }