You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/01/07 09:29:49 UTC

svn commit: r732267 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/algebra/ src/java/org/apache/hama/mapred/ src/test/org/apache/hama/ src/test/org/apache/hama/mapred/

Author: edwardyoon
Date: Wed Jan  7 00:29:49 2009
New Revision: 732267

URL: http://svn.apache.org/viewvc?rev=732267&view=rev
Log:
Fix OOME.

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Jan  7 00:29:49 2009
@@ -33,6 +33,7 @@
     
   IMPROVEMENTS
     
+    HAMA-142: Trunk doesn't work for large matrices (edwardyoon)
     HAMA-143: Improve of random_mapred() (edwardyoon)
     HAMA-138: To order, left pad with zeroes to integer key (edwardyoon)
     HAMA-135, HAMA-137: Refactor mapred, I/O package (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Wed Jan  7 00:29:49 2009
@@ -22,9 +22,6 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Scanner;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -42,33 +39,30 @@
     Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
   static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
   protected HTable table;
-  protected int block_size;
   public static final String MATRIX_B = "hama.multiplication.matrix.b";
-  public static final String BLOCK_SIZE = "hama.multiplication.block.size";
 
   public void configure(JobConf job) {
     try {
       table = new HTable(job.get(MATRIX_B, ""));
-      block_size = Integer.parseInt(job.get(BLOCK_SIZE, ""));
     } catch (IOException e) {
       LOG.warn("Load matrix_b failed : " + e.getMessage());
     }
   }
 
-  public static void initJob(String matrix_a, String matrix_b,
-      int block_size, Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
+  public static void initJob(String matrix_a, String matrix_b, int block_size,
+      Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
       Class<BlockWritable> outputValueClass, JobConf jobConf) {
 
     jobConf.setMapOutputValueClass(outputValueClass);
     jobConf.setMapOutputKeyClass(outputKeyClass);
     jobConf.setMapperClass(map);
     jobConf.set(MATRIX_B, matrix_b);
-    jobConf.set(BLOCK_SIZE, String.valueOf(block_size));
 
     jobConf.setInputFormat(BlockInputFormat.class);
     FileInputFormat.addInputPaths(jobConf, matrix_a);
 
     jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
+    jobConf.set(BlockInputFormat.REPEAT_NUM, String.valueOf(block_size));
   }
 
   @Override
@@ -76,35 +70,12 @@
       OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
       throws IOException {
     SubMatrix a = value.get();
-
-    BlockID startBlock = new BlockID(key.getColumn(), 0);
-    BlockID endBlock = new BlockID(key.getColumn() + 1, 0);
-    Scanner scan;
-    if ((key.getColumn() + 1) == block_size) {
-      scan = table.getScanner(new byte[][] { Bytes
-          .toBytes(Constants.BLOCK) }, startBlock.getBytes());
-    } else {
-      scan = table.getScanner(new byte[][] { Bytes
-          .toBytes(Constants.BLOCK) }, startBlock.getBytes(), endBlock
-          .getBytes());
-    }
-
-    int blocks = 0;
-    for (RowResult row : scan) {
-      BlockID bid = new BlockID(row.getRow());
-      SubMatrix b = new SubMatrix(row.get(Constants.BLOCK).getValue());
-      SubMatrix c = a.mult(b);
-      output.collect(new BlockID(key.getRow(), bid.getColumn()),
-          new BlockWritable(c));
-      blocks++;
-    }
-
-    if (blocks == 0)
-      throw new IOException("There is no matrix b." +
-          "\ntableName: " + new String(table.getTableName())
-          + "\nscanner startKey: " + startBlock.toString() + ", endKey: "
-          + endBlock.toString());
-
-    scan.close();
+    SubMatrix b = new SubMatrix(table.get(
+        new BlockID(key.getColumn(), BlockInputFormat.getRepeatCount())
+            .toString(), Constants.BLOCK).getValue());
+    SubMatrix c = a.mult(b);
+    output.collect(
+        new BlockID(key.getRow(), BlockInputFormat.getRepeatCount()),
+        new BlockWritable(c));
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Wed Jan  7 00:29:49 2009
@@ -43,12 +43,21 @@
   static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
   private TableRecordReader tableRecordReader;
   
+  public static int getRepeatCount() {
+    return TableRecordReader.repeatCount;
+  }
+  
+  public static int getRepeat() {
+    return repeat;
+  }
+  
   /**
    * Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
    */
   protected static class TableRecordReader extends TableRecordReaderBase
       implements RecordReader<BlockID, BlockWritable> {
-
+    private static int repeatCount = 0;
+    
     /**
      * @return IntWritable
      * 
@@ -89,6 +98,15 @@
       }
       
       boolean hasMore = result != null && result.size() > 0;
+      
+      // Scanner will be restarted.
+      if(!hasMore && repeatCount < repeat - 1) {
+        this.init();
+        repeatCount++;
+        result = this.scanner.next();
+        hasMore = result != null && result.size() > 0;
+      }
+      
       if (hasMore) {
         byte[] row = result.getRow();
         BlockID bID = new BlockID(row);

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java Wed Jan  7 00:29:49 2009
@@ -40,14 +40,19 @@
   protected byte[][] inputColumns;
   protected HTable table;
   protected RowFilterInterface rowFilter;
-
+  protected static int repeat;
+  
   /**
    * space delimited list of columns
    */
   public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
+  public static final String REPEAT_NUM = "hama.mapred.repeat";
   
   public void configure(JobConf job) {
     Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if(job.get(REPEAT_NUM) != null) {
+      setRepeat(Integer.parseInt(job.get(REPEAT_NUM)));
+    }
     String colArg = job.get(COLUMN_LIST);
     String[] colNames = colArg.split(" ");
     byte[][] m_cols = new byte[colNames.length][];
@@ -62,6 +67,10 @@
     }
   }
 
+  private void setRepeat(int parseInt) {
+    repeat =  parseInt;
+  }
+
   public void validateInput(JobConf job) throws IOException {
     // expecting exactly one path
     Path[] tableNames = FileInputFormat.getInputPaths(job);
@@ -94,7 +103,11 @@
    * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
    */
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    byte [][] startKeys = this.table.getStartKeys();
+    byte [][] startKeys = null;
+    try {
+      startKeys = this.table.getStartKeys();
+    } catch (NullPointerException e) { }
+    
     if (startKeys == null || startKeys.length == 0) {
       throw new IOException("Expecting at least one region");
     }

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Wed Jan  7 00:29:49 2009
@@ -304,20 +304,20 @@
    */
   private void verifyMultResult(Matrix m1, Matrix m2, Matrix result)
       throws IOException {
-    double[][] C = new double[SIZE][SIZE];
+    double[][] c = new double[SIZE][SIZE];
 
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
         for (int k = 0; k < SIZE; k++) {
-          C[i][k] += m1.get(i, j) * m2.get(j, k);
+          c[i][k] += m1.get(i, j) * m2.get(j, k);
         }
       }
     }
 
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
-        assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String
-            .valueOf(C[i][j]).substring(0, 14));
+        double gap = (c[i][j] - result.get(i, j));
+        assertTrue(gap < 0.000001 || gap < -0.000001);
       }
     }
   }

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Wed Jan  7 00:29:49 2009
@@ -56,8 +56,8 @@
 
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
-        assertEquals(String.valueOf(mem[i][j]).substring(0, 5), String.valueOf(
-            c.get(i, j)).substring(0, 5));
+        double gap = (mem[i][j] - c.get(i, j));
+        assertTrue(gap < 0.000001 || gap < -0.000001);
       }
     }
   }