You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/12/16 04:21:48 UTC

svn commit: r726934 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/algebra/ src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/ src/test/org/apache/hama/ src/test/org/apache/hama/mapred/

Author: edwardyoon
Date: Mon Dec 15 19:21:47 2008
New Revision: 726934

URL: http://svn.apache.org/viewvc?rev=726934&view=rev
Log:
Refactor mapred and I/O packages

Removed:
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockPositionMapWritable.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Dec 15 19:21:47 2008
@@ -33,6 +33,7 @@
     
   IMPROVEMENTS
     
+    HAMA-135, HAMA-137: Refactor mapred, I/O package (edwardyoon)
     HAMA-134: We don't need to fill C with zeros (edwardyoon)
     HAMA-131: Add argument for the number of blocks (edwardyoon)
     HAMA-113: Random matrix generator on map/reduce (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Mon Dec 15 19:21:47 2008
@@ -99,10 +99,7 @@
   /** end column of block */
   public static final String BLOCK_ENDCOLUMN = "attribute:endColumn";
 
-
-  public static final String BLOCK_POSITION = Constants.BLOCK_STARTROW 
-  + " " + Constants.BLOCK_ENDROW + " " + Constants.BLOCK_STARTCOLUMN 
-  + " " + Constants.BLOCK_ENDCOLUMN;
+  public static final String BLOCK_POSITION = "attribute:blockPosition";
   
   /** block dimension */
   public static final String BLOCK = "block:";

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Mon Dec 15 19:21:47 2008
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -47,6 +48,7 @@
 import org.apache.hama.algebra.SIMDMultiplyMap;
 import org.apache.hama.algebra.SIMDMultiplyReduce;
 import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockPosition;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.DoubleEntry;
 import org.apache.hama.io.MapWritable;
@@ -393,7 +395,7 @@
 
     if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
       BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(),
-          BlockCyclicMultiplyMap.class, IntWritable.class, BlockWritable.class,
+          BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
           jobConf);
       BlockCyclicMultiplyReduce.initJob(result.getPath(),
           BlockCyclicMultiplyReduce.class, jobConf);
@@ -474,8 +476,8 @@
   }
 
   public SubMatrix getBlock(int i, int j) throws IOException {
-    return new SubMatrix(table.get(String.valueOf(i), Constants.BLOCK + j)
-        .getValue());
+    return new SubMatrix(table.get(new BlockID(i, j).getBytes(), 
+        Bytes.toBytes(Constants.BLOCK)).getValue());
   }
 
   /**
@@ -494,8 +496,8 @@
   }
 
   public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
-    BatchUpdate update = new BatchUpdate(String.valueOf(i));
-    update.put(Constants.BLOCK + j, matrix.getBytes());
+    BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
+    update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
     table.commit(update);
   }
 
@@ -519,11 +521,8 @@
           endColumn = this.getColumns() - 1;
 
         BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
-        update.put(Constants.BLOCK_STARTROW, BytesUtil.intToBytes(startRow));
-        update.put(Constants.BLOCK_ENDROW, BytesUtil.intToBytes(endRow));
-        update.put(Constants.BLOCK_STARTCOLUMN, BytesUtil
-            .intToBytes(startColumn));
-        update.put(Constants.BLOCK_ENDCOLUMN, BytesUtil.intToBytes(endColumn));
+        update.put(Constants.BLOCK_POSITION, 
+            new BlockPosition(startRow, endRow, startColumn, endColumn).getBytes());
         table.commit(update);
 
         j++;
@@ -533,19 +532,10 @@
     } while (endRow < (this.getRows() - 1));
   }
 
-  protected int[] getBlockPosition(int i, int j) throws IOException {
-    RowResult rs = table.getRow(new BlockID(i, j).getBytes());
-    int[] result = new int[4];
-
-    result[0] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_STARTROW)
-        .getValue());
-    result[1] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_ENDROW).getValue());
-    result[2] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_STARTCOLUMN)
-        .getValue());
-    result[3] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_ENDCOLUMN)
-        .getValue());
-
-    return result;
+  protected BlockPosition getBlockPosition(int i, int j) throws IOException {
+    byte[] rs = table.get(new BlockID(i, j).getBytes(), 
+        Bytes.toBytes(Constants.BLOCK_POSITION)).getValue();
+    return new BlockPosition(rs);
   }
 
   /**
@@ -577,17 +567,17 @@
    */
   public void blocking(int blockNum) throws IOException {
     this.checkBlockNum(blockNum);
-
-    String[] columns = new String[] { Constants.BLOCK_STARTROW,
-        Constants.BLOCK_ENDROW, Constants.BLOCK_STARTCOLUMN,
-        Constants.BLOCK_ENDCOLUMN };
+    
+    String[] columns = new String[] { Constants.BLOCK_POSITION };
     Scanner scan = table.getScanner(columns);
 
     for (RowResult row : scan) {
       BlockID bID = new BlockID(row.getRow());
-      int[] pos = getBlockPosition(bID.getRow(), bID.getColumn());
-      setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos[0], pos[1], pos[2],
-          pos[3]));
+      BlockPosition pos = 
+        new BlockPosition(row.get(Constants.BLOCK_POSITION).getValue());
+      
+      setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos.getStartRow(), 
+          pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn()));
     }
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java Mon Dec 15 19:21:47 2008
@@ -33,7 +33,7 @@
  * bigger matrix. This is a in-memory operation only.
  */
 public class SubMatrix implements java.io.Serializable {
-  private static final long serialVersionUID = 1L;
+  private static final long serialVersionUID = 3897536498367921547L;
   static final Logger LOG = Logger.getLogger(SubMatrix.class);
   private double[][] matrix;
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Mon Dec 15 19:21:47 2008
@@ -21,7 +21,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -33,22 +32,18 @@
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.SubMatrix;
 import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockPosition;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.mapred.BlockInputFormat;
 import org.apache.log4j.Logger;
 
 public class BlockCyclicMultiplyMap extends MapReduceBase implements
-    Mapper<BlockID, BlockPosition, IntWritable, BlockWritable> {
+    Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
   static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
-  protected DenseMatrix matrix_a;
-  public static final String MATRIX_A = "hama.multiplication.matrix.a";
   protected DenseMatrix matrix_b;
   public static final String MATRIX_B = "hama.multiplication.matrix.b";
 
   public void configure(JobConf job) {
     try {
-      matrix_a = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_A, ""));
       matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
     } catch (IOException e) {
       LOG.warn("Load matrix_b failed : " + e.getMessage());
@@ -56,32 +51,30 @@
   }
 
   public static void initJob(String matrix_a, String matrix_b,
-      Class<BlockCyclicMultiplyMap> map, Class<IntWritable> outputKeyClass,
+      Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
       Class<BlockWritable> outputValueClass, JobConf jobConf) {
 
     jobConf.setMapOutputValueClass(outputValueClass);
     jobConf.setMapOutputKeyClass(outputKeyClass);
     jobConf.setMapperClass(map);
-    jobConf.set(MATRIX_A, matrix_a);
     jobConf.set(MATRIX_B, matrix_b);
 
     jobConf.setInputFormat(BlockInputFormat.class);
     FileInputFormat.addInputPaths(jobConf, matrix_a);
 
-    jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK_POSITION);
+    jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
   }
 
   @Override
-  public void map(BlockID key, @SuppressWarnings("unused") BlockPosition value,
-      OutputCollector<IntWritable, BlockWritable> output, Reporter reporter)
+  public void map(BlockID key, BlockWritable value,
+      OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
       throws IOException {
     int blockSize = matrix_b.getBlockSize();
-    SubMatrix a = matrix_a.getBlock(key.getRow(), key.getColumn());
+    SubMatrix a = value.get();
     for (int j = 0; j < blockSize; j++) {
       SubMatrix b = matrix_b.getBlock(key.getColumn(), j);
       SubMatrix c = a.mult(b);
-      output.collect(new IntWritable(key.getRow()), 
-          new BlockWritable(key.getRow(), j, c));
+      output.collect(new BlockID(key.getRow(), j), new BlockWritable(c));
     }
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java Mon Dec 15 19:21:47 2008
@@ -20,9 +20,7 @@
 package org.apache.hama.algebra;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.io.IntWritable;
@@ -32,14 +30,14 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hama.SubMatrix;
-import org.apache.hama.io.BlockEntry;
+import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.mapred.VectorOutputFormat;
 import org.apache.log4j.Logger;
 
 public class BlockCyclicMultiplyReduce extends MapReduceBase implements
-    Reducer<IntWritable, BlockWritable, IntWritable, VectorUpdate> {
+    Reducer<BlockID, BlockWritable, IntWritable, VectorUpdate> {
   static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyReduce.class);
 
   /**
@@ -60,39 +58,29 @@
   }
 
   @Override
-  public void reduce(IntWritable key, Iterator<BlockWritable> values,
+  public void reduce(BlockID key, Iterator<BlockWritable> values,
       OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
       throws IOException {
-    int row = key.get();
-    Map<Integer, SubMatrix> sum = new HashMap<Integer, SubMatrix>();
 
+    SubMatrix s = null;
     while (values.hasNext()) {
-      BlockWritable b = values.next();
-      for (Map.Entry<Integer, BlockEntry> e : b.entrySet()) {
-        int j = e.getKey();
-        SubMatrix value = e.getValue().getValue();
-        if (sum.containsKey(j)) {
-          sum.put(j, sum.get(j).add(value));
-        } else {
-          sum.put(j, value);
-        }
+      SubMatrix b = values.next().get();
+      if (s == null) {
+        s = b;
+      } else {
+        s = s.add(b);
       }
     }
 
-    for (Map.Entry<Integer, SubMatrix> e : sum.entrySet()) {
-      int column = e.getKey();
-      SubMatrix mat = e.getValue();
-
-      int startRow = row * mat.getRows();
-      int startColumn = column * mat.getColumns();
-
-      for (int i = 0; i < mat.getRows(); i++) {
-        VectorUpdate update = new VectorUpdate(i + startRow);
-        for (int j = 0; j < mat.getColumns(); j++) {
-          update.put(j + startColumn, mat.get(i, j));
-        }
-        output.collect(key, update);
+    int startRow = key.getRow() * s.getRows();
+    int startColumn = key.getColumn() * s.getColumns();
+
+    for (int i = 0; i < s.getRows(); i++) {
+      VectorUpdate update = new VectorUpdate(i + startRow);
+      for (int j = 0; j < s.getColumns(); j++) {
+        update.put(j + startColumn, s.get(i, j));
       }
+      output.collect(new IntWritable(key.getRow()), update);
     }
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java Mon Dec 15 19:21:47 2008
@@ -32,7 +32,7 @@
 /** A WritableComparable for BlockIDs. */
 @SuppressWarnings("unchecked")
 public class BlockID implements WritableComparable, java.io.Serializable {
-  private static final long serialVersionUID = 1L;
+  private static final long serialVersionUID = 6434651179475226613L;
   private int row;
   private int column;
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java Mon Dec 15 19:21:47 2008
@@ -19,130 +19,105 @@
  */
 package org.apache.hama.io;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
-public class BlockPosition implements Writable, Map<Text, IntegerEntry> {
-
-  public BlockID row;
-  public BlockPositionMapWritable<Text, IntegerEntry> entries;
+public class BlockPosition implements Writable, java.io.Serializable {
+  private static final long serialVersionUID = 3717208691381491714L;
+  public int startRow;
+  public int endRow;
+  public int startColumn;
+  public int endColumn;
 
   public BlockPosition() {
-    this(new BlockPositionMapWritable<Text, IntegerEntry>());
-  }
-
-  public BlockPosition(BlockPositionMapWritable<Text, IntegerEntry> entries) {
-    this.entries = entries;
   }
 
-  public int getIndex(String key) {
-    return this.entries.get(new Text(key)).getValue();
-  }
-  
-  public int size() {
-    return this.entries.size();
+  public BlockPosition(byte[] bytes) throws IOException {
+    ByteArrayInputStream bos = new ByteArrayInputStream(bytes);
+    ObjectInputStream oos = new ObjectInputStream(bos);
+    Object obj = null;
+    try {
+      obj = oos.readObject();
+      this.startRow = ((BlockPosition)obj).startRow;
+      this.endRow = ((BlockPosition)obj).endRow;
+      this.startColumn = ((BlockPosition)obj).startColumn;
+      this.endColumn = ((BlockPosition)obj).endColumn;
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+    oos.close();
+    bos.close();
   }
   
-  public IntegerEntry put(Text key, IntegerEntry value) {
-    throw new UnsupportedOperationException("VectorWritable is read-only!");
+  public BlockPosition(int startRow, int endRow, int startColumn, int endColumn) {
+    this.startRow = startRow;
+    this.endRow = endRow;
+    this.startColumn = startColumn;
+    this.endColumn = endColumn;
   }
 
-  public IntegerEntry get(Object key) {
-    return this.entries.get(key);
+  public void readFields(DataInput in) throws IOException {
+    this.startRow = in.readInt();
+    this.endRow = in.readInt();
+    this.startColumn = in.readInt();
+    this.endColumn = in.readInt();
   }
 
-  public IntegerEntry remove(Object key) {
-    throw new UnsupportedOperationException("VectorWritable is read-only!");
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(startRow);
+    out.writeInt(endRow);
+    out.writeInt(startColumn);
+    out.writeInt(endColumn);
   }
 
-  public boolean containsKey(Object key) {
-    return entries.containsKey(key);
+  public int getStartRow() {
+    return startRow;
   }
 
-  public boolean containsValue(Object value) {
-    throw new UnsupportedOperationException("Don't support containsValue!");
+  public void setStartRow(int startRow) {
+    this.startRow = startRow;
   }
 
-  public boolean isEmpty() {
-    return entries.isEmpty();
+  public int getEndRow() {
+    return endRow;
   }
 
-  public void clear() {
-    throw new UnsupportedOperationException("VectorDatum is read-only!");
+  public void setEndRow(int endRow) {
+    this.endRow = endRow;
   }
 
-  public Set<Text> keySet() {
-    Set<Text> result = new TreeSet<Text>();
-    for (Text w : entries.keySet()) {
-      result.add(w);
-    }
-    return result;
+  public int getStartColumn() {
+    return startColumn;
   }
 
-  public Set<Map.Entry<Text, IntegerEntry>> entrySet() {
-    return Collections.unmodifiableSet(this.entries.entrySet());
+  public void setStartColumn(int startColumn) {
+    this.startColumn = startColumn;
   }
 
-  public Collection<IntegerEntry> values() {
-    ArrayList<IntegerEntry> result = new ArrayList<IntegerEntry>();
-    for (Writable w : entries.values()) {
-      result.add((IntegerEntry) w);
-    }
-    return result;
+  public int getEndColumn() {
+    return endColumn;
   }
 
-  public void readFields(final DataInput in) throws IOException {
-    this.row = new BlockID(Bytes.readByteArray(in));
-    this.entries.readFields(in);
+  public void setEndColumn(int endColumn) {
+    this.endColumn = endColumn;
   }
 
-  public void write(final DataOutput out) throws IOException {
-    Bytes.writeByteArray(out, this.row.getBytes());
-    this.entries.write(out);
-  }
-
-  public void putAll(Map<? extends Text, ? extends IntegerEntry> m) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
-  /**
-   * 
-   * The inner class for an entry of row.
-   * 
-   */
-  public static class Entries implements Map.Entry<byte[], IntegerEntry> {
-
-    private final byte[] column;
-    private final IntegerEntry entry;
-
-    Entries(byte[] column, IntegerEntry entry) {
-      this.column = column;
-      this.entry = entry;
-    }
-
-    public IntegerEntry setValue(IntegerEntry c) {
-      throw new UnsupportedOperationException("VectorWritable is read-only!");
-    }
-
-    public byte[] getKey() {
-      byte[] key = column;
-      return key;
-    }
-
-    public IntegerEntry getValue() {
-      return entry;
-    }
+  public byte[] getBytes() throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(bos);
+    oos.writeObject(this);
+    oos.flush();
+    oos.close();
+    bos.close();
+    byte[] data = bos.toByteArray();
+    return data;
   }
 
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Mon Dec 15 19:21:47 2008
@@ -22,134 +22,35 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.SubMatrix;
-import org.apache.hama.util.BytesUtil;
 
-public class BlockWritable implements Writable, Map<Integer, BlockEntry> {
-
-  public Integer row;
-  public BlockMapWritable<Integer, BlockEntry> entries;
+public class BlockWritable implements Writable {
+  public SubMatrix matrix;
 
   public BlockWritable() {
-    this(new BlockMapWritable<Integer, BlockEntry>());
-  }
-
-  public BlockWritable(BlockMapWritable<Integer, BlockEntry> entries) {
-    this.entries = entries;
-  }
-
-  public BlockWritable(int i, int j, SubMatrix mult) throws IOException {
-    this.row = i;
-    BlockMapWritable<Integer, BlockEntry> tr = new BlockMapWritable<Integer, BlockEntry>();
-    tr.put(j, new BlockEntry(mult));
-    this.entries = tr;
-  }
-
-  public int size() {
-    return this.entries.size();
-  }
-  
-  public SubMatrix get(int key) throws IOException {
-    return this.entries.get(key).getValue();
-  }
-  
-  public BlockEntry put(Integer key, BlockEntry value) {
-    throw new UnsupportedOperationException("VectorWritable is read-only!");
-  }
-
-  public BlockEntry get(Object key) {
-    return this.entries.get(key);
-  }
-
-  public BlockEntry remove(Object key) {
-    throw new UnsupportedOperationException("VectorWritable is read-only!");
-  }
-
-  public boolean containsKey(Object key) {
-    return entries.containsKey(key);
+    this.matrix = new SubMatrix(0, 0);
   }
 
-  public boolean containsValue(Object value) {
-    throw new UnsupportedOperationException("Don't support containsValue!");
+  public BlockWritable(SubMatrix c) {
+    this.matrix = c;
   }
 
-  public boolean isEmpty() {
-    return entries.isEmpty();
+  public BlockWritable(byte[] bytes) throws IOException {
+    this.matrix = new SubMatrix(bytes);
   }
 
-  public void clear() {
-    throw new UnsupportedOperationException("VectorDatum is read-only!");
+  public void readFields(DataInput in) throws IOException {
+    this.matrix = new SubMatrix(Bytes.readByteArray(in));
   }
 
-  public Set<Integer> keySet() {
-    Set<Integer> result = new TreeSet<Integer>();
-    for (Integer w : entries.keySet()) {
-      result.add(w);
-    }
-    return result;
+  public void write(DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, this.matrix.getBytes());
   }
 
-  public Set<Map.Entry<Integer, BlockEntry>> entrySet() {
-    return Collections.unmodifiableSet(this.entries.entrySet());
-  }
-
-  public Collection<BlockEntry> values() {
-    ArrayList<BlockEntry> result = new ArrayList<BlockEntry>();
-    for (Writable w : entries.values()) {
-      result.add((BlockEntry) w);
-    }
-    return result;
-  }
-
-  public void readFields(final DataInput in) throws IOException {
-    this.row = BytesUtil.bytesToInt(Bytes.readByteArray(in));
-    this.entries.readFields(in);
-  }
-
-  public void write(final DataOutput out) throws IOException {
-    Bytes.writeByteArray(out, BytesUtil.intToBytes(this.row));
-    this.entries.write(out);
-  }
-
-  public void putAll(Map<? extends Integer, ? extends BlockEntry> m) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
-  /**
-   * 
-   * The inner class for an entry of row.
-   * 
-   */
-  public static class Entries implements Map.Entry<byte[], BlockEntry> {
-
-    private final byte[] column;
-    private final BlockEntry entry;
-
-    Entries(byte[] column, BlockEntry entry) {
-      this.column = column;
-      this.entry = entry;
-    }
-
-    public BlockEntry setValue(BlockEntry c) {
-      throw new UnsupportedOperationException("VectorWritable is read-only!");
-    }
-
-    public byte[] getKey() {
-      byte[] key = column;
-      return key;
-    }
-
-    public BlockEntry getValue() {
-      return entry;
-    }
+  public SubMatrix get() {
+    return this.matrix;
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Mon Dec 15 19:21:47 2008
@@ -32,11 +32,12 @@
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
 import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockPosition;
+import org.apache.hama.io.BlockWritable;
 
 public class BlockInputFormat extends TableInputFormatBase implements
-    InputFormat<BlockID, BlockPosition>, JobConfigurable {
+    InputFormat<BlockID, BlockWritable>, JobConfigurable {
   static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
   private TableRecordReader tableRecordReader;
   
@@ -44,7 +45,7 @@
    * Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
    */
   protected static class TableRecordReader extends TableRecordReaderBase
-      implements RecordReader<BlockID, BlockPosition> {
+      implements RecordReader<BlockID, BlockWritable> {
 
     /**
      * @return IntWritable
@@ -60,8 +61,8 @@
      * 
      * @see org.apache.hadoop.mapred.RecordReader#createValue()
      */
-    public BlockPosition createValue() {
-      return new BlockPosition();
+    public BlockWritable createValue() {
+      return new BlockWritable();
     }
 
     /**
@@ -73,7 +74,7 @@
      * @return true if there was more data
      * @throws IOException
      */
-    public boolean next(BlockID key, BlockPosition value)
+    public boolean next(BlockID key, BlockWritable value)
         throws IOException {
       RowResult result = this.scanner.next();
       boolean hasMore = result != null && result.size() > 0;
@@ -81,7 +82,8 @@
         byte[] row = result.getRow();
         BlockID bID = new BlockID(row);
         key.set(bID.getRow(), bID.getColumn());
-        Writables.copyWritable(result, value);
+        byte[] rs = result.get(Constants.BLOCK).getValue();
+        Writables.copyWritable(new BlockWritable(rs), value);
       }
       return hasMore;
     }
@@ -94,7 +96,7 @@
    * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
    *      JobConf, Reporter)
    */
-  public RecordReader<BlockID, BlockPosition> getRecordReader(
+  public RecordReader<BlockID, BlockWritable> getRecordReader(
       InputSplit split, JobConf job, Reporter reporter) throws IOException {
     TableSplit tSplit = (TableSplit) split;
     TableRecordReader trr = this.tableRecordReader;

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Mon Dec 15 19:21:47 2008
@@ -20,11 +20,18 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hama.Constants;
@@ -52,7 +59,7 @@
     job.setMapperClass(BlockingMapper.class);
     FileInputFormat.addInputPaths(job, matrixPath);
 
-    job.setInputFormat(BlockInputFormat.class);
+    job.setInputFormat(MyInputFormat.class);
     job.setMapOutputKeyClass(BlockID.class);
     job.setMapOutputValueClass(VectorWritable.class);
     job.setOutputFormat(NullOutputFormat.class);
@@ -66,7 +73,7 @@
    */
   public static abstract class BlockingMapRedBase extends MapReduceBase {
     protected DenseMatrix matrix;
-    
+
     @Override
     public void configure(JobConf job) {
       try {
@@ -88,13 +95,101 @@
     public void map(BlockID key, BlockPosition value,
         OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
         throws IOException {
-      int startRow = value.getIndex(Constants.BLOCK_STARTROW);
-      int endRow = value.getIndex(Constants.BLOCK_ENDROW);
-      int startColumn = value.getIndex(Constants.BLOCK_STARTCOLUMN);
-      int endColumn = value.getIndex(Constants.BLOCK_ENDCOLUMN);
+      int startRow = value.getStartRow();
+      int endRow = value.getEndRow();
+      int startColumn = value.getStartColumn();
+      int endColumn = value.getEndColumn();
+
+      matrix.setBlock(key.getRow(), key.getColumn(), matrix.subMatrix(startRow,
+          endRow, startColumn, endColumn));
+    }
+  }
+
+  static class MyInputFormat extends TableInputFormatBase implements
+      InputFormat<BlockID, BlockPosition>, JobConfigurable {
+    static final Log LOG = LogFactory.getLog(MyInputFormat.class);
+    private TableRecordReader tableRecordReader;
+
+    /**
+     * Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
+     */
+    protected static class TableRecordReader extends TableRecordReaderBase
+        implements RecordReader<BlockID, BlockPosition> {
+
+      /**
+       * @return IntWritable
+       * 
+       * @see org.apache.hadoop.mapred.RecordReader#createKey()
+       */
+      public BlockID createKey() {
+        return new BlockID();
+      }
+
+      /**
+       * @return BlockWritable
+       * 
+       * @see org.apache.hadoop.mapred.RecordReader#createValue()
+       */
+      public BlockPosition createValue() {
+        return new BlockPosition();
+      }
+
+      /**
+       * @param key BlockID as input key.
+       * @param value BlockWritable as input value
+       * 
+       * Converts Scanner.next() to BlockID, BlockWritable
+       * 
+       * @return true if there was more data
+       * @throws IOException
+       */
+      public boolean next(BlockID key, BlockPosition value) throws IOException {
+        RowResult result = this.scanner.next();
+        boolean hasMore = result != null && result.size() > 0;
+        if (hasMore) {
+          byte[] row = result.getRow();
+          BlockID bID = new BlockID(row);
+          key.set(bID.getRow(), bID.getColumn());
+          Writables.copyWritable(
+              new BlockPosition(result.get(Constants.BLOCK_POSITION).getValue()), 
+              value);
+        }
+        return hasMore;
+      }
+    }
+
+    /**
+     * Builds a TableRecordReader. If no TableRecordReader was provided, uses
+     * the default.
+     * 
+     * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+     *      JobConf, Reporter)
+     */
+    public RecordReader<BlockID, BlockPosition> getRecordReader(
+        InputSplit split, JobConf job, Reporter reporter) throws IOException {
+      TableSplit tSplit = (TableSplit) split;
+      TableRecordReader trr = this.tableRecordReader;
+      // if no table record reader was provided use default
+      if (trr == null) {
+        trr = new TableRecordReader();
+      }
+      trr.setStartRow(tSplit.getStartRow());
+      trr.setEndRow(tSplit.getEndRow());
+      trr.setHTable(this.table);
+      trr.setInputColumns(this.inputColumns);
+      trr.setRowFilter(this.rowFilter);
+      trr.init();
+      return trr;
+    }
 
-      matrix.setBlock(key.getRow(), key.getColumn(), 
-          matrix.subMatrix(startRow, endRow, startColumn, endColumn));
+    /**
+     * Allows subclasses to set the {@link TableRecordReader}.
+     * 
+     * @param tableRecordReader to provide other {@link TableRecordReader}
+     *                implementations.
+     */
+    protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+      this.tableRecordReader = tableRecordReader;
     }
   }
 }

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Mon Dec 15 19:21:47 2008
@@ -28,6 +28,7 @@
 import junit.framework.TestSuite;
 
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hama.io.BlockPosition;
 import org.apache.hama.io.DoubleEntry;
 import org.apache.log4j.Logger;
 
@@ -78,20 +79,22 @@
   public void testEntryAdd() throws IOException {
     double origin = m1.get(1, 1);
     m1.add(1, 1, 0.5);
-    
+
     assertEquals(m1.get(1, 1), origin + 0.5);
   }
-  
+
   public void testBlocking() throws IOException, ClassNotFoundException {
     assertEquals(((DenseMatrix) m1).isBlocked(), false);
     ((DenseMatrix) m1).blocking(4);
     assertEquals(((DenseMatrix) m1).isBlocked(), true);
-    int[] pos = ((DenseMatrix) m1).getBlockPosition(1, 0);
-    double[][] b = ((DenseMatrix) m1).subMatrix(pos[0], pos[1], pos[2], pos[3]).getDoubleArray();
+    BlockPosition pos = ((DenseMatrix) m1).getBlockPosition(1, 0);
+    double[][] b = ((DenseMatrix) m1).subMatrix(pos.getStartRow(),
+        pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn())
+        .getDoubleArray();
     double[][] c = ((DenseMatrix) m1).getBlock(1, 0).getDoubleArray();
     assertEquals(((DenseMatrix) m1).getBlockSize(), 2);
     assertEquals(c.length, 5);
-    
+
     for (int i = 0; i < b.length; i++) {
       for (int j = 0; j < b.length; j++) {
         assertEquals(b[i][j], c[i][j]);
@@ -109,19 +112,21 @@
     assertEquals(((DenseMatrix) m2).isBlocked(), false);
     ((DenseMatrix) m2).blocking_mapred(4);
     assertEquals(((DenseMatrix) m2).isBlocked(), true);
-    int[] pos = ((DenseMatrix) m2).getBlockPosition(1, 0);
-    double[][] b = ((DenseMatrix) m2).subMatrix(pos[0], pos[1], pos[2], pos[3]).getDoubleArray();
+    BlockPosition pos = ((DenseMatrix) m2).getBlockPosition(1, 0);
+    double[][] b = ((DenseMatrix) m2).subMatrix(pos.getStartRow(),
+        pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn())
+        .getDoubleArray();
     double[][] c = ((DenseMatrix) m2).getBlock(1, 0).getDoubleArray();
     assertEquals(((DenseMatrix) m2).getBlockSize(), 2);
     assertEquals(c.length, 5);
-    
+
     for (int i = 0; i < b.length; i++) {
       for (int j = 0; j < b.length; j++) {
         assertEquals(b[i][j], c[i][j]);
       }
     }
   }
-  
+
   /**
    * Column vector test.
    * 
@@ -137,7 +142,7 @@
       x++;
     }
   }
-  
+
   public void testGetSetAttribute() throws IOException {
     m1.setRowLabel(0, "row1");
     assertEquals(m1.getRowLabel(0), "row1");
@@ -228,11 +233,11 @@
   public void testSetColumn() throws IOException {
     Vector v = new DenseVector();
     double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
-    
+
     for (int i = 0; i < SIZE; i++) {
       v.set(i, entries[i]);
     }
-    
+
     m1.setColumn(SIZE + 1, v);
     Iterator<DoubleEntry> it = m1.getColumn(SIZE + 1).iterator();
 
@@ -242,7 +247,7 @@
       i++;
     }
   }
-  
+
   public void testLoadSave() throws IOException {
     String path1 = m1.getPath();
     // save m1 to aliase1
@@ -344,8 +349,8 @@
 
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
-        assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), 
-            String.valueOf(C[i][j]).substring(0, 14));
+        assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String
+            .valueOf(C[i][j]).substring(0, 14));
       }
     }
   }

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Mon Dec 15 19:21:47 2008
@@ -21,7 +21,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hama.DenseMatrix;
@@ -29,6 +28,7 @@
 import org.apache.hama.Matrix;
 import org.apache.hama.algebra.BlockCyclicMultiplyMap;
 import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
+import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
 import org.apache.log4j.Logger;
 
@@ -36,19 +36,21 @@
   static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
   static Matrix c;
   static final int SIZE = 20;
+
   /** constructor */
   public TestBlockMatrixMapReduce() {
     super();
   }
 
-  public void testBlockMatrixMapReduce() throws IOException, ClassNotFoundException {
+  public void testBlockMatrixMapReduce() throws IOException,
+      ClassNotFoundException {
     Matrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
     Matrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
     ((DenseMatrix) m1).blocking_mapred(4);
     ((DenseMatrix) m2).blocking_mapred(4);
 
     miniMRJob(m1.getPath(), m2.getPath());
-    
+
     double[][] C = new double[SIZE][SIZE];
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
@@ -60,8 +62,8 @@
 
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
-        assertEquals(String.valueOf(C[i][j]).substring(0, 5), 
-            String.valueOf(c.get(i, j)).substring(0, 5));
+        assertEquals(String.valueOf(C[i][j]).substring(0, 5), String.valueOf(
+            c.get(i, j)).substring(0, 5));
       }
     }
   }
@@ -69,13 +71,15 @@
   private void miniMRJob(String string, String string2) throws IOException {
     c = new DenseMatrix(conf);
     String output = c.getPath();
-    
+
     JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class);
     jobConf.setJobName("test MR job");
 
-    BlockCyclicMultiplyMap.initJob(string, string2, BlockCyclicMultiplyMap.class, IntWritable.class,
-        BlockWritable.class, jobConf);
-    BlockCyclicMultiplyReduce.initJob(output, BlockCyclicMultiplyReduce.class, jobConf);
+    BlockCyclicMultiplyMap.initJob(string, string2,
+        BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
+        jobConf);
+    BlockCyclicMultiplyReduce.initJob(output, BlockCyclicMultiplyReduce.class,
+        jobConf);
 
     jobConf.setNumMapTasks(2);
     jobConf.setNumReduceTasks(2);