You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/01/05 01:30:06 UTC

svn commit: r731394 - in /incubator/hama/trunk/src/java/org/apache/hama: Constants.java DenseMatrix.java algebra/BlockCyclicMultiplyMap.java

Author: edwardyoon
Date: Sun Jan  4 16:30:05 2009
New Revision: 731394

URL: http://svn.apache.org/viewvc?rev=731394&view=rev
Log:
Fix bug in scanner range in block multiplication

Modified:
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=731394&r1=731393&r2=731394&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Sun Jan  4 16:30:05 2009
@@ -91,4 +91,6 @@
   public static final String BLOCK = "block:";
 
   public static final String BLOCK_PATH = "attribute:blockPath";
+
+  public static final String BLOCK_SIZE = "attribute:blockSize";
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=731394&r1=731393&r2=731394&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Sun Jan  4 16:30:05 2009
@@ -395,7 +395,7 @@
 
     if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
       BlockCyclicMultiplyMap.initJob(this.getBlockedMatrixPath(), 
-          ((DenseMatrix) B).getBlockedMatrixPath(),
+          ((DenseMatrix) B).getBlockedMatrixPath(), this.getBlockedMatrixSize(),
           BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
           jobConf);
       BlockCyclicMultiplyReduce.initJob(result.getPath(),
@@ -506,7 +506,7 @@
     int block_size = (int) blocks;
     Matrix blockedMatrix = new DenseMatrix(config);
     blockedMatrix.setDimension(block_size, block_size);
-    this.setBlockedMatrixPath(blockedMatrix.getPath());
+    this.setBlockedMatrixPath(blockedMatrix.getPath(), block_size);
     
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("Blocking MR job" + getPath());
@@ -524,9 +524,15 @@
         Constants.BLOCK_PATH).getValue());
   }
 
-  protected void setBlockedMatrixPath(String path) throws IOException {
+  protected void setBlockedMatrixPath(String path, int size) throws IOException {
     BatchUpdate update = new BatchUpdate(Constants.METADATA);
     update.put(Constants.BLOCK_PATH, Bytes.toBytes(path));
+    update.put(Constants.BLOCK_SIZE, Bytes.toBytes(size));
     table.commit(update);
   }
+  
+  public int getBlockedMatrixSize() throws IOException {
+    return Bytes.toInt(table.get(Constants.METADATA,
+        Constants.BLOCK_SIZE).getValue());
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=731394&r1=731393&r2=731394&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Sun Jan  4 16:30:05 2009
@@ -32,8 +32,6 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hama.Constants;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.HamaConfiguration;
 import org.apache.hama.SubMatrix;
 import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
@@ -43,25 +41,29 @@
 public class BlockCyclicMultiplyMap extends MapReduceBase implements
     Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
   static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
-  protected DenseMatrix matrix_b;
+  protected HTable table;
+  protected int block_size;
   public static final String MATRIX_B = "hama.multiplication.matrix.b";
+  public static final String BLOCK_SIZE = "hama.multiplication.block.size";
 
   public void configure(JobConf job) {
     try {
-      matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
+      table = new HTable(job.get(MATRIX_B, ""));
+      block_size = Integer.parseInt(job.get(BLOCK_SIZE, ""));
     } catch (IOException e) {
       LOG.warn("Load matrix_b failed : " + e.getMessage());
     }
   }
 
   public static void initJob(String matrix_a, String matrix_b,
-      Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
+      int block_size, Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
       Class<BlockWritable> outputValueClass, JobConf jobConf) {
 
     jobConf.setMapOutputValueClass(outputValueClass);
     jobConf.setMapOutputKeyClass(outputKeyClass);
     jobConf.setMapperClass(map);
     jobConf.set(MATRIX_B, matrix_b);
+    jobConf.set(BLOCK_SIZE, String.valueOf(block_size));
 
     jobConf.setInputFormat(BlockInputFormat.class);
     FileInputFormat.addInputPaths(jobConf, matrix_a);
@@ -73,25 +75,36 @@
   public void map(BlockID key, BlockWritable value,
       OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
       throws IOException {
-    // we don't need to get blockSize each time
-    int blockSize = matrix_b.getRows();
     SubMatrix a = value.get();
-    HTable table = matrix_b.getHTable();
 
-    // startKey : new BlockID(key.getColumn(), 0).toString()
-    // endKey : new BlockID(key.getColumn(), blockSize+1).toString()
-    Scanner scan = table.getScanner(new byte[][] { Bytes
-        .toBytes(Constants.BLOCK) },
-        new BlockID(key.getColumn(), 0).getBytes(), new BlockID(
-            key.getColumn(), blockSize + 1).getBytes());
+    BlockID startBlock = new BlockID(key.getColumn(), 0);
+    BlockID endBlock = new BlockID(key.getColumn() + 1, 0);
+    Scanner scan;
+    if ((key.getColumn() + 1) == block_size) {
+      scan = table.getScanner(new byte[][] { Bytes
+          .toBytes(Constants.BLOCK) }, startBlock.getBytes());
+    } else {
+      scan = table.getScanner(new byte[][] { Bytes
+          .toBytes(Constants.BLOCK) }, startBlock.getBytes(), endBlock
+          .getBytes());
+    }
 
+    int blocks = 0;
     for (RowResult row : scan) {
       BlockID bid = new BlockID(row.getRow());
       SubMatrix b = new SubMatrix(row.get(Constants.BLOCK).getValue());
       SubMatrix c = a.mult(b);
       output.collect(new BlockID(key.getRow(), bid.getColumn()),
           new BlockWritable(c));
+      blocks++;
     }
+
+    if (blocks == 0)
+      throw new IOException("There is no matrix b." +
+          "\ntableName: " + new String(table.getTableName())
+          + "\nscanner startKey: " + startBlock.toString() + ", endKey: "
+          + endBlock.toString());
+
     scan.close();
   }
 }