You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/12/02 01:57:32 UTC
svn commit: r722319 - in /incubator/hama/trunk/src:
java/org/apache/hama/io/BlockID.java
java/org/apache/hama/mapred/BlockingMapRed.java
test/org/apache/hama/TestBlockRowId.java
test/org/apache/hama/TestDenseMatrix.java
Author: edwardyoon
Date: Mon Dec 1 16:57:31 2008
New Revision: 722319
URL: http://svn.apache.org/viewvc?rev=722319&view=rev
Log:
Fix blocking mapred bug
Removed:
incubator/hama/trunk/src/test/org/apache/hama/TestBlockRowId.java
Modified:
incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java?rev=722319&r1=722318&r2=722319&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java Mon Dec 1 16:57:31 2008
@@ -93,6 +93,7 @@
/**
* BlockID Comparator
*/
+ /* Why do we need a special Comparator?
public static class Comparator extends WritableComparator {
protected Comparator() {
super(BlockID.class);
@@ -101,6 +102,8 @@
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int thisRow = readInt(b1, s1);
int thatRow = readInt(b2, s2);
+ // Why are the columns read at offsets l1 and l2?
+ // l1 and l2 are the lengths of the two keys, not offsets.
int thisColumn = readInt(b1, l1);
int thatColumn = readInt(b2, l2);
@@ -116,4 +119,5 @@
static { // register this comparator
WritableComparator.define(BlockID.class, new Comparator());
}
+ */
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=722319&r1=722318&r2=722319&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Mon Dec 1 16:57:31 2008
@@ -16,17 +16,12 @@
*/
package org.apache.hama.mapred;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
@@ -45,14 +40,14 @@
/**
* A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix
- **/
+ */
public class BlockingMapRed {
- static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
- /** Parameter of the path of the matrix to be blocked **/
+ static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
+ /** Parameter of the path of the matrix to be blocked */
public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
-
- /**
+
+ /**
* Initialize a job to blocking a table
*
* @param matrixPath
@@ -62,31 +57,31 @@
job.setMapperClass(BlockingMapper.class);
job.setReducerClass(BlockingReducer.class);
FileInputFormat.addInputPaths(job, matrixPath);
-
+
job.setInputFormat(VectorInputFormat.class);
- job.setMapOutputKeyClass(BlockRowId.class);
+ job.setMapOutputKeyClass(BlockID.class);
job.setMapOutputValueClass(VectorWritable.class);
job.setOutputFormat(NullOutputFormat.class);
- job.setOutputValueGroupingComparator(GroupingComparator.class);
-
+
job.set(BLOCKING_MATRIX, matrixPath);
job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
}
-
+
/**
* Abstract Blocking Map/Reduce Class to configure the job.
*/
public static abstract class BlockingMapRedBase extends MapReduceBase {
-
+
protected DenseMatrix matrix;
protected int mBlockNum;
protected int mBlockRowSize;
protected int mBlockColSize;
-
+
@Override
public void configure(JobConf job) {
try {
- matrix = new DenseMatrix(new HamaConfiguration(), job.get(BLOCKING_MATRIX, ""));
+ matrix = new DenseMatrix(new HamaConfiguration(), job.get(
+ BLOCKING_MATRIX, ""));
mBlockNum = matrix.getBlockSize();
mBlockRowSize = matrix.getRows() / mBlockNum;
mBlockColSize = matrix.getColumns() / mBlockNum;
@@ -94,177 +89,66 @@
LOG.warn("Load matrix_blocking failed : " + e.getMessage());
}
}
-
+
}
-
+
/**
* Mapper Class
*/
public static class BlockingMapper extends BlockingMapRedBase implements
- Mapper<IntWritable, VectorWritable, BlockRowId, VectorWritable> {
+ Mapper<IntWritable, VectorWritable, BlockID, VectorWritable> {
@Override
public void map(IntWritable key, VectorWritable value,
- OutputCollector<BlockRowId, VectorWritable> output, Reporter reporter)
+ OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
throws IOException {
int startColumn;
int endColumn;
int blkRow = key.get() / mBlockRowSize;
DenseVector dv = value.getDenseVector();
- for(int i = 0 ; i < mBlockNum; i++) {
+ for (int i = 0; i < mBlockNum; i++) {
startColumn = i * mBlockColSize;
endColumn = startColumn + mBlockColSize - 1;
- output.collect(new BlockRowId(new BlockID(blkRow, i), key.get()),
- new VectorWritable(key.get(), (DenseVector) dv.subVector(startColumn, endColumn)));
+ output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
+ (DenseVector) dv.subVector(startColumn, endColumn)));
}
}
}
-
+
/**
* Reducer Class
*/
public static class BlockingReducer extends BlockingMapRedBase implements
- Reducer<BlockRowId, VectorWritable, BlockID, SubMatrix> {
+ Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
@Override
- public void reduce(BlockRowId key, Iterator<VectorWritable> values,
+ public void reduce(BlockID key, Iterator<VectorWritable> values,
OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
throws IOException {
- BlockID blkId = key.getBlockId();
- final SubMatrix subMatrix = new SubMatrix(mBlockRowSize, mBlockColSize);
- int i=0, j=0;
- int colBase = blkId.getColumn() * mBlockColSize;
- while(values.hasNext()) {
- if(i > mBlockRowSize)
- throw new IOException("BlockRowSize dismatched.");
-
+ // Note: all the sub-vectors are grouped by {@link
+ // org.apache.hama.io.BlockID}
+ SubMatrix subMatrix = new SubMatrix(mBlockRowSize, mBlockColSize);
+ int i = 0, j = 0;
+ int colBase = key.getColumn() * mBlockColSize;
+ int rowBase = key.getRow() * mBlockRowSize;
+ while (values.hasNext()) {
VectorWritable vw = values.next();
- if(vw.size() != mBlockColSize)
+ // check that the sub-vector size matches the block column size
+ if (vw.size() != mBlockColSize)
throw new IOException("BlockColumnSize dismatched.");
- for(j=0; j<mBlockColSize; j++) {
+ i = vw.row - rowBase;
+ if (i >= mBlockRowSize || i < 0)
+ throw new IOException("BlockRowSize dismatched.");
+
+ // copy the sub-vector into the sub-matrix
+ for (j = 0; j < mBlockColSize; j++) {
subMatrix.set(i, j, vw.get(colBase + j));
}
- i++;
}
-
- matrix.setBlock(blkId.getRow(), blkId.getColumn(), subMatrix);
- }
- }
-
- /**
- * A Grouping Comparator to group all the {@link BlockRowId} with
- * same {@link org.apache.hama.io.BlockID}
- */
- public static class GroupingComparator implements RawComparator<BlockRowId> {
- private final DataInputBuffer buffer = new DataInputBuffer();
- private final BlockRowId key1 = new BlockRowId();
- private final BlockRowId key2 = new BlockRowId();
-
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
- int l2) {
- try {
- buffer.reset(b1, s1, l1); // parse k1
- key1.readFields(buffer);
-
- buffer.reset(b2, s2, l2);
- key2.readFields(buffer);
-
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- return compare(key1, key2);
+ matrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
}
-
- @Override
- public int compare(BlockRowId first, BlockRowId second) {
- return first.mBlockId.compareTo(second.mBlockId);
- }
-
}
-
- /**
- * Help Class to store <blockid, rowid> pair.
- */
- public static class BlockRowId implements WritableComparable<BlockRowId> {
-
- BlockID mBlockId;
- int mRowId;
-
- /**
- * Empty Constructor used for serialization.
- */
- public BlockRowId() { }
-
- /**
- * Construct a block-row id using blockid & rowid
- * @param blockid
- * @param rowid
- */
- public BlockRowId(BlockID blockid, int rowid) {
- mBlockId = blockid;
- mRowId = rowid;
- }
-
- /**
- * Get the block ID
- * @return BlockID
- */
- public BlockID getBlockId() { return mBlockId; }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- mBlockId = new BlockID();
- mBlockId.readFields(in);
- mRowId = in.readInt();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- mBlockId.write(out);
- out.writeInt(mRowId);
- }
- // first compare block ids
- // when blockids are same, then compare row ids
- @Override
- public int compareTo(BlockRowId another) {
- int cmp = mBlockId.compareTo(another.mBlockId);
- if(cmp == 0) {
- cmp = mRowId - another.mRowId;
- }
- return cmp;
- }
-
- @Override
- public boolean equals(Object obj) {
- if(obj == null) return false;
- if(!(obj instanceof BlockRowId)) return false;
-
- BlockRowId another = (BlockRowId)obj;
- return compareTo(another) == 0;
- }
-
- // tricky here
- // we just used the block id to generate the hashcode
- // so that the same block will be in the same reducer
- @Override
- public int hashCode() {
- return mBlockId.hashCode();
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("<");
- sb.append(mBlockId);
- sb.append(",");
- sb.append(mRowId);
- sb.append(">");
- return sb.toString();
- }
- }
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=722319&r1=722318&r2=722319&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Mon Dec 1 16:57:31 2008
@@ -84,7 +84,7 @@
public void testBlocking() throws IOException, ClassNotFoundException {
assertEquals(((DenseMatrix) m1).isBlocked(), false);
- ((DenseMatrix) m1).blocking_mapred(2);
+ ((DenseMatrix) m1).blocking(2);
assertEquals(((DenseMatrix) m1).isBlocked(), true);
int[] pos = ((DenseMatrix) m1).getBlockPosition(1, 0);
double[][] b = ((DenseMatrix) m1).subMatrix(pos[0], pos[1], pos[2], pos[3]).getDoubleArray();