You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/01/05 01:30:06 UTC
svn commit: r731394 - in /incubator/hama/trunk/src/java/org/apache/hama:
Constants.java DenseMatrix.java algebra/BlockCyclicMultiplyMap.java
Author: edwardyoon
Date: Sun Jan 4 16:30:05 2009
New Revision: 731394
URL: http://svn.apache.org/viewvc?rev=731394&view=rev
Log:
Fix scanner range bug in block multiplication
Modified:
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=731394&r1=731393&r2=731394&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Sun Jan 4 16:30:05 2009
@@ -91,4 +91,6 @@
public static final String BLOCK = "block:";
public static final String BLOCK_PATH = "attribute:blockPath";
+
+ public static final String BLOCK_SIZE = "attribute:blockSize";
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=731394&r1=731393&r2=731394&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Sun Jan 4 16:30:05 2009
@@ -395,7 +395,7 @@
if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
BlockCyclicMultiplyMap.initJob(this.getBlockedMatrixPath(),
- ((DenseMatrix) B).getBlockedMatrixPath(),
+ ((DenseMatrix) B).getBlockedMatrixPath(), this.getBlockedMatrixSize(),
BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
jobConf);
BlockCyclicMultiplyReduce.initJob(result.getPath(),
@@ -506,7 +506,7 @@
int block_size = (int) blocks;
Matrix blockedMatrix = new DenseMatrix(config);
blockedMatrix.setDimension(block_size, block_size);
- this.setBlockedMatrixPath(blockedMatrix.getPath());
+ this.setBlockedMatrixPath(blockedMatrix.getPath(), block_size);
JobConf jobConf = new JobConf(config);
jobConf.setJobName("Blocking MR job" + getPath());
@@ -524,9 +524,15 @@
Constants.BLOCK_PATH).getValue());
}
- protected void setBlockedMatrixPath(String path) throws IOException {
+ protected void setBlockedMatrixPath(String path, int size) throws IOException {
BatchUpdate update = new BatchUpdate(Constants.METADATA);
update.put(Constants.BLOCK_PATH, Bytes.toBytes(path));
+ update.put(Constants.BLOCK_SIZE, Bytes.toBytes(size));
table.commit(update);
}
+
+ public int getBlockedMatrixSize() throws IOException {
+ return Bytes.toInt(table.get(Constants.METADATA,
+ Constants.BLOCK_SIZE).getValue());
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=731394&r1=731393&r2=731394&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Sun Jan 4 16:30:05 2009
@@ -32,8 +32,6 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hama.Constants;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.HamaConfiguration;
import org.apache.hama.SubMatrix;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
@@ -43,25 +41,29 @@
public class BlockCyclicMultiplyMap extends MapReduceBase implements
Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
- protected DenseMatrix matrix_b;
+ protected HTable table;
+ protected int block_size;
public static final String MATRIX_B = "hama.multiplication.matrix.b";
+ public static final String BLOCK_SIZE = "hama.multiplication.block.size";
public void configure(JobConf job) {
try {
- matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
+ table = new HTable(job.get(MATRIX_B, ""));
+ block_size = Integer.parseInt(job.get(BLOCK_SIZE, ""));
} catch (IOException e) {
LOG.warn("Load matrix_b failed : " + e.getMessage());
}
}
public static void initJob(String matrix_a, String matrix_b,
- Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
+ int block_size, Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
Class<BlockWritable> outputValueClass, JobConf jobConf) {
jobConf.setMapOutputValueClass(outputValueClass);
jobConf.setMapOutputKeyClass(outputKeyClass);
jobConf.setMapperClass(map);
jobConf.set(MATRIX_B, matrix_b);
+ jobConf.set(BLOCK_SIZE, String.valueOf(block_size));
jobConf.setInputFormat(BlockInputFormat.class);
FileInputFormat.addInputPaths(jobConf, matrix_a);
@@ -73,25 +75,36 @@
public void map(BlockID key, BlockWritable value,
OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
throws IOException {
- // we don't need to get blockSize each time
- int blockSize = matrix_b.getRows();
SubMatrix a = value.get();
- HTable table = matrix_b.getHTable();
- // startKey : new BlockID(key.getColumn(), 0).toString()
- // endKey : new BlockID(key.getColumn(), blockSize+1).toString()
- Scanner scan = table.getScanner(new byte[][] { Bytes
- .toBytes(Constants.BLOCK) },
- new BlockID(key.getColumn(), 0).getBytes(), new BlockID(
- key.getColumn(), blockSize + 1).getBytes());
+ BlockID startBlock = new BlockID(key.getColumn(), 0);
+ BlockID endBlock = new BlockID(key.getColumn() + 1, 0);
+ Scanner scan;
+ if ((key.getColumn() + 1) == block_size) {
+ scan = table.getScanner(new byte[][] { Bytes
+ .toBytes(Constants.BLOCK) }, startBlock.getBytes());
+ } else {
+ scan = table.getScanner(new byte[][] { Bytes
+ .toBytes(Constants.BLOCK) }, startBlock.getBytes(), endBlock
+ .getBytes());
+ }
+ int blocks = 0;
for (RowResult row : scan) {
BlockID bid = new BlockID(row.getRow());
SubMatrix b = new SubMatrix(row.get(Constants.BLOCK).getValue());
SubMatrix c = a.mult(b);
output.collect(new BlockID(key.getRow(), bid.getColumn()),
new BlockWritable(c));
+ blocks++;
}
+
+ if (blocks == 0)
+ throw new IOException("There is no matrix b." +
+ "\ntableName: " + new String(table.getTableName())
+ + "\nscanner startKey: " + startBlock.toString() + ", endKey: "
+ + endBlock.toString());
+
scan.close();
}
}