Posted to commits@hama.apache.org by ed...@apache.org on 2008/12/22 06:01:01 UTC

svn commit: r728611 - /incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java

Author: edwardyoon
Date: Sun Dec 21 21:01:00 2008
New Revision: 728611

URL: http://svn.apache.org/viewvc?rev=728611&view=rev
Log:
Revert changes from revision 725570

Modified:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=728611&r1=728610&r2=728611&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Sun Dec 21 21:01:00 2008
@@ -17,34 +17,32 @@
 package org.apache.hama.mapred;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.mapred.TableSplit;
-import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hama.Constants;
 import org.apache.hama.DenseMatrix;
+import org.apache.hama.DenseVector;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.SubMatrix;
 import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockPosition;
 import org.apache.hama.io.VectorWritable;
 
 /**
 * A Map/Reduce helper class for blocking a DenseMatrix into a block-formatted matrix
  */
 public class BlockingMapRed {
+
   static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
   /** Parameter of the path of the matrix to be blocked * */
   public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
@@ -57,139 +55,126 @@
    */
   public static void initJob(String matrixPath, JobConf job) {
     job.setMapperClass(BlockingMapper.class);
+    job.setReducerClass(BlockingReducer.class);
     FileInputFormat.addInputPaths(job, matrixPath);
 
-    job.setInputFormat(MyInputFormat.class);
+    job.setInputFormat(VectorInputFormat.class);
     job.setMapOutputKeyClass(BlockID.class);
     job.setMapOutputValueClass(VectorWritable.class);
     job.setOutputFormat(NullOutputFormat.class);
 
     job.set(BLOCKING_MATRIX, matrixPath);
-    job.set(VectorInputFormat.COLUMN_LIST, Constants.BLOCK_POSITION);
+    job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
   }
 
   /**
    * Abstract Blocking Map/Reduce Class to configure the job.
    */
   public static abstract class BlockingMapRedBase extends MapReduceBase {
+
     protected DenseMatrix matrix;
+    protected int mBlockNum;
+    protected int mBlockRowSize;
+    protected int mBlockColSize;
+    
+    protected int mRows;
+    protected int mColumns;
 
     @Override
     public void configure(JobConf job) {
       try {
         matrix = new DenseMatrix(new HamaConfiguration(), job.get(
             BLOCKING_MATRIX, ""));
+        mBlockNum = matrix.getBlockSize();
+        mBlockRowSize = matrix.getRows() / mBlockNum;
+        mBlockColSize = matrix.getColumns() / mBlockNum;
+        
+        mRows = matrix.getRows();
+        mColumns = matrix.getColumns();
       } catch (IOException e) {
         LOG.warn("Load matrix_blocking failed : " + e.getMessage());
       }
     }
+
   }
 
   /**
    * Mapper Class
    */
   public static class BlockingMapper extends BlockingMapRedBase implements
-      Mapper<BlockID, BlockPosition, BlockID, VectorWritable> {
+      Mapper<IntWritable, VectorWritable, BlockID, VectorWritable> {
 
     @Override
-    public void map(BlockID key, BlockPosition value,
+    public void map(IntWritable key, VectorWritable value,
         OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
         throws IOException {
-      int startRow = value.getStartRow();
-      int endRow = value.getEndRow();
-      int startColumn = value.getStartColumn();
-      int endColumn = value.getEndColumn();
-
-      matrix.setBlock(key.getRow(), key.getColumn(), matrix.subMatrix(startRow,
-          endRow, startColumn, endColumn));
+      int startColumn;
+      int endColumn;
+      int blkRow = key.get() / mBlockRowSize;
+      DenseVector dv = value.getDenseVector();
+      
+      int i = 0;
+      do {
+        startColumn = i * mBlockColSize;
+        endColumn = startColumn + mBlockColSize - 1;
+        if(endColumn >= mColumns) // the last sub vector
+          endColumn = mColumns - 1;
+        output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
+            dv.subVector(startColumn, endColumn)));
+        
+        i++;
+      } while(endColumn < (mColumns-1));
     }
-  }
 
-  static class MyInputFormat extends TableInputFormatBase implements
-      InputFormat<BlockID, BlockPosition>, JobConfigurable {
-    static final Log LOG = LogFactory.getLog(MyInputFormat.class);
-    private TableRecordReader tableRecordReader;
-
-    /**
-     * Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
-     */
-    protected static class TableRecordReader extends TableRecordReaderBase
-        implements RecordReader<BlockID, BlockPosition> {
-
-      /**
-       * @return IntWritable
-       * 
-       * @see org.apache.hadoop.mapred.RecordReader#createKey()
-       */
-      public BlockID createKey() {
-        return new BlockID();
-      }
+  }
 
-      /**
-       * @return BlockWritable
-       * 
-       * @see org.apache.hadoop.mapred.RecordReader#createValue()
-       */
-      public BlockPosition createValue() {
-        return new BlockPosition();
-      }
+  /**
+   * Reducer Class
+   */
+  public static class BlockingReducer extends BlockingMapRedBase implements
+      Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
 
-      /**
-       * @param key BlockID as input key.
-       * @param value BlockWritable as input value
-       * 
-       * Converts Scanner.next() to BlockID, BlockWritable
-       * 
-       * @return true if there was more data
-       * @throws IOException
-       */
-      public boolean next(BlockID key, BlockPosition value) throws IOException {
-        RowResult result = this.scanner.next();
-        boolean hasMore = result != null && result.size() > 0;
-        if (hasMore) {
-          byte[] row = result.getRow();
-          BlockID bID = new BlockID(row);
-          key.set(bID.getRow(), bID.getColumn());
-          Writables.copyWritable(
-              new BlockPosition(result.get(Constants.BLOCK_POSITION).getValue()), 
-              value);
+    @Override
+    public void reduce(BlockID key, Iterator<VectorWritable> values,
+        OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
+        throws IOException {
+      // Note: all the sub-vectors are grouped by {@link
+      // org.apache.hama.io.BlockID}
+      
+      // the block's base offset in the original matrix
+      int colBase = key.getColumn() * mBlockColSize;
+      int rowBase = key.getRow() * mBlockRowSize;
+      
+      // the block's size : rows & columns
+      int smRows = mBlockRowSize;
+      if((rowBase + mBlockRowSize - 1) >= mRows)
+        smRows = mRows - rowBase;
+      int smCols = mBlockColSize;
+      if((colBase + mBlockColSize - 1) >= mColumns)  
+        smCols = mColumns - colBase;
+      
+      // construct the matrix
+      SubMatrix subMatrix = new SubMatrix(smRows, smCols);
+      
+      // i, j is the current offset in the sub-matrix
+      int i = 0, j = 0;
+      while (values.hasNext()) {
+        VectorWritable vw = values.next();
+        // check the size is suitable
+        if (vw.size() != smCols)
+          throw new IOException("Block Column Size dismatched.");
+        i = vw.row - rowBase;
+        if (i >= smRows || i < 0)
+          throw new IOException("Block Row Size dismatched.");
+
+        // put the subVector to the subMatrix
+        for (j = 0; j < smCols; j++) {
+          subMatrix.set(i, j, vw.get(colBase + j));
         }
-        return hasMore;
-      }
-    }
-
-    /**
-     * Builds a TableRecordReader. If no TableRecordReader was provided, uses
-     * the default.
-     * 
-     * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
-     *      JobConf, Reporter)
-     */
-    public RecordReader<BlockID, BlockPosition> getRecordReader(
-        InputSplit split, JobConf job, Reporter reporter) throws IOException {
-      TableSplit tSplit = (TableSplit) split;
-      TableRecordReader trr = this.tableRecordReader;
-      // if no table record reader was provided use default
-      if (trr == null) {
-        trr = new TableRecordReader();
       }
-      trr.setStartRow(tSplit.getStartRow());
-      trr.setEndRow(tSplit.getEndRow());
-      trr.setHTable(this.table);
-      trr.setInputColumns(this.inputColumns);
-      trr.setRowFilter(this.rowFilter);
-      trr.init();
-      return trr;
-    }
 
-    /**
-     * Allows subclasses to set the {@link TableRecordReader}.
-     * 
-     * @param tableRecordReader to provide other {@link TableRecordReader}
-     *                implementations.
-     */
-    protected void setTableRecordReader(TableRecordReader tableRecordReader) {
-      this.tableRecordReader = tableRecordReader;
+      matrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
     }
   }
+
 }
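
For context, a minimal driver sketch (not part of this commit) showing how a job could invoke BlockingMapRed.initJob in the reverted form above. The driver class name and argument handling are assumptions; it also assumes the matrix's block count has already been recorded, since the job reads it back via DenseMatrix.getBlockSize() in BlockingMapRedBase.configure().

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.mapred.BlockingMapRed;

public class BlockingJobDriver {
  public static void main(String[] args) throws Exception {
    // Assumption: args[0] is the path of an existing DenseMatrix whose
    // block count has already been set before this job runs.
    String matrixPath = args[0];

    HamaConfiguration conf = new HamaConfiguration();
    JobConf job = new JobConf(conf, BlockingJobDriver.class);
    job.setJobName("blocking " + matrixPath);

    // Wires up BlockingMapper/BlockingReducer, VectorInputFormat, the
    // BlockID/VectorWritable map output types, and the
    // "hama.blocking.matrix" parameter, exactly as initJob defines them.
    BlockingMapRed.initJob(matrixPath, job);

    // Old-style (org.apache.hadoop.mapred) job submission.
    JobClient.runJob(job);
  }
}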