Posted to commits@hama.apache.org by ed...@apache.org on 2008/11/17 02:57:47 UTC

svn commit: r718158 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/ src/java/org/apache/hama/algebra/ src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/ src/java/org/apache/hama/util/ src/test/...

Author: edwardyoon
Date: Sun Nov 16 17:57:47 2008
New Revision: 718158

URL: http://svn.apache.org/viewvc?rev=718158&view=rev
Log:
2D block multiplication

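For context, the scheme this commit implements: each operand matrix is partitioned into blockNum x blockNum square sub-matrices, and the product is computed block-wise as C(i,j) = sum over k of A(i,k) * B(k,j), so each task works on whole sub-matrices rather than single rows. A minimal sketch of that recurrence over plain double arrays (the names and the double[][][][] layout are illustrative only, not Hama's SubMatrix/BlockWritable API):

    // c += a * b for one pair of square blocks.
    static void multAdd(double[][] c, double[][] a, double[][] b) {
      int n = a.length;
      for (int i = 0; i < n; i++)
        for (int k = 0; k < n; k++)
          for (int j = 0; j < n; j++)
            c[i][j] += a[i][k] * b[k][j];
    }

    // blocks[i][j] is one square sub-matrix of the full matrix.
    static double[][][][] mult(double[][][][] a, double[][][][] b,
        int blockNum, int blockSize) {
      double[][][][] c = new double[blockNum][blockNum][blockSize][blockSize];
      for (int i = 0; i < blockNum; i++)
        for (int j = 0; j < blockNum; j++)
          for (int k = 0; k < blockNum; k++)
            multAdd(c[i][j], a[i][k], b[k][j]);
      return c;
    }
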
Added:
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
    incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
    incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockEntry.java
    incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java
    incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sun Nov 16 17:57:47 2008
@@ -4,9 +4,10 @@
 
   NEW FEATURES
   
+    HAMA-83: 2D square blocking for dense matrix multiplication (edwardyoon)
     HAMA-104: Add getNumMap/reduceTasks to HamaConfiguration (edwardyoon)
     HAMA-92: Add subMatrix() to Matrix (edwardyoon)
-    HAMA-83: Add a writable comparable for BlockID (edwardyoon) 
+    HAMA-84: Add a writable comparable for BlockID (edwardyoon) 
     HAMA-81: Add subVector(int i0, int i1) to Vector (edwardyoon)
     HAMA-80: Add identity(int m, int n) 
                which returns identity matrix (edwardyoon) 

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java Sun Nov 16 17:57:47 2008
@@ -40,9 +40,12 @@
       parseArgs(args);
     }
 
-    Matrix a = DenseMatrix.random(conf, row, column);
-    Matrix b = DenseMatrix.random(conf, row, column);
+    DenseMatrix a = DenseMatrix.random(conf, row, column);
+    DenseMatrix b = DenseMatrix.random(conf, row, column);
 
+    a.blocking(conf.getNumMapTasks());
+    b.blocking(conf.getNumMapTasks());
+    
     Matrix c = a.mult(b);
 
     a.close();

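The example now pre-blocks both operands so that mult() takes the new block-cyclic path; DenseMatrix.mult() (below) falls back to the row-cyclic SIMD job when either operand is unblocked. A condensed sketch of the client flow (the 1000 x 1000 size is illustrative):

    HamaConfiguration conf = new HamaConfiguration();
    DenseMatrix a = DenseMatrix.random(conf, 1000, 1000);
    DenseMatrix b = DenseMatrix.random(conf, 1000, 1000);

    // Both operands must be blocked, otherwise mult() silently
    // falls back to the row-cyclic SIMD job.
    a.blocking(conf.getNumMapTasks());
    b.blocking(conf.getNumMapTasks());

    Matrix c = a.mult(b);
    a.close();
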
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Sun Nov 16 17:57:47 2008
@@ -73,7 +73,8 @@
       this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN));
       this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
       this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
-
+      this.tableDesc.addFamily(new HColumnDescriptor(Constants.BLOCK));
+      
       LOG.info("Initializing the matrix storage.");
       this.admin.createTable(this.tableDesc);
       LOG.info("Create Matrix " + matrixPath);

Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java Sun Nov 16 17:57:47 2008
@@ -22,10 +22,10 @@
 import java.util.Iterator;
 
 import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.VectorMapWritable;
+import org.apache.hama.io.MapWritable;
 
 public abstract class AbstractVector {
-  public VectorMapWritable<Integer, DoubleEntry> entries;
+  public MapWritable<Integer, DoubleEntry> entries;
   
   public double get(int index) throws NullPointerException {
     return this.entries.get(index).getValue();
@@ -34,7 +34,7 @@
   public void set(int index, double value) {
     // If entries are null, create new object 
     if(this.entries == null) {
-      this.entries = new VectorMapWritable<Integer, DoubleEntry>();
+      this.entries = new MapWritable<Integer, DoubleEntry>();
     }
     
     this.entries.put(index, new DoubleEntry(value));
@@ -62,7 +62,7 @@
     return (this.entries != null) ? this.entries.size() : 0;
   }
   
-  public VectorMapWritable<Integer, DoubleEntry> getEntries() {
+  public MapWritable<Integer, DoubleEntry> getEntries() {
     return this.entries;
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Sun Nov 16 17:57:47 2008
@@ -86,4 +86,22 @@
   
   /** default try times to generate a suitable tablename */
   public static final int DEFAULT_TRY_TIMES = 10000000;
+  
+  /** start row of block */ 
+  public static final String BLOCK_STARTROW = "attribute:startRow";
+  
+  /** end row of block */
+  public static final String BLOCK_ENDROW = "attribute:endRow";
+  
+  /** start column of block */
+  public static final String BLOCK_STARTCOLUMN = "attribute:startColumn";
+  
+  /** end column of block */
+  public static final String BLOCK_ENDCOLUMN = "attribute:endColumn";
+
+  /** block dimension */
+  public static final String BLOCK = "block:";
+
+  /** block size */
+  public static final String BLOCK_SIZE = "attribute:blockSize";
 }

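These constants follow HBase's family:qualifier naming: the attribute: family carries per-block metadata keyed by an "i,j" row key, and the block: family holds the serialized sub-matrices. For example, after blocking(2) on an 8 x 8 matrix, the position rows would look roughly like:

    row "0,0": attribute:startRow=0  attribute:endRow=3  attribute:startColumn=0  attribute:endColumn=3
    row "0,1": attribute:startRow=0  attribute:endRow=3  attribute:startColumn=4  attribute:endColumn=7
    row "1,0": attribute:startRow=4  attribute:endRow=7  attribute:startColumn=0  attribute:endColumn=3
    row "1,1": attribute:startRow=4  attribute:endRow=7  attribute:startColumn=4  attribute:endColumn=7

Note that attribute:blockSize, stored on the METADATA row, actually holds blockNum (the per-dimension block count), not the cell dimensions of a block.
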
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Sun Nov 16 17:57:47 2008
@@ -20,49 +20,52 @@
 package org.apache.hama;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hama.algebra.BlockCyclicMultiplyMap;
+import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
 import org.apache.hama.algebra.RowCyclicAdditionMap;
 import org.apache.hama.algebra.RowCyclicAdditionReduce;
 import org.apache.hama.algebra.SIMDMultiplyMap;
 import org.apache.hama.algebra.SIMDMultiplyReduce;
+import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.VectorMapWritable;
+import org.apache.hama.io.MapWritable;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.io.VectorWritable;
+import org.apache.hama.mapred.BlockCyclicReduce;
 import org.apache.hama.mapred.RowCyclicReduce;
-import org.apache.hama.util.JobManager;
 import org.apache.hama.util.BytesUtil;
+import org.apache.hama.util.JobManager;
 import org.apache.hama.util.RandomVariable;
 
 public class DenseMatrix extends AbstractMatrix implements Matrix {
 
   static int tryPathLength = Constants.DEFAULT_PATH_LENGTH;
   static final String TABLE_PREFIX = DenseMatrix.class.getSimpleName() + "_";
-  
+
   /**
-   * Construct a raw matrix.
-   * Just create a table in HBase, but didn't lay any schema ( such as
-   * dimensions: i, j ) on it.
+   * Construct a raw matrix. Just create a table in HBase, without laying any
+   * schema (such as dimensions: i, j) on it.
    * 
    * @param conf configuration object
-   * @throws IOException 
-   *         throw the exception to let the user know what happend, if we
-   *         didn't create the matrix successfully.
+   * @throws IOException thrown to let the user know what happened if we
+   *                 didn't create the matrix successfully.
    */
   public DenseMatrix(HamaConfiguration conf) throws IOException {
     setConfiguration(conf);
-    
+
     tryToCreateTable();
-    
+
     closed = false;
   }
 
@@ -71,28 +74,27 @@
    * 
    * @param conf configuration object
    * @param matrixName the name of the matrix
-   * @param force if force is true, a new matrix will be created 
-   *              no matter 'matrixName' has aliased to an existed matrix;
-   *              otherwise, just try to load an existed matrix alised 
-   *              'matrixName'. 
-   * @throws IOException 
+   * @param force if force is true, a new matrix will be created whether or
+   *                not 'matrixName' is already aliased to an existing matrix;
+   *                otherwise, just try to load an existing matrix aliased as
+   *                'matrixName'.
+   * @throws IOException
    */
-  public DenseMatrix(HamaConfiguration conf, String matrixName, 
-      boolean force) throws IOException {
+  public DenseMatrix(HamaConfiguration conf, String matrixName, boolean force)
+      throws IOException {
     setConfiguration(conf);
     // if force is set to true:
-    // 1) if this matrixName has aliase to other matrix, we will remove 
-    //    the old aliase, create a new matrix table, and aliase to it.
+    // 1) if this matrixName is an alias to another matrix, we will remove
+    // the old alias, create a new matrix table, and alias to it.
     // 2) if this matrixName has no alias to another matrix, we will create
-    //    a new matrix table, and alise to it.
+    // a new matrix table, and alias to it.
     //
     // if force is set to false, we just try to load an existing matrix aliased
     // as 'matrixname'.
-    
+
     boolean existed = hamaAdmin.matrixExists(matrixName);
-    
+
     if (force) {
-      if(existed) {
+      if (existed) {
         // remove the old aliase
         hamaAdmin.delete(matrixName);
       }
@@ -101,7 +103,7 @@
       // save the new aliase relationship
       save(matrixName);
     } else {
-      if(existed) {
+      if (existed) {
         // try to get the actual path of the table
         matrixPath = hamaAdmin.getPath(matrixName);
         // load the matrix
@@ -109,33 +111,34 @@
         // increment the reference
         incrementAndGetRef();
       } else {
-        throw new IOException("Try to load non-existed matrix alised as " + matrixName);
+        throw new IOException("Try to load non-existed matrix alised as "
+            + matrixName);
       }
     }
 
     closed = false;
   }
-  
+
   /**
-   * Load a matrix from an existed matrix table whose tablename is 'matrixpath'
-   * 
-   * !! It is an internal used for map/reduce.
+   * Load a matrix from an existing matrix table whose tablename is
+   * 'matrixpath'. Note: it is for internal use in map/reduce only.
    * 
    * @param conf configuration object
-   * @param matrixpath 
-   * @throws IOException 
-   * @throws IOException 
+   * @param matrixpath
+   * @throws IOException
+   * @throws IOException
    */
-  public DenseMatrix(HamaConfiguration conf, String matrixpath) throws IOException {
+  public DenseMatrix(HamaConfiguration conf, String matrixpath)
+      throws IOException {
     setConfiguration(conf);
     matrixPath = matrixpath;
     // load the matrix
     table = new HTable(conf, matrixPath);
     // TODO: now we don't increment the reference of the table
-    //       for it's an internal use for map/reduce.
-    //       if we want to increment the reference of the table,
-    //       we don't know where to call Matrix.close in Add & Mul map/reduce
-    //       process to decrement the reference. It seems difficulty.
+    // since it's for internal use in map/reduce.
+    // if we want to increment the reference of the table,
+    // we don't know where to call Matrix.close in the Add & Mul map/reduce
+    // processes to decrement the reference. It seems difficult.
   }
 
   /**
@@ -145,17 +148,17 @@
    * @param m the number of rows.
    * @param n the number of columns.
    * @param s fill the matrix with this scalar value.
-   * @throws IOException 
-   *         throw the exception to let the user know what happend, if we
-   *         didn't create the matrix successfully.
+   * @throws IOException thrown to let the user know what happened if we
+   *                 didn't create the matrix successfully.
    */
-  public DenseMatrix(HamaConfiguration conf, int m, int n, double s) throws IOException {
+  public DenseMatrix(HamaConfiguration conf, int m, int n, double s)
+      throws IOException {
     setConfiguration(conf);
-    
+
     tryToCreateTable();
 
     closed = false;
-    
+
     for (int i = 0; i < m; i++) {
       for (int j = 0; j < n; j++) {
         set(i, j, s);
@@ -164,29 +167,31 @@
 
     setDimension(m, n);
   }
-  
-  /** try to create a new matrix with a new random name.
-   *  try times will be (Integer.MAX_VALUE - 4) * DEFAULT_TRY_TIMES;
+
+  /**
+   * Try to create a new matrix with a new random name. Try times will be
+   * (Integer.MAX_VALUE - 4) * DEFAULT_TRY_TIMES.
+   * 
    * @throws IOException
    */
   private void tryToCreateTable() throws IOException {
     int tryTimes = Constants.DEFAULT_TRY_TIMES;
     do {
       matrixPath = TABLE_PREFIX + RandomVariable.randMatrixPath(tryPathLength);
-      
+
       if (!admin.tableExists(matrixPath)) { // no table 'matrixPath' in hbase.
         tableDesc = new HTableDescriptor(matrixPath);
         create();
         return;
       }
-      
+
       tryTimes--;
-      if(tryTimes <= 0) { // this loop has exhausted DEFAULT_TRY_TIMES.
+      if (tryTimes <= 0) { // this loop has exhausted DEFAULT_TRY_TIMES.
         tryPathLength++;
         tryTimes = Constants.DEFAULT_TRY_TIMES;
       }
-      
-    } while(tryPathLength <= Constants.DEFAULT_MAXPATHLEN);
+
+    } while (tryPathLength <= Constants.DEFAULT_MAXPATHLEN);
     // exhausted the try times.
     // throw out an IOException to let the user know what happened.
     throw new IOException("Try too many times to create a table in hbase.");
@@ -201,11 +206,12 @@
    * @return an m-by-n matrix with uniformly distributed random elements.
    * @throws IOException
    */
-  public static Matrix random(HamaConfiguration conf, int m, int n)
+  public static DenseMatrix random(HamaConfiguration conf, int m, int n)
       throws IOException {
-    Matrix rand = new DenseMatrix(conf);
+    DenseMatrix rand = new DenseMatrix(conf);
     DenseVector vector = new DenseVector();
-    LOG.info("Create the " + m + " * " + n + " random matrix : " + rand.getPath());
+    LOG.info("Create the " + m + " * " + n + " random matrix : "
+        + rand.getPath());
 
     for (int i = 0; i < m; i++) {
       vector.clear();
@@ -231,8 +237,9 @@
   public static Matrix identity(HamaConfiguration conf, int m, int n)
       throws IOException {
     Matrix identity = new DenseMatrix(conf);
-    LOG.info("Create the " + m + " * " + n + " identity matrix : " + identity.getPath());
-    
+    LOG.info("Create the " + m + " * " + n + " identity matrix : "
+        + identity.getPath());
+
     for (int i = 0; i < m; i++) {
       DenseVector vector = new DenseVector();
       for (int j = 0; j < n; j++) {
@@ -254,9 +261,11 @@
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
 
-    RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(), RowCyclicAdditionMap.class,
-        IntWritable.class, VectorWritable.class, jobConf);
-    RowCyclicReduce.initJob(result.getPath(), RowCyclicAdditionReduce.class, jobConf);
+    RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(),
+        RowCyclicAdditionMap.class, IntWritable.class, VectorWritable.class,
+        jobConf);
+    RowCyclicReduce.initJob(result.getPath(), RowCyclicAdditionReduce.class,
+        jobConf);
 
     JobManager.execute(jobConf, result);
     return result;
@@ -276,11 +285,11 @@
     byte[][] c = { columnKey };
     Scanner scan = table.getScanner(c, HConstants.EMPTY_START_ROW);
 
-    VectorMapWritable<Integer, DoubleEntry> trunk = new VectorMapWritable<Integer, DoubleEntry>();
+    MapWritable<Integer, DoubleEntry> trunk = new MapWritable<Integer, DoubleEntry>();
 
     for (RowResult row : scan) {
-      trunk.put(BytesUtil.bytesToInt(row.getRow()), 
-          new DoubleEntry(row.get(columnKey)));
+      trunk.put(BytesUtil.bytesToInt(row.getRow()), new DoubleEntry(row
+          .get(columnKey)));
     }
 
     return new DenseVector(trunk);
@@ -295,9 +304,20 @@
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
 
-    SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), SIMDMultiplyMap.class,
-        IntWritable.class, VectorWritable.class, jobConf);
-    RowCyclicReduce.initJob(result.getPath(), SIMDMultiplyReduce.class, jobConf);
+    if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
+      BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(),
+          BlockCyclicMultiplyMap.class, IntWritable.class, BlockWritable.class,
+          jobConf);
+      BlockCyclicReduce.initJob(result.getPath(),
+          BlockCyclicMultiplyReduce.class, jobConf);
+    } else {
+      SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
+          SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
+          jobConf);
+      RowCyclicReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
+          jobConf);
+    }
+
     JobManager.execute(jobConf, result);
     return result;
   }
@@ -336,26 +356,123 @@
     return this.getClass().getSimpleName();
   }
 
+  // =========================================
+
   public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
     int columnSize = (j1 - j0) + 1;
-    SubMatrix result = new SubMatrix((i1-i0) + 1, columnSize);
-    byte[][] c = new byte[columnSize][];
-    for (int i = 0; i < columnSize; i++) {
-      c[i] = BytesUtil.getColumnIndex(j0 + i);
+    SubMatrix result = new SubMatrix((i1 - i0) + 1, columnSize);
+
+    for (int i = i0, ii = 0; i <= i1; i++, ii++) {
+      for (int j = j0, jj = 0; j <= j1; j++, jj++) {
+        Cell c = table
+            .get(BytesUtil.intToBytes(i), BytesUtil.getColumnIndex(j));
+        result.set(ii, jj, BytesUtil.bytesToDouble(c.getValue()));
+      }
     }
 
-    Scanner scan = table.getScanner(c, BytesUtil.intToBytes(i0), BytesUtil.intToBytes(i1 + 1));
+    return result;
+  }
+
+  /**
+   * The matrix to be blocked must be dense.
+   * 
+   * TODO: it should work as a map/reduce job
+   */
+  public void blocking(int blockNum) throws IOException {
+    setBlockPosition(blockNum);
+    setBlockSize(blockNum);
+
+    String[] columns = new String[] { Constants.BLOCK_STARTROW,
+        Constants.BLOCK_ENDROW, Constants.BLOCK_STARTCOLUMN,
+        Constants.BLOCK_ENDCOLUMN };
+    Scanner scan = table.getScanner(columns);
 
-    int rKey = 0, cKey = 0;
     for (RowResult row : scan) {
-      for (Map.Entry<byte[], Cell> e : row.entrySet()) {
-        result.set(rKey, cKey, BytesUtil.bytesToDouble(e.getValue().getValue()));
-        cKey++;
+      String[] key = Bytes.toString(row.getRow()).split("[,]");
+      int blockR = Integer.parseInt(key[0]);
+      int blockC = Integer.parseInt(key[1]);
+      setBlock(blockR, blockC, blockMatrix(blockR, blockC));
+    }
+  }
+
+  public boolean isBlocked() throws IOException {
+    return (table.get(Constants.METADATA, Constants.BLOCK_SIZE) == null) ? false
+        : true;
+  }
+
+  public SubMatrix getBlock(int i, int j) throws IOException {
+    return BytesUtil.bytesToSubMatrix(table.get(String.valueOf(i),
+        Constants.BLOCK + j).getValue());
+  }
+
+  /**
+   * @return the size of block
+   * @throws IOException
+   */
+  public int getBlockSize() throws IOException {
+    return (isBlocked()) ? BytesUtil.bytesToInt(table.get(Constants.METADATA,
+        Constants.BLOCK_SIZE).getValue()) : -1;
+  }
+
+  protected void setBlockSize(int blockNum) throws IOException {
+    BatchUpdate update = new BatchUpdate(Constants.METADATA);
+    update.put(Constants.BLOCK_SIZE, BytesUtil.intToBytes(blockNum));
+    table.commit(update);
+  }
+
+  protected void setBlock(int i, int j, SubMatrix matrix) throws IOException {
+    BatchUpdate update = new BatchUpdate(String.valueOf(i));
+    update.put(Constants.BLOCK + j, BytesUtil.subMatrixToBytes(matrix));
+    table.commit(update);
+  }
+
+  protected void setBlockPosition(int blockNum) throws IOException {
+    int block_row_size = this.getRows() / blockNum;
+    int block_column_size = this.getColumns() / blockNum;
+
+    for (int i = 0; i < blockNum; i++) {
+      for (int j = 0; j < blockNum; j++) {
+        int startRow = i * block_row_size;
+        int endRow = (startRow + block_row_size) - 1;
+        int startColumn = j * block_column_size;
+        int endColumn = (startColumn + block_column_size) - 1;
+
+        BatchUpdate update = new BatchUpdate(getBlockKey(i, j));
+        update.put(Constants.BLOCK_STARTROW, BytesUtil.intToBytes(startRow));
+        update.put(Constants.BLOCK_ENDROW, BytesUtil.intToBytes(endRow));
+        update.put(Constants.BLOCK_STARTCOLUMN, BytesUtil
+            .intToBytes(startColumn));
+        update.put(Constants.BLOCK_ENDCOLUMN, BytesUtil.intToBytes(endColumn));
+        table.commit(update);
       }
-      rKey++;
-      cKey = 0;
     }
+  }
 
+  protected int[] getBlockPosition(int i, int j) throws IOException {
+    int[] result = new int[4];
+    result[0] = BytesUtil.bytesToInt(table.get(getBlockKey(i, j),
+        Constants.BLOCK_STARTROW).getValue());
+    result[1] = BytesUtil.bytesToInt(table.get(getBlockKey(i, j),
+        Constants.BLOCK_ENDROW).getValue());
+    result[2] = BytesUtil.bytesToInt(table.get(getBlockKey(i, j),
+        Constants.BLOCK_STARTCOLUMN).getValue());
+    result[3] = BytesUtil.bytesToInt(table.get(getBlockKey(i, j),
+        Constants.BLOCK_ENDCOLUMN).getValue());
     return result;
   }
+
+  protected String getBlockKey(int i, int j) {
+    return i + "," + j;
+  }
+
+  /**
+   * @param i
+   * @param j
+   * @return the sub matrix
+   * @throws IOException
+   */
+  protected SubMatrix blockMatrix(int i, int j) throws IOException {
+    int[] pos = getBlockPosition(i, j);
+    return subMatrix(pos[0], pos[1], pos[2], pos[3]);
+  }
 }

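A standalone sketch of the position arithmetic in setBlockPosition(), handy for checking block boundaries. Because of the integer division, a dimension that is not evenly divisible by blockNum leaves its trailing rows or columns outside every block (the TODO about irregular squares in BlockCyclicMultiplyReduce below is related):

    // [startRow, endRow, startColumn, endColumn] for block (i, j),
    // mirroring setBlockPosition(); plain Java, no HBase involved.
    static int[] blockPosition(int rows, int columns, int blockNum, int i, int j) {
      int blockRowSize = rows / blockNum;        // integer division
      int blockColumnSize = columns / blockNum;
      int startRow = i * blockRowSize;
      int startColumn = j * blockColumnSize;
      return new int[] { startRow, startRow + blockRowSize - 1,
          startColumn, startColumn + blockColumnSize - 1 };
    }

For instance, blockPosition(8, 8, 2, 1, 0) returns {4, 7, 0, 3}, the lower-left quadrant, while with rows = 10 and blockNum = 3 the blocks only cover rows 0 through 8.
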
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java Sun Nov 16 17:57:47 2008
@@ -26,7 +26,7 @@
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.VectorMapWritable;
+import org.apache.hama.io.MapWritable;
 import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
 
@@ -34,15 +34,15 @@
   static final Logger LOG = Logger.getLogger(DenseVector.class);
 
   public DenseVector() {
-    this(new VectorMapWritable<Integer, DoubleEntry>());
+    this(new MapWritable<Integer, DoubleEntry>());
   }
 
-  public DenseVector(VectorMapWritable<Integer, DoubleEntry> m) {
+  public DenseVector(MapWritable<Integer, DoubleEntry> m) {
     this.entries = m;
   }
 
   public DenseVector(RowResult row) {
-    this.entries = new VectorMapWritable<Integer, DoubleEntry>();
+    this.entries = new MapWritable<Integer, DoubleEntry>();
     for (Map.Entry<byte[], Cell> f : row.entrySet()) {
       this.entries.put(BytesUtil.getColumnIndex(f.getKey()), 
           new DoubleEntry(f.getValue()));

Modified: incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java Sun Nov 16 17:57:47 2008
@@ -25,7 +25,8 @@
  * A sub matrix is a matrix formed by selecting certain rows and columns from a
 * bigger matrix. This is an in-memory operation only.
  */
-public class SubMatrix {
+public class SubMatrix implements java.io.Serializable {
+  private static final long serialVersionUID = 1L;
   static final Logger LOG = Logger.getLogger(SubMatrix.class);
   private double[][] matrix;
 
@@ -72,6 +73,23 @@
   }
 
   /**
+   * c = a+b
+   * 
+   * @param b
+   * @return c
+   */
+  public SubMatrix add(SubMatrix b) {
+    double[][] C = new double[size()][size()];
+    for (int i = 0; i < size(); i++) {
+      for (int j = 0; j < size(); j++) {
+        C[i][j] += this.get(i, j) + b.get(i, j);
+      }
+    }
+
+    return new SubMatrix(C);
+  }
+
+  /**
    * c = a*b
    * 
    * @param b
@@ -90,6 +108,11 @@
     return new SubMatrix(C);
   }
 
+  /**
+   * TODO: SubMatrix should be able to get row, column size
+   * 
+   * @return the edge length (assumes a square matrix)
+   */
   public int size() {
     return matrix.length;
   }
@@ -97,4 +120,9 @@
   public void close() {
     matrix = null;
   }
+
+  public double[][] getDoubles() {
+    double[][] result = matrix;
+    return result;
+  }
 }

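A quick usage sketch of the new in-memory operations, using the SubMatrix(double[][]) constructor that add() itself relies on:

    double[][] ones = { { 1, 1 }, { 1, 1 } };
    SubMatrix a = new SubMatrix(ones);
    SubMatrix b = new SubMatrix(ones);

    SubMatrix sum = a.add(b);    // every entry is 2.0
    SubMatrix prod = a.mult(b);  // every entry is 2.0 (1*1 + 1*1)

Note that getDoubles() hands out the backing array itself rather than a copy, so callers can mutate the matrix in place.
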
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,57 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.Matrix;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.mapred.BlockCyclicMap;
+import org.apache.log4j.Logger;
+
+public class BlockCyclicMultiplyMap extends
+    BlockCyclicMap<IntWritable, BlockWritable> {
+  static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
+  protected Matrix matrix_b;
+  public static final String MATRIX_B = "hama.multiplication.matrix.b";
+
+  public void configure(JobConf job) {
+    try {
+      matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
+    } catch (IOException e) {
+      LOG.warn("Load matrix_b failed : " + e.getMessage());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void initJob(String matrix_a, String matrix_b,
+      Class<BlockCyclicMultiplyMap> map, Class<IntWritable> outputKeyClass,
+      Class<BlockWritable> outputValueClass, JobConf jobConf) {
+
+    jobConf.setMapOutputValueClass(outputValueClass);
+    jobConf.setMapOutputKeyClass(outputKeyClass);
+    jobConf.setMapperClass(map);
+    jobConf.set(MATRIX_B, matrix_b);
+
+    initJob(matrix_a, map, jobConf);
+  }
+
+  @Override
+  public void map(IntWritable key, BlockWritable value,
+      OutputCollector<IntWritable, BlockWritable> output, Reporter reporter)
+      throws IOException {
+    for (int i = 0; i < value.size(); i++) {
+      SubMatrix a = value.get(i);
+      for (int j = 0; j < ((DenseMatrix) matrix_b).getBlockSize(); j++) {
+        SubMatrix b = ((DenseMatrix) matrix_b).getBlock(i, j);
+        SubMatrix c = a.mult(b);
+        output.collect(key, new BlockWritable(key.get(), j, c));
+      }
+    }
+  }
+}

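To trace the data flow: the mapper receives one block row of A per key and multiplies each of its blocks against the matching block row of B, emitting partial products keyed by the output block row. With blockNum = 2 and key = 0 (block row 0 of A, holding A00 and A01), the map call emits

    (0, A00*B00 as column 0)    (0, A00*B01 as column 1)
    (0, A01*B10 as column 0)    (0, A01*B11 as column 1)

so the reducer for key 0 receives exactly the terms of C0j = A00*B0j + A01*B1j for each column j. Note that matrix_b is reopened in configure() from the hama.multiplication.matrix.b job property set by initJob().
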
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,59 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockEntry;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.BlockCyclicReduce;
+import org.apache.log4j.Logger;
+
+public class BlockCyclicMultiplyReduce extends
+    BlockCyclicReduce<IntWritable, BlockWritable> {
+  static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyReduce.class);
+
+  @Override
+  public void reduce(IntWritable key, Iterator<BlockWritable> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException {
+    int row = key.get();
+    Map<Integer, SubMatrix> sum = new HashMap<Integer, SubMatrix>();
+
+    while (values.hasNext()) {
+      BlockWritable b = values.next();
+      for (Map.Entry<Integer, BlockEntry> e : b.entrySet()) {
+        int j = e.getKey();
+        SubMatrix value = e.getValue().getValue();
+        if (sum.containsKey(j)) {
+          sum.put(j, sum.get(j).add(value));
+        } else {
+          sum.put(j, value);
+        }
+      }
+    }
+
+    for (Map.Entry<Integer, SubMatrix> e : sum.entrySet()) {
+      int column = e.getKey();
+      SubMatrix mat = e.getValue();
+
+      int startRow = row * mat.size();
+      int startColumn = column * mat.size();
+
+      // TODO: the sub matrix may not be a regular square
+      for (int i = 0; i < mat.size(); i++) {
+        VectorUpdate update = new VectorUpdate(i + startRow);
+        for (int j = 0; j < mat.size(); j++) {
+          update.put(j + startColumn, mat.get(i, j));
+        }
+        output.collect(key, update);
+      }
+    }
+  }
+}

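After summing the partial products per block column, the reducer translates block coordinates back into global cell coordinates. The mapping, extracted from reduce() for clarity (the concrete values are an example):

    int s = 4;                    // block edge length, i.e. mat.size()
    int row = 1, column = 1;      // block coordinates from the key and sum map
    int startRow = row * s;       // 4
    int startColumn = column * s; // 4
    // reduce() then emits one VectorUpdate per global row in
    // [startRow, startRow + s), filling columns [startColumn, startColumn + s),
    // so block (1,1) lands in cells [4..7] x [4..7].

As the TODO notes, this assumes every block is the same square size s.
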
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockEntry.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockEntry.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockEntry.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockEntry.java Sun Nov 16 17:57:47 2008
@@ -60,11 +60,10 @@
   }
 
   /**
-   * @return the current VectorEntry's value
+   * @return the current BlockEntry's value
    * @throws IOException
-   * @throws ClassNotFoundException
    */
-  public SubMatrix getValue() throws IOException, ClassNotFoundException {
+  public SubMatrix getValue() throws IOException {
     return BytesUtil.bytesToSubMatrix(this.values[0]);
   }
 

Added: incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,170 @@
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BytesUtil;
+
+public class BlockMapWritable <K, V> implements Map<Integer, V>, Writable,
+    Configurable {
+  private AtomicReference<Configuration> conf = new AtomicReference<Configuration>();
+
+  // Static maps of code to class and vice versa. Includes types used in hama
+  // only.
+  static final Map<Byte, Class<?>> CODE_TO_CLASS = new HashMap<Byte, Class<?>>();
+  static final Map<Class<?>, Byte> CLASS_TO_CODE = new HashMap<Class<?>, Byte>();
+
+  static {
+    byte code = 0;
+    addToMap(HStoreKey.class, code++);
+    addToMap(ImmutableBytesWritable.class, code++);
+    addToMap(Text.class, code++);
+    addToMap(BlockEntry.class, code++);
+    addToMap(byte[].class, code++);
+  }
+
+  @SuppressWarnings("boxing")
+  private static void addToMap(final Class<?> clazz, final byte code) {
+    CLASS_TO_CODE.put(clazz, code);
+    CODE_TO_CLASS.put(code, clazz);
+  }
+
+  private Map<Integer, V> instance = new TreeMap<Integer, V>();
+
+  /** @return the conf */
+  public Configuration getConf() {
+    return conf.get();
+  }
+
+  /** @param conf the conf to set */
+  public void setConf(Configuration conf) {
+    this.conf.set(conf);
+  }
+
+  /** {@inheritDoc} */
+  public void clear() {
+    instance.clear();
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsKey(Object key) {
+    return instance.containsKey(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsValue(Object value) {
+    return instance.containsValue(value);
+  }
+
+  /** {@inheritDoc} */
+  public Set<Entry<Integer, V>> entrySet() {
+    return instance.entrySet();
+  }
+
+  /** {@inheritDoc} */
+  public V get(Object key) {
+    return instance.get(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean isEmpty() {
+    return instance.isEmpty();
+  }
+
+  /** {@inheritDoc} */
+  public Set<Integer> keySet() {
+    return instance.keySet();
+  }
+
+  /** {@inheritDoc} */
+  public int size() {
+    return instance.size();
+  }
+
+  /** {@inheritDoc} */
+  public Collection<V> values() {
+    return instance.values();
+  }
+
+  // Writable
+
+  /** @return the Class class for the specified id */
+  protected Class<?> getClass(byte id) {
+    return CODE_TO_CLASS.get(id);
+  }
+
+  /** @return the id for the specified Class */
+  protected byte getId(Class<?> clazz) {
+    Byte b = CLASS_TO_CODE.get(clazz);
+    if (b == null) {
+      throw new NullPointerException("Nothing for : " + clazz);
+    }
+    return b;
+  }
+
+  @Override
+  public String toString() {
+    return this.instance.toString();
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    // Write out the number of entries in the map
+    out.writeInt(this.instance.size());
+
+    // Then write out each key/value pair
+    for (Map.Entry<Integer, V> e : instance.entrySet()) {
+      Bytes.writeByteArray(out, BytesUtil.getBlockIndex(e.getKey()));
+      out.writeByte(getId(e.getValue().getClass()));
+      ((Writable) e.getValue()).write(out);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    // First clear the map. Otherwise we will just accumulate
+    // entries every time this method is called.
+    this.instance.clear();
+
+    // Read the number of entries in the map
+    int entries = in.readInt();
+
+    // Then read each key/value pair
+    for (int i = 0; i < entries; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      Writable value = (Writable) ReflectionUtils.newInstance(getClass(in
+          .readByte()), getConf());
+      value.readFields(in);
+      V v = (V) value;
+      this.instance.put(BytesUtil.getBlockIndex(key), v);
+    }
+  }
+
+  public void putAll(Map<? extends Integer, ? extends V> m) {
+    this.instance.putAll(m);
+  }
+
+  public V remove(Object key) {
+    return this.instance.remove(key);
+  }
+
+  public V put(Integer key, V value) {
+    return this.instance.put(key, value);
+  }
+}

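The wire format written here is: an int entry count, then per entry the key bytes from BytesUtil.getBlockIndex, a one-byte class code from CLASS_TO_CODE, and the value's own Writable bytes. (MapWritable below is essentially the same class with a different key codec and value-class table.) A hedged round-trip sketch, assuming the SubMatrix(double[][]) and BlockEntry(SubMatrix) constructors are public and BlockEntry has the usual Writable no-arg constructor:

    import java.io.*;
    import org.apache.hama.SubMatrix;
    import org.apache.hama.io.BlockEntry;
    import org.apache.hama.io.BlockMapWritable;

    public class RoundTripSketch {
      public static void main(String[] args) throws IOException {
        BlockMapWritable<Integer, BlockEntry> map =
            new BlockMapWritable<Integer, BlockEntry>();
        map.put(0, new BlockEntry(new SubMatrix(
            new double[][] { { 1, 2 }, { 3, 4 } })));

        // Serialize: count, then key bytes / class code / value per entry.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        map.write(new DataOutputStream(bos));

        // Deserialize into a fresh instance.
        BlockMapWritable<Integer, BlockEntry> copy =
            new BlockMapWritable<Integer, BlockEntry>();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy.size()); // 1
      }
    }
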
Added: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,136 @@
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.util.BytesUtil;
+
+public class BlockWritable implements Writable, Map<Integer, BlockEntry> {
+
+  public Integer row;
+  public BlockMapWritable<Integer, BlockEntry> entries;
+
+  public BlockWritable() {
+    this(new BlockMapWritable<Integer, BlockEntry>());
+  }
+
+  public BlockWritable(BlockMapWritable<Integer, BlockEntry> entries) {
+    this.entries = entries;
+  }
+
+  public BlockWritable(int i, int j, SubMatrix mult) throws IOException {
+    this.row = i;
+    BlockMapWritable<Integer, BlockEntry> tr = new BlockMapWritable<Integer, BlockEntry>();
+    tr.put(j, new BlockEntry(mult));
+    this.entries = tr;
+  }
+
+  public int size() {
+    return this.entries.size();
+  }
+  
+  public SubMatrix get(int key) throws IOException {
+    return this.entries.get(key).getValue();
+  }
+  
+  public BlockEntry put(Integer key, BlockEntry value) {
+    throw new UnsupportedOperationException("VectorWritable is read-only!");
+  }
+
+  public BlockEntry get(Object key) {
+    return this.entries.get(key);
+  }
+
+  public BlockEntry remove(Object key) {
+    throw new UnsupportedOperationException("VectorWritable is read-only!");
+  }
+
+  public boolean containsKey(Object key) {
+    return entries.containsKey(key);
+  }
+
+  public boolean containsValue(Object value) {
+    throw new UnsupportedOperationException("Don't support containsValue!");
+  }
+
+  public boolean isEmpty() {
+    return entries.isEmpty();
+  }
+
+  public void clear() {
+    throw new UnsupportedOperationException("VectorDatum is read-only!");
+  }
+
+  public Set<Integer> keySet() {
+    Set<Integer> result = new TreeSet<Integer>();
+    for (Integer w : entries.keySet()) {
+      result.add(w);
+    }
+    return result;
+  }
+
+  public Set<Map.Entry<Integer, BlockEntry>> entrySet() {
+    return Collections.unmodifiableSet(this.entries.entrySet());
+  }
+
+  public Collection<BlockEntry> values() {
+    ArrayList<BlockEntry> result = new ArrayList<BlockEntry>();
+    for (Writable w : entries.values()) {
+      result.add((BlockEntry) w);
+    }
+    return result;
+  }
+
+  public void readFields(final DataInput in) throws IOException {
+    this.row = BytesUtil.bytesToInt(Bytes.readByteArray(in));
+    this.entries.readFields(in);
+  }
+
+  public void write(final DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, BytesUtil.intToBytes(this.row));
+    this.entries.write(out);
+  }
+
+  public void putAll(Map<? extends Integer, ? extends BlockEntry> m) {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  /**
+   * 
+   * The inner class for an entry of a row.
+   * 
+   */
+  public static class Entries implements Map.Entry<byte[], BlockEntry> {
+
+    private final byte[] column;
+    private final BlockEntry entry;
+
+    Entries(byte[] column, BlockEntry entry) {
+      this.column = column;
+      this.entry = entry;
+    }
+
+    public BlockEntry setValue(BlockEntry c) {
+      throw new UnsupportedOperationException("VectorWritable is read-only!");
+    }
+
+    public byte[] getKey() {
+      byte[] key = column;
+      return key;
+    }
+
+    public BlockEntry getValue() {
+      return entry;
+    }
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,190 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BytesUtil;
+
+public class MapWritable<K, V> implements Map<Integer, V>, Writable,
+    Configurable {
+  private AtomicReference<Configuration> conf = new AtomicReference<Configuration>();
+
+  // Static maps of code to class and vice versa. Includes types used in hama
+  // only.
+  static final Map<Byte, Class<?>> CODE_TO_CLASS = new HashMap<Byte, Class<?>>();
+  static final Map<Class<?>, Byte> CLASS_TO_CODE = new HashMap<Class<?>, Byte>();
+
+  static {
+    byte code = 0;
+    addToMap(HStoreKey.class, code++);
+    addToMap(ImmutableBytesWritable.class, code++);
+    addToMap(Text.class, code++);
+    addToMap(DoubleEntry.class, code++);
+    addToMap(BlockEntry.class, code++);
+    addToMap(byte[].class, code++);
+  }
+
+  @SuppressWarnings("boxing")
+  private static void addToMap(final Class<?> clazz, final byte code) {
+    CLASS_TO_CODE.put(clazz, code);
+    CODE_TO_CLASS.put(code, clazz);
+  }
+
+  private Map<Integer, V> instance = new TreeMap<Integer, V>();
+
+  /** @return the conf */
+  public Configuration getConf() {
+    return conf.get();
+  }
+
+  /** @param conf the conf to set */
+  public void setConf(Configuration conf) {
+    this.conf.set(conf);
+  }
+
+  /** {@inheritDoc} */
+  public void clear() {
+    instance.clear();
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsKey(Object key) {
+    return instance.containsKey(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsValue(Object value) {
+    return instance.containsValue(value);
+  }
+
+  /** {@inheritDoc} */
+  public Set<Entry<Integer, V>> entrySet() {
+    return instance.entrySet();
+  }
+
+  /** {@inheritDoc} */
+  public V get(Object key) {
+    return instance.get(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean isEmpty() {
+    return instance.isEmpty();
+  }
+
+  /** {@inheritDoc} */
+  public Set<Integer> keySet() {
+    return instance.keySet();
+  }
+
+  /** {@inheritDoc} */
+  public int size() {
+    return instance.size();
+  }
+
+  /** {@inheritDoc} */
+  public Collection<V> values() {
+    return instance.values();
+  }
+
+  // Writable
+
+  /** @return the Class class for the specified id */
+  protected Class<?> getClass(byte id) {
+    return CODE_TO_CLASS.get(id);
+  }
+
+  /** @return the id for the specified Class */
+  protected byte getId(Class<?> clazz) {
+    Byte b = CLASS_TO_CODE.get(clazz);
+    if (b == null) {
+      throw new NullPointerException("Nothing for : " + clazz);
+    }
+    return b;
+  }
+
+  @Override
+  public String toString() {
+    return this.instance.toString();
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    // Write out the number of entries in the map
+    out.writeInt(this.instance.size());
+
+    // Then write out each key/value pair
+    for (Map.Entry<Integer, V> e : instance.entrySet()) {
+      Bytes.writeByteArray(out, BytesUtil.getColumnIndex(e.getKey()));
+      out.writeByte(getId(e.getValue().getClass()));
+      ((Writable) e.getValue()).write(out);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    // First clear the map. Otherwise we will just accumulate
+    // entries every time this method is called.
+    this.instance.clear();
+
+    // Read the number of entries in the map
+    int entries = in.readInt();
+
+    // Then read each key/value pair
+    for (int i = 0; i < entries; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      Writable value = (Writable) ReflectionUtils.newInstance(getClass(in
+          .readByte()), getConf());
+      value.readFields(in);
+      V v = (V) value;
+      this.instance.put(BytesUtil.getColumnIndex(key), v);
+    }
+  }
+
+  public void putAll(Map<? extends Integer, ? extends V> m) {
+    this.instance.putAll(m);
+  }
+
+  public V remove(Object key) {
+    return this.instance.remove(key);
+  }
+
+  public V put(Integer key, V value) {
+    return this.instance.put(key, value);
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java Sun Nov 16 17:57:47 2008
@@ -38,13 +38,13 @@
 public class VectorWritable implements Writable, Map<Integer, DoubleEntry> {
 
   public Integer row;
-  public VectorMapWritable<Integer, DoubleEntry> entries;
+  public MapWritable<Integer, DoubleEntry> entries;
 
   public VectorWritable() {
-    this(new VectorMapWritable<Integer, DoubleEntry>());
+    this(new MapWritable<Integer, DoubleEntry>());
   }
 
-  public VectorWritable(VectorMapWritable<Integer, DoubleEntry> entries) {
+  public VectorWritable(MapWritable<Integer, DoubleEntry> entries) {
     this.entries = entries;
   }
 

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,36 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.Matrix;
+import org.apache.hama.io.BlockWritable;
+import org.apache.log4j.Logger;
+
+public abstract class BlockCyclicMap<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Mapper<IntWritable, BlockWritable, K, V> {
+  static final Logger LOG = Logger.getLogger(BlockCyclicMap.class);
+  public static Matrix MATRIX_B;
+
+  public static void initJob(String matrixA,
+      Class<? extends BlockCyclicMap> mapper, JobConf job) {
+
+    job.setInputFormat(BlockInputFormat.class);
+    job.setMapperClass(mapper);
+    FileInputFormat.addInputPaths(job, matrixA);
+
+    job.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
+  }
+
+  public abstract void map(IntWritable key, BlockWritable value,
+      OutputCollector<K, V> output, Reporter reporter) throws IOException;
+}

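Wiring a concrete job then looks like the blocked branch of DenseMatrix.mult() shown earlier; a condensed sketch (the "A_table" / "B_table" names are placeholders for getPath() values of actual matrices):

    HamaConfiguration conf = new HamaConfiguration();
    DenseMatrix result = new DenseMatrix(conf);
    JobConf jobConf = new JobConf(conf);
    jobConf.setJobName("block multiplication");
    jobConf.setNumMapTasks(conf.getNumMapTasks());
    jobConf.setNumReduceTasks(conf.getNumReduceTasks());

    BlockCyclicMultiplyMap.initJob("A_table", "B_table",
        BlockCyclicMultiplyMap.class, IntWritable.class,
        BlockWritable.class, jobConf);
    BlockCyclicReduce.initJob(result.getPath(),
        BlockCyclicMultiplyReduce.class, jobConf);

    JobManager.execute(jobConf, result);
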
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,47 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+
+public abstract class BlockCyclicReduce<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Reducer<K, V, IntWritable, VectorUpdate> {
+  /**
+   * Use this before submitting a TableReduce job. It will appropriately set up
+   * the JobConf.
+   * 
+   * @param table
+   * @param reducer
+   * @param job
+   */
+  public static void initJob(String table,
+      Class<? extends BlockCyclicReduce> reducer, JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+
+  /**
+   * 
+   * @param key
+   * @param values
+   * @param output
+   * @param reporter
+   * @throws IOException
+   */
+  public abstract void reduce(K key, Iterator<V> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,55 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+
+public class BlockInputFormat extends BlockInputFormatBase implements
+    JobConfigurable {
+  private static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
+
+  /**
+   * space delimited list of columns
+   */
+  public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
+
+  /** {@inheritDoc} */
+  public void configure(JobConf job) {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    byte[][] m_cols = new byte[colNames.length][];
+    for (int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = Bytes.toBytes(colNames[i]);
+    }
+    setInputColums(m_cols);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void validateInput(JobConf job) throws IOException {
+    // expecting exactly one path
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length > 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+}

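Two quirks worth noting: the input "path" is really an HBase table name (configure() takes tableNames[0].getName()), and COLUMN_LIST is a space-delimited list of column specifiers, where a bare family like "block:" scans every qualifier in that family. BlockCyclicMap.initJob() normally sets both; done by hand it would look like this (the table name is a placeholder for a generated matrix path):

    JobConf job = new JobConf();
    // The HBase table name, carried through the file-input-path API.
    FileInputFormat.addInputPaths(job, "DenseMatrix_xxxxx");
    // Space-delimited columns; "block:" means the whole block family.
    job.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
    job.setInputFormat(BlockInputFormat.class);
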
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,247 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+public abstract class BlockInputFormatBase implements
+    InputFormat<IntWritable, BlockWritable> {
+  static final Logger LOG = Logger.getLogger(BlockInputFormatBase.class);
+  private byte[][] inputColumns;
+  private HTable table;
+  private TableRecordReader tableRecordReader;
+  private RowFilterInterface rowFilter;
+
+  /**
+   * Iterates over HBase table data, returning (IntWritable, BlockWritable) pairs
+   */
+  protected static class TableRecordReader implements
+      RecordReader<IntWritable, BlockWritable> {
+    private byte[] startRow;
+    private byte[] endRow;
+    private RowFilterInterface trrRowFilter;
+    private Scanner scanner;
+    private HTable htable;
+    private byte[][] trrInputColumns;
+
+    /**
+     * Build the scanner. Not done in constructor to allow for extension.
+     * 
+     * @throws IOException
+     */
+    public void init() throws IOException {
+      if ((endRow != null) && (endRow.length > 0)) {
+        if (trrRowFilter != null) {
+          final Set<RowFilterInterface> rowFiltersSet = new HashSet<RowFilterInterface>();
+          rowFiltersSet.add(new StopRowFilter(endRow));
+          rowFiltersSet.add(trrRowFilter);
+          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+              new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+                  rowFiltersSet));
+        } else {
+          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+              endRow);
+        }
+      } else {
+        this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+            trrRowFilter);
+      }
+    }
+
+    /**
+     * @param htable the {@link HTable} to scan.
+     */
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    /**
+     * @param inputColumns the columns to be placed in {@link BlockWritable}.
+     */
+    public void setInputColumns(final byte[][] inputColumns) {
+      this.trrInputColumns = inputColumns;
+    }
+
+    /**
+     * @param startRow the first row in the split
+     */
+    public void setStartRow(final byte[] startRow) {
+      this.startRow = startRow;
+    }
+
+    /**
+     * @param endRow the last row in the split
+     */
+    public void setEndRow(final byte[] endRow) {
+      this.endRow = endRow;
+    }
+
+    /**
+     * @param rowFilter the {@link RowFilterInterface} to be used.
+     */
+    public void setRowFilter(RowFilterInterface rowFilter) {
+      this.trrRowFilter = rowFilter;
+    }
+
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      this.scanner.close();
+    }
+
+    /**
+     * @return IntWritable
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public IntWritable createKey() {
+      return new IntWritable();
+    }
+
+    /**
+     * @return BlockWritable
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public BlockWritable createValue() {
+      return new BlockWritable();
+    }
+
+    /** {@inheritDoc} */
+    public long getPos() {
+      // This should be the ordinal tuple in the range;
+      // not clear how to calculate...
+      return 0;
+    }
+
+    /** {@inheritDoc} */
+    public float getProgress() {
+      // Depends on the total number of tuples and getPos
+      return 0;
+    }
+
+    /**
+     * Converts Scanner.next() to (IntWritable, BlockWritable).
+     * 
+     * @param key IntWritable as input key.
+     * @param value BlockWritable as input value.
+     * @return true if there was more data
+     * @throws IOException
+     */
+    public boolean next(IntWritable key, BlockWritable value)
+        throws IOException {
+      RowResult result = this.scanner.next();
+      boolean hasMore = result != null && result.size() > 0;
+      if (hasMore) {
+        key.set(BytesUtil.bytesToInt(result.getRow()));
+        Writables.copyWritable(result, value);
+      }
+      return hasMore;
+    }
+  }
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+   * default.
+   * 
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader<IntWritable, BlockWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return trr;
+  }
+
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
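+    // NOTE: the requested numSplits is ignored; the split count is fixed at
+    // two here, then capped by the table's row count read from METADATA.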
+    numSplits = 2;
+    Cell meta = this.table.get(Constants.METADATA, Constants.METADATA_ROWS);
+    int totalRows = BytesUtil.bytesToInt(meta.getValue());
+    if (totalRows < numSplits) {
+      numSplits = totalRows;
+    }
+
+    int[] startKeys = new int[numSplits];
+    int interval = totalRows / numSplits;
+
+    for (int i = 0; i < numSplits; i++) {
+      startKeys[i] = (i * interval);
+    }
+
+    InputSplit[] splits = new InputSplit[startKeys.length];
+    for (int i = 0; i < startKeys.length; i++) {
+      splits[i] = new TableSplit(this.table.getTableName(), BytesUtil
+          .intToBytes(startKeys[i]), ((i + 1) < startKeys.length) ? BytesUtil
+          .intToBytes(startKeys[i + 1]) : HConstants.EMPTY_START_ROW);
+    }
+    return splits;
+  }
+
+  /**
+   * @param inputColumns the columns to be passed in {@link BlockWritable} to the map task.
+   */
+  protected void setInputColumns(byte[][] inputColumns) {
+    this.inputColumns = inputColumns;
+  }
+
+  /**
+   * Allows subclasses to set the {@link HTable}.
+   * 
+   * @param table to get the data from
+   */
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   * 
+   * @param tableRecordReader to provide other {@link TableRecordReader}
+   *                implementations.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+
+  /**
+   * Allows subclasses to set the {@link RowFilterInterface} to be used.
+   * 
+   * @param rowFilter the row filter to apply when scanning
+   */
+  protected void setRowFilter(RowFilterInterface rowFilter) {
+    this.rowFilter = rowFilter;
+  }
+}
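
To make the split arithmetic above concrete: if the table's METADATA_ROWS
cell reports 20 rows, then with the fixed cap of two splits interval = 20 / 2
= 10, and getSplits() yields one TableSplit scanning rows [0, 10) and a
second scanning from row 10 to the end of the table (HConstants.EMPTY_START_ROW
marks the open end).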

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java Sun Nov 16 17:57:47 2008
@@ -33,7 +33,7 @@
 
 public class VectorInputFormat extends VectorInputFormatBase implements
     JobConfigurable {
-  private final Log LOG = LogFactory.getLog(VectorInputFormat.class);
+  private static final Log LOG = LogFactory.getLog(VectorInputFormat.class);
 
   /**
    * space delimited list of columns

Modified: incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java Sun Nov 16 17:57:47 2008
@@ -99,7 +99,7 @@
     return Bytes.toBytes(Constants.COLUMN + String.valueOf(integer));
   }
   
-  public static byte[] subMatrixToBytes(Object obj) throws IOException {
+  public static byte[] subMatrixToBytes(SubMatrix obj) throws IOException {
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     ObjectOutputStream oos = new ObjectOutputStream(bos);
     oos.writeObject(obj);
@@ -110,13 +110,27 @@
     return data;
   }
   
-  public static SubMatrix bytesToSubMatrix(byte[] value) throws IOException,
-      ClassNotFoundException {
+  public static SubMatrix bytesToSubMatrix(byte[] value) throws IOException {
     ByteArrayInputStream bos = new ByteArrayInputStream(value);
     ObjectInputStream oos = new ObjectInputStream(bos);
-    Object obj = oos.readObject();
+    Object obj = null;
+    try {
+      obj = oos.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e.toString());
+    }
     oos.close();
     bos.close();
     return (SubMatrix) obj;
   }
+
+  public static byte[] getBlockIndex(int integer) {
+    return Bytes.toBytes(Constants.BLOCK + String.valueOf(integer));
+  }
+
+  public static int getBlockIndex(byte[] key) {
+    String cKey = new String(key);
+    return Integer.parseInt(cKey
+        .substring(cKey.indexOf(":") + 1, cKey.length()));
+  }
 }
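
Round-trip sketch of the new helpers (not part of this patch), assuming
Constants.BLOCK ends with the ':' separator that the parsing overload looks
for:

    byte[] qualified = BytesUtil.getBlockIndex(7);  // bytes of Constants.BLOCK + "7"
    int index = BytesUtil.getBlockIndex(qualified); // parses the digits after ':', giving 7 back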

Modified: incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java Sun Nov 16 17:57:47 2008
@@ -23,23 +23,18 @@
 
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hama.Matrix;
 
 /**
  * A map/reduce job manager 
  */
 public class JobManager {
-  
   public static void execute(JobConf jobConf, Matrix result) throws IOException {
-    RunningJob rJob = JobClient.runJob(jobConf);
-    // TODO : When HADOOP-4043 done, we should change this.
-    long rows = rJob.getCounters().findCounter(
-        "org.apache.hadoop.mapred.Task$Counter", 8, "REDUCE_OUTPUT_RECORDS")
-        .getCounter();
+    JobClient.runJob(jobConf);
     // TODO : Thinking about more efficient method.
+    int rows = result.getColumn(0).size();
     int columns = result.getRow(0).size();
-    result.setDimension((int) rows, columns);
+    result.setDimension(rows, columns);
   }
   
 }
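
In short: pending HADOOP-4043 (see the TODO above), the result dimensions are
now read back from the matrix itself, with getColumn(0).size() counting the
entries of the first column (the row count) and getRow(0).size() those of the
first row (the column count); this assumes the first row and column of the
result are fully populated.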

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Sun Nov 16 17:57:47 2008
@@ -44,13 +44,13 @@
   private static HamaConfiguration conf;
   private static HBaseAdmin admin;
   private static HamaAdmin hamaAdmin;
-  
+
   public static Test suite() {
     TestSetup setup = new TestSetup(new TestSuite(TestDenseMatrix.class)) {
       protected void setUp() throws Exception {
         HCluster hCluster = new HCluster();
         hCluster.setUp();
-        
+
         conf = hCluster.getConf();
         admin = new HBaseAdmin(conf);
         hamaAdmin = new HamaAdminImpl(conf, admin);
@@ -75,6 +75,27 @@
     m2.close();
   }
 
+  public void testBlocking() throws IOException, ClassNotFoundException {
+    assertEquals(((DenseMatrix) m1).isBlocked(), false);
+    ((DenseMatrix) m1).blocking(2);
+    assertEquals(((DenseMatrix) m1).isBlocked(), true);
+    int[] pos = ((DenseMatrix) m1).getBlockPosition(1, 0);
+    double[][] a = ((DenseMatrix) m1).blockMatrix(1, 0).getDoubles();
+    LOG.info(pos[0] + ", " + pos[1] + ", " + pos[2] + ", " + pos[3]);
+    double[][] b = ((DenseMatrix) m1).subMatrix(pos[0], pos[1], pos[2], pos[3]).getDoubles();
+    double[][] c = ((DenseMatrix) m1).getBlock(1, 0).getDoubles();
+    assertEquals(((DenseMatrix) m1).getBlockSize(), 2);
+    assertEquals(c.length, 5);
+    
+    for (int i = 0; i < a.length; i++) {
+      for (int j = 0; j < a[i].length; j++) {
+        assertEquals(a[i][j], b[i][j]);
+        assertEquals(a[i][j], c[i][j]);
+        assertEquals(b[i][j], c[i][j]);
+      }
+    }
+  }
+
   /**
    * Column vector test.
    * 
@@ -90,7 +111,7 @@
       x++;
     }
   }
-
+  
   public void testGetSetAttribute() throws IOException {
     m1.setRowAttribute(0, "row1");
     assertEquals(m1.getRowAttribute(0), "row1");
@@ -108,10 +129,10 @@
         assertEquals(a.get(i, j), m1.get(i + 2, j + 2));
       }
     }
-    
+
     SubMatrix b = m2.subMatrix(0, 2, 0, 2);
     SubMatrix c = a.mult(b);
-    
+
     double[][] C = new double[3][3];
     for (int i = 0; i < 3; i++) {
       for (int j = 0; j < 3; j++) {
@@ -120,7 +141,7 @@
         }
       }
     }
-    
+
     for (int i = 0; i < 3; i++) {
       for (int j = 0; j < 3; j++) {
         assertEquals(C[i][j], c.get(i, j));
@@ -181,7 +202,7 @@
 
   public void testLoadSave() throws IOException {
     String path1 = m1.getPath();
-    // save m1 to aliase1 
+    // save m1 to aliase1
     m1.save(aliase1);
     // load matrix m1 using aliase1
     DenseMatrix loadTest = new DenseMatrix(conf, aliase1, false);
@@ -191,11 +212,11 @@
         assertEquals(m1.get(i, j), loadTest.get(i, j));
       }
     }
-    
+
     assertEquals(path1, loadTest.getPath());
     // close loadTest, it just disconnect to the table but didn't delete it.
     loadTest.close();
-    
+
     // try to close m1 & load matrix m1 using aliase1 again.
     m1.close();
     DenseMatrix loadTest2 = new DenseMatrix(conf, aliase1, false);
@@ -209,23 +230,23 @@
     // it will do the gc!
     loadTest2.close();
     assertEquals(false, admin.tableExists(path1));
-    
+
     // if we try to load non-existed matrix using aliase name, it should fail.
     DenseMatrix loadTest3 = null;
     try {
       loadTest3 = new DenseMatrix(conf, aliase1, false);
       fail("Try to load a non-existed matrix should fail!");
     } catch (IOException e) {
-      
+
     } finally {
-      if(loadTest3!=null)
+      if (loadTest3 != null)
         loadTest3.close();
     }
   }
-  
+
   public void testForceCreate() throws IOException {
     String path2 = m2.getPath();
-    // save m2 to aliase2 
+    // save m2 to aliase2
     m2.save(aliase2);
     // load matrix m2 using aliase2
     DenseMatrix loadTest = new DenseMatrix(conf, aliase2, false);
@@ -235,16 +256,16 @@
         assertEquals(m2.get(i, j), loadTest.get(i, j));
       }
     }
-    
+
     assertEquals(path2, loadTest.getPath());
-    
+
     // force to create matrix loadTest2 using aliasename 'aliase2'
     DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true);
     String loadPath2 = loadTest2.getPath();
     assertFalse(path2.equals(loadPath2));
     assertEquals(loadPath2, hamaAdmin.getPath(aliase2));
     assertFalse(path2.equals(hamaAdmin.getPath(aliase2)));
-    
+
     // try to close m2 & loadTest, it table will be deleted finally
     m2.close();
     assertEquals(true, admin.tableExists(path2));
@@ -280,8 +301,8 @@
 
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
-        assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String
-            .valueOf(C[i][j]).substring(0, 14));
+        assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), 
+            String.valueOf(C[i][j]).substring(0, 14));
       }
     }
   }

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java Sun Nov 16 17:57:47 2008
@@ -19,6 +19,7 @@
  */
 package org.apache.hama;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import junit.extensions.TestSetup;
@@ -113,9 +114,11 @@
 
   /**
    * Test get/set methods
+   * @throws IOException 
    */
-  public void testGetSet() {
+  public void testGetSet() throws IOException {
     assertEquals(v1.get(0), values[0][0]);
+    assertEquals(m1.getColumn(0).size(), 2);
   }
 
   /**

Added: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,66 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.HCluster;
+import org.apache.hama.Matrix;
+import org.apache.hama.algebra.BlockCyclicMultiplyMap;
+import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
+import org.apache.hama.io.BlockWritable;
+import org.apache.log4j.Logger;
+
+public class TestBlockMatrixMapReduce extends HCluster {
+  static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
+  static Matrix c;
+  static final int SIZE = 20;
+  /** constructor */
+  public TestBlockMatrixMapReduce() {
+    super();
+  }
+
+  public void testBlockMatrixMapReduce() throws IOException, ClassNotFoundException {
+    Matrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
+    Matrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
+    ((DenseMatrix) m1).blocking(2);
+    ((DenseMatrix) m2).blocking(2);
+
+    miniMRJob(m1.getPath(), m2.getPath());
+    
+    double[][] C = new double[SIZE][SIZE];
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        for (int k = 0; k < SIZE; k++) {
+          C[i][k] += m1.get(i, j) * m2.get(j, k);
+        }
+      }
+    }
+
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        assertEquals(String.valueOf(C[i][j]).substring(0, 5), 
+            String.valueOf(c.get(i, j)).substring(0, 5));
+      }
+    }
+  }
+
+  private void miniMRJob(String pathA, String pathB) throws IOException {
+    c = new DenseMatrix(conf);
+    String output = c.getPath();
+    
+    JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class);
+    jobConf.setJobName("test MR job");
+
+    BlockCyclicMultiplyMap.initJob(pathA, pathB, BlockCyclicMultiplyMap.class, IntWritable.class,
+        BlockWritable.class, jobConf);
+    BlockCyclicReduce.initJob(output, BlockCyclicMultiplyReduce.class, jobConf);
+
+    jobConf.setNumMapTasks(2);
+    jobConf.setNumReduceTasks(2);
+
+    JobClient.runJob(jobConf);
+  }
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Sun Nov 16 17:57:47 2008
@@ -37,9 +37,6 @@
  */
 public class TestMatrixMapReduce extends HCluster {
   static final Logger LOG = Logger.getLogger(TestMatrixMapReduce.class);
-  String pathA;
-  String pathB;
-  String output;
   
   /** constructor */
   public TestMatrixMapReduce() {
@@ -48,28 +45,26 @@
 
   public void testMatrixMapReduce() throws IOException {
     Matrix matrixA = new DenseMatrix(conf);
-    pathA = matrixA.getPath();
     matrixA.set(0, 0, 1);
     matrixA.set(0, 1, 0);
     matrixA.setDimension(1, 2);
 
     Matrix matrixB = new DenseMatrix(conf);
-    pathB = matrixB.getPath();
     matrixB.set(0, 0, 1);
     matrixB.set(0, 1, 1);
     matrixB.setDimension(1, 2);
     
-    miniMRJob();
+    miniMRJob(matrixA.getPath(), matrixB.getPath());
   }
 
-  private void miniMRJob() throws IOException {
+  private void miniMRJob(String pathA, String pathB) throws IOException {
     Matrix c = new DenseMatrix(conf);
-    output = c.getPath();
+    String output = c.getPath();
     
     JobConf jobConf = new JobConf(conf, TestMatrixMapReduce.class);
     jobConf.setJobName("test MR job");
 
-    RowCyclicAdditionMap.initJob(pathA, pathB, RowCyclicAdditionMap.class, IntWritable.class,
+    RowCyclicAdditionMap.initJob(pathA, pathB, RowCyclicAdditionMap.class, IntWritable.class,
         VectorWritable.class, jobConf);
     RowCyclicReduce.initJob(output, RowCyclicAdditionReduce.class, jobConf);