You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/01/16 07:47:18 UTC

svn commit: r734925 - /incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java

Author: edwardyoon
Date: Thu Jan 15 22:47:18 2009
New Revision: 734925

URL: http://svn.apache.org/viewvc?rev=734925&view=rev
Log:
Add comment.

Modified:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=734925&r1=734924&r2=734925&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Thu Jan 15 22:47:18 2009
@@ -52,23 +52,23 @@
    * Initialize a job to blocking a table
    * 
    * @param matrixPath
-   * @param collectionTable 
-   * @param block_size 
-   * @param j 
-   * @param i 
+   * @param collectionTable
+   * @param block_size
+   * @param j
+   * @param i
    * @param job
    */
-  public static void initJob(String matrixPath, String collectionTable, boolean bool
-      ,int block_size, int i, int j, JobConf job) {
+  public static void initJob(String matrixPath, String collectionTable,
+      boolean bool, int block_size, int i, int j, JobConf job) {
     job.setMapperClass(BlockingMapper.class);
     job.setReducerClass(BlockingReducer.class);
     FileInputFormat.addInputPaths(job, matrixPath);
 
     job.setInputFormat(VectorInputFormat.class);
-    
+
     job.setMapOutputKeyClass(BlockID.class);
     job.setMapOutputValueClass(VectorWritable.class);
-    
+
     job.setOutputFormat(BlockOutputFormat.class);
     job.setOutputKeyClass(BlockID.class);
     job.setOutputValueClass(BlockWritable.class);
@@ -77,7 +77,7 @@
     job.set(COLUMNS, String.valueOf(j));
     job.setBoolean(MATRIX_POS, bool);
     job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
-    if(bool)
+    if (bool)
       job.set(BlockOutputFormat.COLUMN, "a");
     else
       job.set(BlockOutputFormat.COLUMN, "b");
@@ -91,22 +91,22 @@
     protected int mBlockNum;
     protected int mBlockRowSize;
     protected int mBlockColSize;
-    
+
     protected int mRows;
     protected int mColumns;
-    
+
     protected boolean matrixPos;
 
     @Override
     public void configure(JobConf job) {
-        mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
-        mRows = Integer.parseInt(job.get(ROWS, ""));
-        mColumns = Integer.parseInt(job.get(COLUMNS, ""));
-        
-        mBlockRowSize = mRows / mBlockNum;
-        mBlockColSize = mColumns / mBlockNum;
-        
-        matrixPos = job.getBoolean(MATRIX_POS, true);
+      mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
+      mRows = Integer.parseInt(job.get(ROWS, ""));
+      mColumns = Integer.parseInt(job.get(COLUMNS, ""));
+
+      mBlockRowSize = mRows / mBlockNum;
+      mBlockColSize = mColumns / mBlockNum;
+
+      matrixPos = job.getBoolean(MATRIX_POS, true);
     }
   }
 
@@ -124,23 +124,25 @@
       int endColumn;
       int blkRow = key.get() / mBlockRowSize;
       DenseVector dv = value.getDenseVector();
-      
+
       int i = 0;
       do {
         startColumn = i * mBlockColSize;
         endColumn = startColumn + mBlockColSize - 1;
-        if(endColumn >= mColumns) // the last sub vector
+        if (endColumn >= mColumns) // the last sub vector
           endColumn = mColumns - 1;
-        output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
-            dv.subVector(startColumn, endColumn)));
-        
+        output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(), dv
+            .subVector(startColumn, endColumn)));
+
         i++;
-      } while(endColumn < (mColumns-1));
+      } while (endColumn < (mColumns - 1));
     }
   }
 
   /**
-   * Reducer Class
+   * Rows are named as c(i, j) with sequential number (N^2 * i) + (j * N) + k
+   * to avoid duplicated records. Each row has two sub-matrices, a(i, k) and
+   * b(k, j).
    */
   public static class BlockingReducer extends BlockingMapRedBase implements
       Reducer<BlockID, VectorWritable, BlockID, BlockWritable> {
@@ -149,23 +151,24 @@
     public void reduce(BlockID key, Iterator<VectorWritable> values,
         OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
         throws IOException {
-      // Note: all the sub-vectors are grouped by {@link org.apache.hama.io.BlockID}
-      
+      // Note: all the sub-vectors are grouped by their
+      // org.apache.hama.io.BlockID key
+
       // the block's base offset in the original matrix
       int colBase = key.getColumn() * mBlockColSize;
       int rowBase = key.getRow() * mBlockRowSize;
-      
+
       // the block's size : rows & columns
       int smRows = mBlockRowSize;
-      if((rowBase + mBlockRowSize - 1) >= mRows)
+      if ((rowBase + mBlockRowSize - 1) >= mRows)
         smRows = mRows - rowBase;
       int smCols = mBlockColSize;
-      if((colBase + mBlockColSize - 1) >= mColumns)  
+      if ((colBase + mBlockColSize - 1) >= mColumns)
         smCols = mColumns - colBase;
-      
+
       // construct the matrix
       SubMatrix subMatrix = new SubMatrix(smRows, smCols);
-      
+
       // i, j is the current offset in the sub-matrix
       int i = 0, j = 0;
       while (values.hasNext()) {
@@ -185,7 +188,7 @@
       BlockWritable outValue = new BlockWritable(subMatrix);
 
       // It'll used for only matrix multiplication.
-      if(matrixPos) {
+      if (matrixPos) {
         for (int x = 0; x < mBlockNum; x++) {
           int r = (key.getRow() * mBlockNum) * mBlockNum;
           int seq = (x * mBlockNum) + key.getColumn() + r;
@@ -193,11 +196,11 @@
         }
       } else {
         for (int x = 0; x < mBlockNum; x++) {
-          int seq = (x * mBlockNum * mBlockNum) + 
-                          (key.getColumn() * mBlockNum) + key.getRow();
+          int seq = (x * mBlockNum * mBlockNum) + (key.getColumn() * mBlockNum)
+              + key.getRow();
           output.collect(new BlockID(x, key.getColumn(), seq), outValue);
         }
       }
     }
   }
-}
\ No newline at end of file
+}