You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/01/07 09:29:49 UTC
svn commit: r732267 - in /incubator/hama/trunk: ./
src/java/org/apache/hama/algebra/ src/java/org/apache/hama/mapred/
src/test/org/apache/hama/ src/test/org/apache/hama/mapred/
Author: edwardyoon
Date: Wed Jan 7 00:29:49 2009
New Revision: 732267
URL: http://svn.apache.org/viewvc?rev=732267&view=rev
Log:
Fix OOME.
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Jan 7 00:29:49 2009
@@ -33,6 +33,7 @@
IMPROVEMENTS
+ HAMA-142: Trunk doesn't work for large matrices (edwardyoon)
HAMA-143: Improve of random_mapred() (edwardyoon)
HAMA-138: To order, left pad with zeroes to integer key (edwardyoon)
HAMA-135, HAMA-137: Refactor mapred, I/O package (edwardyoon)
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Wed Jan 7 00:29:49 2009
@@ -22,9 +22,6 @@
import java.io.IOException;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Scanner;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
@@ -42,33 +39,30 @@
Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
protected HTable table;
- protected int block_size;
public static final String MATRIX_B = "hama.multiplication.matrix.b";
- public static final String BLOCK_SIZE = "hama.multiplication.block.size";
public void configure(JobConf job) {
try {
table = new HTable(job.get(MATRIX_B, ""));
- block_size = Integer.parseInt(job.get(BLOCK_SIZE, ""));
} catch (IOException e) {
LOG.warn("Load matrix_b failed : " + e.getMessage());
}
}
- public static void initJob(String matrix_a, String matrix_b,
- int block_size, Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
+ public static void initJob(String matrix_a, String matrix_b, int block_size,
+ Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
Class<BlockWritable> outputValueClass, JobConf jobConf) {
jobConf.setMapOutputValueClass(outputValueClass);
jobConf.setMapOutputKeyClass(outputKeyClass);
jobConf.setMapperClass(map);
jobConf.set(MATRIX_B, matrix_b);
- jobConf.set(BLOCK_SIZE, String.valueOf(block_size));
jobConf.setInputFormat(BlockInputFormat.class);
FileInputFormat.addInputPaths(jobConf, matrix_a);
jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
+ jobConf.set(BlockInputFormat.REPEAT_NUM, String.valueOf(block_size));
}
@Override
@@ -76,35 +70,12 @@
OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
throws IOException {
SubMatrix a = value.get();
-
- BlockID startBlock = new BlockID(key.getColumn(), 0);
- BlockID endBlock = new BlockID(key.getColumn() + 1, 0);
- Scanner scan;
- if ((key.getColumn() + 1) == block_size) {
- scan = table.getScanner(new byte[][] { Bytes
- .toBytes(Constants.BLOCK) }, startBlock.getBytes());
- } else {
- scan = table.getScanner(new byte[][] { Bytes
- .toBytes(Constants.BLOCK) }, startBlock.getBytes(), endBlock
- .getBytes());
- }
-
- int blocks = 0;
- for (RowResult row : scan) {
- BlockID bid = new BlockID(row.getRow());
- SubMatrix b = new SubMatrix(row.get(Constants.BLOCK).getValue());
- SubMatrix c = a.mult(b);
- output.collect(new BlockID(key.getRow(), bid.getColumn()),
- new BlockWritable(c));
- blocks++;
- }
-
- if (blocks == 0)
- throw new IOException("There is no matrix b." +
- "\ntableName: " + new String(table.getTableName())
- + "\nscanner startKey: " + startBlock.toString() + ", endKey: "
- + endBlock.toString());
-
- scan.close();
+ SubMatrix b = new SubMatrix(table.get(
+ new BlockID(key.getColumn(), BlockInputFormat.getRepeatCount())
+ .toString(), Constants.BLOCK).getValue());
+ SubMatrix c = a.mult(b);
+ output.collect(
+ new BlockID(key.getRow(), BlockInputFormat.getRepeatCount()),
+ new BlockWritable(c));
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Wed Jan 7 00:29:49 2009
@@ -43,12 +43,21 @@
static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
private TableRecordReader tableRecordReader;
+ public static int getRepeatCount() {
+ return TableRecordReader.repeatCount;
+ }
+
+ public static int getRepeat() {
+ return repeat;
+ }
+
/**
* Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
*/
protected static class TableRecordReader extends TableRecordReaderBase
implements RecordReader<BlockID, BlockWritable> {
-
+ private static int repeatCount = 0;
+
/**
* @return IntWritable
*
@@ -89,6 +98,15 @@
}
boolean hasMore = result != null && result.size() > 0;
+
+ // Scanner will be restarted.
+ if(!hasMore && repeatCount < repeat - 1) {
+ this.init();
+ repeatCount++;
+ result = this.scanner.next();
+ hasMore = result != null && result.size() > 0;
+ }
+
if (hasMore) {
byte[] row = result.getRow();
BlockID bID = new BlockID(row);
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java Wed Jan 7 00:29:49 2009
@@ -40,14 +40,19 @@
protected byte[][] inputColumns;
protected HTable table;
protected RowFilterInterface rowFilter;
-
+ protected static int repeat;
+
/**
* space delimited list of columns
*/
public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
+ public static final String REPEAT_NUM = "hama.mapred.repeat";
public void configure(JobConf job) {
Path[] tableNames = FileInputFormat.getInputPaths(job);
+ if(job.get(REPEAT_NUM) != null) {
+ setRepeat(Integer.parseInt(job.get(REPEAT_NUM)));
+ }
String colArg = job.get(COLUMN_LIST);
String[] colNames = colArg.split(" ");
byte[][] m_cols = new byte[colNames.length][];
@@ -62,6 +67,10 @@
}
}
+ private void setRepeat(int parseInt) {
+ repeat = parseInt;
+ }
+
public void validateInput(JobConf job) throws IOException {
// expecting exactly one path
Path[] tableNames = FileInputFormat.getInputPaths(job);
@@ -94,7 +103,11 @@
* @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
*/
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- byte [][] startKeys = this.table.getStartKeys();
+ byte [][] startKeys = null;
+ try {
+ startKeys = this.table.getStartKeys();
+ } catch (NullPointerException e) { }
+
if (startKeys == null || startKeys.length == 0) {
throw new IOException("Expecting at least one region");
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Wed Jan 7 00:29:49 2009
@@ -304,20 +304,20 @@
*/
private void verifyMultResult(Matrix m1, Matrix m2, Matrix result)
throws IOException {
- double[][] C = new double[SIZE][SIZE];
+ double[][] c = new double[SIZE][SIZE];
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
for (int k = 0; k < SIZE; k++) {
- C[i][k] += m1.get(i, j) * m2.get(j, k);
+ c[i][k] += m1.get(i, j) * m2.get(j, k);
}
}
}
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
- assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String
- .valueOf(C[i][j]).substring(0, 14));
+ double gap = (c[i][j] - result.get(i, j));
+ assertTrue(gap < 0.000001 || gap < -0.000001);
}
}
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=732267&r1=732266&r2=732267&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Wed Jan 7 00:29:49 2009
@@ -56,8 +56,8 @@
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
- assertEquals(String.valueOf(mem[i][j]).substring(0, 5), String.valueOf(
- c.get(i, j)).substring(0, 5));
+ double gap = (mem[i][j] - c.get(i, j));
+ assertTrue(gap < 0.000001 || gap < -0.000001);
}
}
}