You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/01/16 07:47:18 UTC
svn commit: r734925 -
/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
Author: edwardyoon
Date: Thu Jan 15 22:47:18 2009
New Revision: 734925
URL: http://svn.apache.org/viewvc?rev=734925&view=rev
Log:
Add comment.
Modified:
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=734925&r1=734924&r2=734925&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Thu Jan 15 22:47:18 2009
@@ -52,23 +52,23 @@
* Initialize a job to blocking a table
*
* @param matrixPath
- * @param collectionTable
- * @param block_size
- * @param j
- * @param i
+ * @param collectionTable
+ * @param block_size
+ * @param j
+ * @param i
* @param job
*/
- public static void initJob(String matrixPath, String collectionTable, boolean bool
- ,int block_size, int i, int j, JobConf job) {
+ public static void initJob(String matrixPath, String collectionTable,
+ boolean bool, int block_size, int i, int j, JobConf job) {
job.setMapperClass(BlockingMapper.class);
job.setReducerClass(BlockingReducer.class);
FileInputFormat.addInputPaths(job, matrixPath);
job.setInputFormat(VectorInputFormat.class);
-
+
job.setMapOutputKeyClass(BlockID.class);
job.setMapOutputValueClass(VectorWritable.class);
-
+
job.setOutputFormat(BlockOutputFormat.class);
job.setOutputKeyClass(BlockID.class);
job.setOutputValueClass(BlockWritable.class);
@@ -77,7 +77,7 @@
job.set(COLUMNS, String.valueOf(j));
job.setBoolean(MATRIX_POS, bool);
job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
- if(bool)
+ if (bool)
job.set(BlockOutputFormat.COLUMN, "a");
else
job.set(BlockOutputFormat.COLUMN, "b");
@@ -91,22 +91,22 @@
protected int mBlockNum;
protected int mBlockRowSize;
protected int mBlockColSize;
-
+
protected int mRows;
protected int mColumns;
-
+
protected boolean matrixPos;
@Override
public void configure(JobConf job) {
- mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
- mRows = Integer.parseInt(job.get(ROWS, ""));
- mColumns = Integer.parseInt(job.get(COLUMNS, ""));
-
- mBlockRowSize = mRows / mBlockNum;
- mBlockColSize = mColumns / mBlockNum;
-
- matrixPos = job.getBoolean(MATRIX_POS, true);
+ mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
+ mRows = Integer.parseInt(job.get(ROWS, ""));
+ mColumns = Integer.parseInt(job.get(COLUMNS, ""));
+
+ mBlockRowSize = mRows / mBlockNum;
+ mBlockColSize = mColumns / mBlockNum;
+
+ matrixPos = job.getBoolean(MATRIX_POS, true);
}
}
@@ -124,23 +124,25 @@
int endColumn;
int blkRow = key.get() / mBlockRowSize;
DenseVector dv = value.getDenseVector();
-
+
int i = 0;
do {
startColumn = i * mBlockColSize;
endColumn = startColumn + mBlockColSize - 1;
- if(endColumn >= mColumns) // the last sub vector
+ if (endColumn >= mColumns) // the last sub vector
endColumn = mColumns - 1;
- output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
- dv.subVector(startColumn, endColumn)));
-
+ output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(), dv
+ .subVector(startColumn, endColumn)));
+
i++;
- } while(endColumn < (mColumns-1));
+ } while (endColumn < (mColumns - 1));
}
}
/**
- * Reducer Class
+ * Rows are named as c(i, j) with the sequence number (N^2 * i) + (j * N) + k
+ * to avoid duplicate records. Each row holds two sub-matrices, a(i, k) and
+ * b(k, j).
*/
public static class BlockingReducer extends BlockingMapRedBase implements
Reducer<BlockID, VectorWritable, BlockID, BlockWritable> {
@@ -149,23 +151,24 @@
public void reduce(BlockID key, Iterator<VectorWritable> values,
OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
throws IOException {
- // Note: all the sub-vectors are grouped by {@link org.apache.hama.io.BlockID}
-
+ // Note: all the sub-vectors are grouped by {@link
+ // org.apache.hama.io.BlockID}
+
// the block's base offset in the original matrix
int colBase = key.getColumn() * mBlockColSize;
int rowBase = key.getRow() * mBlockRowSize;
-
+
// the block's size : rows & columns
int smRows = mBlockRowSize;
- if((rowBase + mBlockRowSize - 1) >= mRows)
+ if ((rowBase + mBlockRowSize - 1) >= mRows)
smRows = mRows - rowBase;
int smCols = mBlockColSize;
- if((colBase + mBlockColSize - 1) >= mColumns)
+ if ((colBase + mBlockColSize - 1) >= mColumns)
smCols = mColumns - colBase;
-
+
// construct the matrix
SubMatrix subMatrix = new SubMatrix(smRows, smCols);
-
+
// i, j is the current offset in the sub-matrix
int i = 0, j = 0;
while (values.hasNext()) {
@@ -185,7 +188,7 @@
BlockWritable outValue = new BlockWritable(subMatrix);
// It'll used for only matrix multiplication.
- if(matrixPos) {
+ if (matrixPos) {
for (int x = 0; x < mBlockNum; x++) {
int r = (key.getRow() * mBlockNum) * mBlockNum;
int seq = (x * mBlockNum) + key.getColumn() + r;
@@ -193,11 +196,11 @@
}
} else {
for (int x = 0; x < mBlockNum; x++) {
- int seq = (x * mBlockNum * mBlockNum) +
- (key.getColumn() * mBlockNum) + key.getRow();
+ int seq = (x * mBlockNum * mBlockNum) + (key.getColumn() * mBlockNum)
+ + key.getRow();
output.collect(new BlockID(x, key.getColumn(), seq), outValue);
}
}
}
}
-}
\ No newline at end of file
+}