Posted to commits@hama.apache.org by ed...@apache.org on 2008/12/02 01:57:32 UTC

svn commit: r722319 - in /incubator/hama/trunk/src: java/org/apache/hama/io/BlockID.java java/org/apache/hama/mapred/BlockingMapRed.java test/org/apache/hama/TestBlockRowId.java test/org/apache/hama/TestDenseMatrix.java

Author: edwardyoon
Date: Mon Dec  1 16:57:31 2008
New Revision: 722319

URL: http://svn.apache.org/viewvc?rev=722319&view=rev
Log:
Fix blocking mapred bug: group map output by BlockID (drop BlockRowId and the custom comparators) so each block's sub-vectors reach a single reducer

Removed:
    incubator/hama/trunk/src/test/org/apache/hama/TestBlockRowId.java
Modified:
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java?rev=722319&r1=722318&r2=722319&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java Mon Dec  1 16:57:31 2008
@@ -93,6 +93,7 @@
   /**
    * BlockID Comparator 
    */
+  /* why we need a special Comparator
   public static class Comparator extends WritableComparator {
     protected Comparator() {
       super(BlockID.class);
@@ -101,6 +102,8 @@
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
       int thisRow = readInt(b1, s1);
       int thatRow = readInt(b2, s2);
+      // why read from l1 & l2
+      // l1 means length 1, and l2 means length 2
       int thisColumn = readInt(b1, l1);
       int thatColumn = readInt(b2, l2);
 
@@ -116,4 +119,5 @@
   static { // register this comparator
     WritableComparator.define(BlockID.class, new Comparator());
   }
+  */
 }
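
A note on the comparator that this change comments out: the inline questions point at the
underlying problem. In WritableComparator's raw compare(byte[] b1, int s1, int l1, byte[] b2,
int s2, int l2), l1 and l2 are the byte lengths of the serialized keys, not offsets, so
readInt(b1, l1) never reads the column field. A minimal sketch of a corrected raw comparator
follows; it assumes BlockID serializes its row and column as two consecutive 4-byte ints,
which the removed code implies but this diff does not show.

    // Sketch only -- not part of this commit. Assumes BlockID.write() emits the
    // row int followed by the column int, and that this class sits inside
    // BlockID.java where WritableComparator is already imported.
    public static class Comparator extends WritableComparator {
      protected Comparator() {
        super(BlockID.class);
      }

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int thisRow = readInt(b1, s1);
        int thatRow = readInt(b2, s2);
        if (thisRow != thatRow) {
          return (thisRow < thatRow) ? -1 : 1;
        }
        // The column is the second int, so it starts 4 bytes after the record start.
        int thisColumn = readInt(b1, s1 + 4);
        int thatColumn = readInt(b2, s2 + 4);
        if (thisColumn == thatColumn) {
          return 0;
        }
        return (thisColumn < thatColumn) ? -1 : 1;
      }
    }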

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=722319&r1=722318&r2=722319&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Mon Dec  1 16:57:31 2008
@@ -16,17 +16,12 @@
  */
 package org.apache.hama.mapred;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -45,14 +40,14 @@
 
 /**
  * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix
- **/
+ */
 public class BlockingMapRed {
 
-  static final Log LOG = LogFactory.getLog(BlockingMapRed.class); 
-  /** Parameter of the path of the matrix to be blocked **/
+  static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
+  /** Parameter of the path of the matrix to be blocked * */
   public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
-  
-  /** 
+
+  /**
    * Initialize a job to blocking a table
    * 
    * @param matrixPath
@@ -62,31 +57,31 @@
     job.setMapperClass(BlockingMapper.class);
     job.setReducerClass(BlockingReducer.class);
     FileInputFormat.addInputPaths(job, matrixPath);
-    
+
     job.setInputFormat(VectorInputFormat.class);
-    job.setMapOutputKeyClass(BlockRowId.class);
+    job.setMapOutputKeyClass(BlockID.class);
     job.setMapOutputValueClass(VectorWritable.class);
     job.setOutputFormat(NullOutputFormat.class);
-    job.setOutputValueGroupingComparator(GroupingComparator.class);
-    
+
     job.set(BLOCKING_MATRIX, matrixPath);
     job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
   }
-  
+
   /**
    * Abstract Blocking Map/Reduce Class to configure the job.
    */
   public static abstract class BlockingMapRedBase extends MapReduceBase {
-    
+
     protected DenseMatrix matrix;
     protected int mBlockNum;
     protected int mBlockRowSize;
     protected int mBlockColSize;
-    
+
     @Override
     public void configure(JobConf job) {
       try {
-        matrix = new DenseMatrix(new HamaConfiguration(), job.get(BLOCKING_MATRIX, ""));
+        matrix = new DenseMatrix(new HamaConfiguration(), job.get(
+            BLOCKING_MATRIX, ""));
         mBlockNum = matrix.getBlockSize();
         mBlockRowSize = matrix.getRows() / mBlockNum;
         mBlockColSize = matrix.getColumns() / mBlockNum;
@@ -94,177 +89,66 @@
         LOG.warn("Load matrix_blocking failed : " + e.getMessage());
       }
     }
-    
+
   }
-  
+
   /**
    * Mapper Class
    */
   public static class BlockingMapper extends BlockingMapRedBase implements
-      Mapper<IntWritable, VectorWritable, BlockRowId, VectorWritable> {
+      Mapper<IntWritable, VectorWritable, BlockID, VectorWritable> {
 
     @Override
     public void map(IntWritable key, VectorWritable value,
-        OutputCollector<BlockRowId, VectorWritable> output, Reporter reporter)
+        OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
         throws IOException {
       int startColumn;
       int endColumn;
       int blkRow = key.get() / mBlockRowSize;
       DenseVector dv = value.getDenseVector();
-      for(int i = 0 ; i < mBlockNum; i++) {
+      for (int i = 0; i < mBlockNum; i++) {
         startColumn = i * mBlockColSize;
         endColumn = startColumn + mBlockColSize - 1;
-        output.collect(new BlockRowId(new BlockID(blkRow, i), key.get()), 
-            new VectorWritable(key.get(), (DenseVector) dv.subVector(startColumn, endColumn)));
+        output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
+            (DenseVector) dv.subVector(startColumn, endColumn)));
       }
     }
 
   }
-  
+
   /**
    * Reducer Class
    */
   public static class BlockingReducer extends BlockingMapRedBase implements
-      Reducer<BlockRowId, VectorWritable, BlockID, SubMatrix> {
+      Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
 
     @Override
-    public void reduce(BlockRowId key, Iterator<VectorWritable> values,
+    public void reduce(BlockID key, Iterator<VectorWritable> values,
         OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
         throws IOException {
-      BlockID blkId = key.getBlockId();
-      final SubMatrix subMatrix = new SubMatrix(mBlockRowSize, mBlockColSize);
-      int i=0, j=0;
-      int colBase = blkId.getColumn() * mBlockColSize;
-      while(values.hasNext()) {
-        if(i > mBlockRowSize) 
-          throw new IOException("BlockRowSize dismatched.");
-        
+      // Note: all the sub-vectors are grouped by {@link
+      // org.apache.hama.io.BlockID}
+      SubMatrix subMatrix = new SubMatrix(mBlockRowSize, mBlockColSize);
+      int i = 0, j = 0;
+      int colBase = key.getColumn() * mBlockColSize;
+      int rowBase = key.getRow() * mBlockRowSize;
+      while (values.hasNext()) {
         VectorWritable vw = values.next();
-        if(vw.size() != mBlockColSize) 
+        // check the size is suitable
+        if (vw.size() != mBlockColSize)
           throw new IOException("BlockColumnSize dismatched.");
-        for(j=0; j<mBlockColSize; j++) {
+        i = vw.row - rowBase;
+        if (i >= mBlockRowSize || i < 0)
+          throw new IOException("BlockRowSize dismatched.");
+
+        // put the subVector to the subMatrix
+        for (j = 0; j < mBlockColSize; j++) {
           subMatrix.set(i, j, vw.get(colBase + j));
         }
-        i++;
       }
-      
-      matrix.setBlock(blkId.getRow(), blkId.getColumn(), subMatrix);
-    }
-  }
-  
-  /**
-   * A Grouping Comparator to group all the {@link BlockRowId} with 
-   * same {@link org.apache.hama.io.BlockID}
-   */
-  public static class GroupingComparator implements RawComparator<BlockRowId> {
 
-    private final DataInputBuffer buffer = new DataInputBuffer();
-    private final BlockRowId key1 = new BlockRowId();
-    private final BlockRowId key2 = new BlockRowId();
-    
-    @Override
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
-        int l2) {
-      try {
-        buffer.reset(b1, s1, l1);       // parse k1
-        key1.readFields(buffer);
-        
-        buffer.reset(b2, s2, l2);
-        key2.readFields(buffer);
-        
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      
-      return compare(key1, key2);
+      matrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
     }
-
-    @Override
-    public int compare(BlockRowId first, BlockRowId second) {
-      return first.mBlockId.compareTo(second.mBlockId);
-    }
-    
   }
-  
-  /**
-   * Help Class to store <blockid, rowid> pair.
-   */
-  public static class BlockRowId implements WritableComparable<BlockRowId> {
-
-    BlockID mBlockId;
-    int mRowId;
-    
-    /**
-     * Empty Constructor used for serialization.
-     */
-    public BlockRowId() { }
-    
-    /**
-     * Construct a block-row id using blockid & rowid
-     * @param blockid
-     * @param rowid
-     */
-    public BlockRowId(BlockID blockid, int rowid) {
-      mBlockId = blockid;
-      mRowId = rowid;
-    }
-    
-    /** 
-     * Get the block ID
-     * @return BlockID
-     */
-    public BlockID getBlockId() { return mBlockId; }
-    
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      mBlockId = new BlockID();
-      mBlockId.readFields(in);
-      mRowId = in.readInt();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      mBlockId.write(out);
-      out.writeInt(mRowId);
-    }
 
-    // first compare block ids
-    // when blockids are same, then compare row ids
-    @Override
-    public int compareTo(BlockRowId another) {
-      int cmp = mBlockId.compareTo(another.mBlockId);
-      if(cmp == 0) {
-        cmp = mRowId - another.mRowId;
-      }
-      return cmp;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if(obj == null) return false;
-      if(!(obj instanceof BlockRowId)) return false;
-      
-      BlockRowId another = (BlockRowId)obj;
-      return compareTo(another) == 0;
-    }
-
-    // tricky here
-    // we just used the block id to generate the hashcode
-    // so that the same block will be in the same reducer
-    @Override
-    public int hashCode() {
-      return mBlockId.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("<");
-      sb.append(mBlockId);
-      sb.append(",");
-      sb.append(mRowId);
-      sb.append(">");
-      return sb.toString();
-    }
-  }
 }
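
With BlockID as the map output key, Hadoop's default sorting and grouping already deliver
every sub-vector of one block to a single reduce() call, which is why the BlockRowId wrapper,
its GroupingComparator, and the setOutputValueGroupingComparator call can all be removed; the
reducer now recovers each sub-vector's local row index as vw.row - rowBase instead of relying
on iteration order. For context, a hypothetical driver sketch is below. It assumes a static
BlockingMapRed.initJob(String matrixPath, JobConf job) matching the body shown above, and
that HamaConfiguration is a Hadoop Configuration; neither signature appears in this diff.

    // Hypothetical driver sketch -- not part of this commit.
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.mapred.BlockingMapRed;

    public class BlockingJobDriver {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(new HamaConfiguration(), BlockingMapRed.class);
        job.setJobName("blocking matrix");
        // "my.matrix" is a placeholder for the name of the DenseMatrix to block.
        BlockingMapRed.initJob("my.matrix", job);
        JobClient.runJob(job);
      }
    }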

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=722319&r1=722318&r2=722319&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Mon Dec  1 16:57:31 2008
@@ -84,7 +84,7 @@
   
   public void testBlocking() throws IOException, ClassNotFoundException {
     assertEquals(((DenseMatrix) m1).isBlocked(), false);
-    ((DenseMatrix) m1).blocking_mapred(2);
+    ((DenseMatrix) m1).blocking(2);
     assertEquals(((DenseMatrix) m1).isBlocked(), true);
     int[] pos = ((DenseMatrix) m1).getBlockPosition(1, 0);
     double[][] b = ((DenseMatrix) m1).subMatrix(pos[0], pos[1], pos[2], pos[3]).getDoubleArray();
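
For reference, a hypothetical continuation of this assertion that compares the extracted
block against the original matrix cell by cell. It assumes getBlockPosition returns
{startRow, endRow, startColumn, endColumn}, as the subMatrix call above suggests, and that
m1.get(int, int) returns the cell value as a double; neither is shown in this diff.

    // Hypothetical assertion sketch -- not part of this commit.
    for (int i = pos[0]; i <= pos[1]; i++) {
      for (int j = pos[2]; j <= pos[3]; j++) {
        assertEquals(m1.get(i, j), b[i - pos[0]][j - pos[2]], 0.000001);
      }
    }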