You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/10/28 09:49:07 UTC

svn commit: r830463 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/ src/java/org/apache/hama/mapreduce/ src/java/org/apache/hama/matrix/ src/java/org/apache/hama/matrix/algebra/ src/test/org/apache/hama/mapr...

Author: edwardyoon
Date: Wed Oct 28 08:49:06 2009
New Revision: 830463

URL: http://svn.apache.org/viewvc?rev=830463&view=rev
Log:
Replacement of Block Multiplication Map/Reduce

Added:
    incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java
    incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java
    incubator/hama/trunk/src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyReduce.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=830463&r1=830462&r2=830463&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Oct 28 08:49:06 2009
@@ -42,6 +42,7 @@
 
   IMPROVEMENTS
 
+    HAMA-217: Replacement of Block Multiplication Map/Reduce (edwardyoon)
     HAMA-205: Replacement of NormMap/Reduce (edwardyoon)
     HAMA-207: Replacement of Mat-Mat addition Map/Reduce (edwardyoon)
     HAMA-208: Replacement of vector-matrix multiplication Map/Reduce (edwardyoon)

Added: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java?rev=830463&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java Wed Oct 28 08:49:06 2009
@@ -0,0 +1,63 @@
+package org.apache.hama.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.util.BytesUtil;
+
+public class CollectBlocksMapper extends TableMapper<BlockID, MapWritable>
+    implements Configurable {
+  private Configuration conf = null;
+  /** Parameter of the path of the matrix to be blocked * */
+  public static final String BLOCK_SIZE = "hama.blocking.size";
+  public static final String ROWS = "hama.blocking.rows";
+  public static final String COLUMNS = "hama.blocking.columns";
+  public static final String MATRIX_POS = "a.ore.b";
+
+  private int mBlockNum;
+  private int mBlockRowSize;
+  private int mBlockColSize;
+  private int mRows;
+  private int mColumns;
+  
+  public void map(ImmutableBytesWritable key, Result value, Context context)
+      throws IOException, InterruptedException {
+    int startColumn, endColumn, blkRow = BytesUtil.getRowIndex(key.get())
+        / mBlockRowSize, i = 0;
+    DenseVector dv = new DenseVector(BytesUtil.getRowIndex(key.get()), value);
+
+    do {
+      startColumn = i * mBlockColSize;
+      endColumn = startColumn + mBlockColSize - 1;
+      if (endColumn >= mColumns) // the last sub vector
+        endColumn = mColumns - 1;
+      context.write(new BlockID(blkRow, i), dv.subVector(startColumn, endColumn).getEntries());
+
+      i++;
+    } while (endColumn < (mColumns - 1));
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+
+    mBlockNum = Integer.parseInt(conf.get(BLOCK_SIZE, ""));
+    mRows = Integer.parseInt(conf.get(ROWS, ""));
+    mColumns = Integer.parseInt(conf.get(COLUMNS, ""));
+
+    mBlockRowSize = mRows / mBlockNum;
+    mBlockColSize = mColumns / mBlockNum;
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java?rev=830463&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java Wed Oct 28 08:49:06 2009
@@ -0,0 +1,108 @@
+package org.apache.hama.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.matrix.SubMatrix;
+
+public class CollectBlocksReducer extends
+    TableReducer<BlockID, MapWritable, Writable> implements Configurable {
+  private Configuration conf = null;
+  private int mBlockNum;
+  private int mBlockRowSize;
+  private int mBlockColSize;
+  private int mRows;
+  private int mColumns;
+  private boolean matrixPos;
+  
+  public void reduce(BlockID key, Iterable<MapWritable> values,
+      Context context) throws IOException, InterruptedException {
+    // the block's base offset in the original matrix
+    int colBase = key.getColumn() * mBlockColSize;
+    int rowBase = key.getRow() * mBlockRowSize;
+    
+
+    // the block's size : rows & columns
+    int smRows = mBlockRowSize;
+    if ((rowBase + mBlockRowSize - 1) >= mRows)
+      smRows = mRows - rowBase;
+    int smCols = mBlockColSize;
+    if ((colBase + mBlockColSize - 1) >= mColumns)
+      smCols = mColumns - colBase;
+
+    // construct the matrix
+    SubMatrix subMatrix = new SubMatrix(smRows, smCols);
+    // i, j is the current offset in the sub-matrix
+    int i = 0, j = 0;
+    for (MapWritable value : values) {
+      DenseVector vw = new DenseVector(value);
+      // check the size is suitable
+      if (vw.size() != smCols)
+        throw new IOException("Block Column Size dismatched.");
+      i = vw.getRow() - rowBase;
+      
+      if (i >= smRows || i < 0)
+        throw new IOException("Block Row Size dismatched.");
+
+      // put the subVector to the subMatrix
+      for (j = 0; j < smCols; j++) {
+        subMatrix.set(i, j, vw.get(colBase + j));
+      }
+    }
+    //BlockWritable outValue = new BlockWritable(subMatrix);
+    
+    // It'll used for only matrix multiplication.
+    if (matrixPos) {
+      for (int x = 0; x < mBlockNum; x++) {
+        int r = (key.getRow() * mBlockNum) * mBlockNum;
+        int seq = (x * mBlockNum) + key.getColumn() + r;
+        BlockID bkID = new BlockID(key.getRow(), x, seq);
+        Put put = new Put(bkID.getBytes());
+        put.add(Bytes.toBytes(Constants.BLOCK_FAMILY), 
+            Bytes.toBytes("a"), 
+            subMatrix.getBytes());
+        context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
+      }
+    } else {
+      for (int x = 0; x < mBlockNum; x++) {
+        int seq = (x * mBlockNum * mBlockNum) + (key.getColumn() * mBlockNum)
+            + key.getRow();
+        BlockID bkID = new BlockID(x, key.getColumn(), seq);
+        Put put = new Put(bkID.getBytes());
+        put.add(Bytes.toBytes(Constants.BLOCK_FAMILY), 
+            Bytes.toBytes("b"), 
+            subMatrix.getBytes());
+        context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
+      }
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+
+    mBlockNum = Integer.parseInt(conf.get(CollectBlocksMapper.BLOCK_SIZE, ""));
+    mRows = Integer.parseInt(conf.get(CollectBlocksMapper.ROWS, ""));
+    mColumns = Integer.parseInt(conf.get(CollectBlocksMapper.COLUMNS, ""));
+
+    mBlockRowSize = mRows / mBlockNum;
+    mBlockColSize = mColumns / mBlockNum;
+
+    matrixPos = conf.getBoolean(CollectBlocksMapper.MATRIX_POS, true);
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java?rev=830463&r1=830462&r2=830463&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java Wed Oct 28 08:49:06 2009
@@ -24,8 +24,6 @@
 import java.util.NavigableMap;
 
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
@@ -33,7 +31,6 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
 
 /**

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java?rev=830463&r1=830462&r2=830463&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java Wed Oct 28 08:49:06 2009
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
@@ -56,17 +57,16 @@
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.DoubleEntry;
 import org.apache.hama.io.Pair;
 import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.mapred.CollectBlocksMapper;
 import org.apache.hama.mapred.DummyMapper;
 import org.apache.hama.mapred.VectorInputFormat;
+import org.apache.hama.mapreduce.CollectBlocksMapper;
 import org.apache.hama.mapreduce.RandomMatrixMapper;
 import org.apache.hama.mapreduce.RandomMatrixReducer;
-import org.apache.hama.matrix.algebra.BlockMultiplyMap;
-import org.apache.hama.matrix.algebra.BlockMultiplyReduce;
+import org.apache.hama.matrix.algebra.BlockMultMap;
+import org.apache.hama.matrix.algebra.BlockMultReduce;
 import org.apache.hama.matrix.algebra.DenseMatrixVectorMultMap;
 import org.apache.hama.matrix.algebra.DenseMatrixVectorMultReduce;
 import org.apache.hama.matrix.algebra.JacobiEigenValue;
@@ -451,12 +451,11 @@
     Scan scan = new Scan();
     scan.addFamily(Constants.COLUMNFAMILY);
     job.getConfiguration().set(MatrixAdditionMap.MATRIX_SUMMANDS, B.getPath());
-    job.getConfiguration().set(MatrixAdditionMap.MATRIX_ALPHAS, Double
-        .toString(alpha));
-    
+    job.getConfiguration().set(MatrixAdditionMap.MATRIX_ALPHAS,
+        Double.toString(alpha));
+
     TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
-        MatrixAdditionMap.class, IntWritable.class,
-        MapWritable.class, job);
+        MatrixAdditionMap.class, IntWritable.class, MapWritable.class, job);
     TableMapReduceUtil.initTableReducerJob(result.getPath(),
         MatrixAdditionReduce.class, job);
     try {
@@ -466,7 +465,7 @@
     } catch (ClassNotFoundException e) {
       e.printStackTrace();
     }
-    
+
     return result;
   }
 
@@ -505,12 +504,13 @@
 
     Scan scan = new Scan();
     scan.addFamily(Constants.COLUMNFAMILY);
-    job.getConfiguration().set(MatrixAdditionMap.MATRIX_SUMMANDS, summandList.toString());
-    job.getConfiguration().set(MatrixAdditionMap.MATRIX_ALPHAS, alphaList.toString());
-    
+    job.getConfiguration().set(MatrixAdditionMap.MATRIX_SUMMANDS,
+        summandList.toString());
+    job.getConfiguration().set(MatrixAdditionMap.MATRIX_ALPHAS,
+        alphaList.toString());
+
     TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
-        MatrixAdditionMap.class, IntWritable.class,
-        MapWritable.class, job);
+        MatrixAdditionMap.class, IntWritable.class, MapWritable.class, job);
     TableMapReduceUtil.initTableReducerJob(result.getPath(),
         MatrixAdditionReduce.class, job);
     try {
@@ -520,7 +520,7 @@
     } catch (ClassNotFoundException e) {
       e.printStackTrace();
     }
-    
+
     return result;
   }
 
@@ -548,7 +548,7 @@
 
     DenseMatrix result = new DenseMatrix(config, this.getRows(), columns);
     List<Job> jobId = new ArrayList<Job>();
-    
+
     for (int i = 0; i < this.getRows(); i++) {
       Job job = new Job(config, "multiplication MR job : " + result.getPath()
           + " " + i);
@@ -560,8 +560,8 @@
       job.getConfiguration().setInt(DenseMatrixVectorMultMap.ITH_ROW, i);
 
       TableMapReduceUtil.initTableMapperJob(B.getPath(), scan,
-          DenseMatrixVectorMultMap.class, IntWritable.class,
-          MapWritable.class, job);
+          DenseMatrixVectorMultMap.class, IntWritable.class, MapWritable.class,
+          job);
       TableMapReduceUtil.initTableReducerJob(result.getPath(),
           DenseMatrixVectorMultReduce.class, job);
       try {
@@ -582,7 +582,7 @@
         e.printStackTrace();
       }
     }
-    
+
     return result;
   }
 
@@ -599,7 +599,8 @@
 
     String collectionTable = "collect_" + RandomVariable.randMatrixPath();
     HTableDescriptor desc = new HTableDescriptor(collectionTable);
-    desc.addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK)));
+    desc
+        .addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK_FAMILY)));
     this.admin.createTable(desc);
     LOG.info("Collect Blocks");
 
@@ -609,18 +610,24 @@
     DenseMatrix result = new DenseMatrix(config, this.getRows(), this
         .getColumns());
 
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("multiplication MR job : " + result.getPath());
+    Job job = new Job(config, "multiplication MR job : " + result.getPath());
+
+    Scan scan = new Scan();
+    scan.addFamily(Bytes.toBytes(Constants.BLOCK_FAMILY));
 
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+    TableMapReduceUtil.initTableMapperJob(collectionTable, scan,
+        BlockMultMap.class, BlockID.class, BytesWritable.class, job);
+    TableMapReduceUtil.initTableReducerJob(result.getPath(),
+        BlockMultReduce.class, job);
 
-    BlockMultiplyMap.initJob(collectionTable, BlockMultiplyMap.class,
-        BlockID.class, BlockWritable.class, jobConf);
-    BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class,
-        jobConf);
+    try {
+      job.waitForCompletion(true);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
 
-     JobClient.runJob(jobConf);
     hamaAdmin.delete(collectionTable);
     return result;
   }
@@ -687,8 +694,7 @@
 
     Scan scan = new Scan();
     for (int j = j0, jj = 0; j <= j1; j++, jj++) {
-      scan.addColumn(Constants.COLUMNFAMILY, Bytes
-          .toBytes(String.valueOf(j)));
+      scan.addColumn(Constants.COLUMNFAMILY, Bytes.toBytes(String.valueOf(j)));
     }
     scan.setStartRow(BytesUtil.getRowIndex(i0));
     scan.setStopRow(BytesUtil.getRowIndex(i1 + 1));
@@ -700,8 +706,8 @@
     while (it.hasNext()) {
       rs = it.next();
       for (int j = j0, jj = 0; j <= j1; j++, jj++) {
-        byte[] vv = rs.getValue(Constants.COLUMNFAMILY, Bytes
-            .toBytes(String.valueOf(j)));
+        byte[] vv = rs.getValue(Constants.COLUMNFAMILY, Bytes.toBytes(String
+            .valueOf(j)));
         System.out.println(BytesUtil.bytesToDouble(vv));
         result.set(i, jj, vv);
       }
@@ -727,21 +733,32 @@
       throw new IOException("can't divide.");
 
     int block_size = (int) blocks;
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("Blocking MR job" + getPath());
+    Job job = new Job(config, "Blocking MR job" + getPath());
 
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(config.getNumReduceTasks());
-    jobConf.setMapperClass(CollectBlocksMapper.class);
-    jobConf.setInputFormat(VectorInputFormat.class);
-    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+    Scan scan = new Scan();
+    scan.addFamily(Constants.COLUMNFAMILY);
 
-    org.apache.hadoop.mapred.FileInputFormat.addInputPaths(jobConf, path);
+    job.getConfiguration().set(CollectBlocksMapper.BLOCK_SIZE,
+        String.valueOf(block_size));
+    job.getConfiguration().set(CollectBlocksMapper.ROWS,
+        String.valueOf(this.getRows()));
+    job.getConfiguration().set(CollectBlocksMapper.COLUMNS,
+        String.valueOf(this.getColumns()));
+    job.getConfiguration().setBoolean(CollectBlocksMapper.MATRIX_POS, bool);
 
-    CollectBlocksMapper.initJob(collectionTable, bool, block_size, this
-        .getRows(), this.getColumns(), jobConf);
+    TableMapReduceUtil.initTableMapperJob(path, scan,
+        org.apache.hama.mapreduce.CollectBlocksMapper.class, BlockID.class,
+        MapWritable.class, job);
+    TableMapReduceUtil.initTableReducerJob(collectionTable,
+        org.apache.hama.mapreduce.CollectBlocksReducer.class, job);
 
-    JobClient.runJob(jobConf);
+    try {
+      job.waitForCompletion(true);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
   }
 
   /**
@@ -914,7 +931,7 @@
         e1 = BytesUtil.bytesToDouble(table.get(get).getValue(
             Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
             Bytes.toBytes(String.valueOf(i))));
-       
+
         get = new Get(BytesUtil.getRowIndex(pivot_col));
         e2 = BytesUtil.bytesToDouble(table.get(get).getValue(
             Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
@@ -943,15 +960,15 @@
     Get get = null;
     if (row + 2 < size) {
       get = new Get(BytesUtil.getRowIndex(row));
-      
+
       double max = BytesUtil.bytesToDouble(table.get(get).getValue(
-          Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY), 
+          Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY),
           Bytes.toBytes(String.valueOf(m))));
       double val;
       for (int i = row + 2; i < size; i++) {
         get = new Get(BytesUtil.getRowIndex(row));
         val = BytesUtil.bytesToDouble(table.get(get).getValue(
-            Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY), 
+            Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY),
             Bytes.toBytes(String.valueOf(i))));
         if (Math.abs(val) > Math.abs(max)) {
           m = i;
@@ -968,26 +985,28 @@
   int update(int row, double value, int state) throws IOException {
     Get get = new Get(BytesUtil.getRowIndex(row));
     double e = BytesUtil.bytesToDouble(table.get(get).getValue(
-        Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY), 
+        Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
         Bytes.toBytes(JacobiEigenValue.EI_VAL)));
     int changed = BytesUtil.bytesToInt(table.get(get).getValue(
-        Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY), 
+        Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
         Bytes.toBytes("changed")));
     double y = e;
     e += value;
 
     VectorUpdate vu = new VectorUpdate(row);
     vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EI_VAL, e);
-    
+
     if (changed == 1 && (Math.abs(y - e) < .0000001)) { // y == e) {
       changed = 0;
-      vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
-      
+      vu.put(JacobiEigenValue.EI_COLUMNFAMILY,
+          JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
+
       state--;
     } else if (changed == 0 && (Math.abs(y - e) > .0000001)) {
       changed = 1;
-      vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
-      
+      vu.put(JacobiEigenValue.EI_COLUMNFAMILY,
+          JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
+
       state++;
     }
     table.put(vu.getPut());
@@ -1011,7 +1030,7 @@
       for (int j = 0; j < E[i].length; j++) {
         get = new Get(BytesUtil.getRowIndex(i));
         ev = BytesUtil.bytesToDouble(table.get(get).getValue(
-            Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY), 
+            Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
             Bytes.toBytes(String.valueOf(j))));
         success &= ((Math.abs(ev - E[i][j]) < .0000001));
         if (!success)

Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java?rev=830463&r1=830462&r2=830463&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java Wed Oct 28 08:49:06 2009
@@ -50,6 +50,11 @@
     this.initMap(row);
   }
 
+  public DenseVector(int row, Result m) {
+    this.initMap(m);
+    this.entries.put(Constants.ROWCOUNT, new IntWritable(row));
+  }
+  
   public DenseVector(int row, MapWritable m) {
     this.entries = m;
     this.entries.put(Constants.ROWCOUNT, new IntWritable(row));

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java?rev=830463&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java Wed Oct 28 08:49:06 2009
@@ -0,0 +1,25 @@
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hama.Constants;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.SubMatrix;
+
+public class BlockMultMap extends TableMapper<BlockID, BytesWritable> {
+  private byte[] COLUMN = Bytes.toBytes(Constants.BLOCK_FAMILY);
+  
+  public void map(ImmutableBytesWritable key, Result value, Context context) 
+  throws IOException, InterruptedException {
+    SubMatrix a = new SubMatrix(value.getValue(COLUMN, Bytes.toBytes("a")));
+    SubMatrix b = new SubMatrix(value.getValue(COLUMN, Bytes.toBytes("b")));
+    
+    SubMatrix c = a.mult(b);
+    context.write(new BlockID(key.get()), new BytesWritable(c.getBytes()));
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java?rev=830463&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java Wed Oct 28 08:49:06 2009
@@ -0,0 +1,43 @@
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.matrix.SubMatrix;
+import org.apache.hama.util.BytesUtil;
+
+public class BlockMultReduce extends
+    TableReducer<BlockID, BytesWritable, Writable> {
+  
+  @Override
+  public void reduce(BlockID key, Iterable<BytesWritable> values,
+      Context context) throws IOException, InterruptedException {
+    SubMatrix s = null;
+    for (BytesWritable value : values) {
+      SubMatrix b = new SubMatrix(value.getBytes());
+      if (s == null) {
+        s = b;
+      } else {
+        s = s.add(b);
+      }
+    }
+
+    int startRow = key.getRow() * s.getRows();
+    int startColumn = key.getColumn() * s.getColumns();
+
+    for (int i = 0; i < s.getRows(); i++) {
+      VectorUpdate update = new VectorUpdate(i + startRow);
+      for (int j = 0; j < s.getColumns(); j++) {
+        update.put(j + startColumn, s.get(i, j));
+      }
+
+      context.write(new ImmutableBytesWritable(BytesUtil.getRowIndex(key
+          .getRow())), update.getPut());
+    }
+  }
+}

Added: incubator/hama/trunk/src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java?rev=830463&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java Wed Oct 28 08:49:06 2009
@@ -0,0 +1,60 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hama.HamaCluster;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.log4j.Logger;
+
+public class TestBlockMatrixMapReduce extends HamaCluster {
+  static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
+  static final int SIZE = 32;
+
+  /** constructor */
+  public TestBlockMatrixMapReduce() {
+    super();
+  }
+
+  public void testBlockMatrixMapReduce() throws IOException,
+      ClassNotFoundException {
+    DenseMatrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
+    DenseMatrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
+
+    DenseMatrix c = (DenseMatrix) m1.mult(m2, 16);
+
+    double[][] mem = new double[SIZE][SIZE];
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        for (int k = 0; k < SIZE; k++) {
+          mem[i][k] += m1.get(i, j) * m2.get(j, k);
+        }
+      }
+    }
+
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        double gap = (mem[i][j] - c.get(i, j));
+        assertTrue(gap < 0.000001 || gap < -0.000001);
+      }
+    }
+  }
+}