Posted to commits@hama.apache.org by ed...@apache.org on 2008/12/22 06:01:01 UTC
svn commit: r728611 -
/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
Author: edwardyoon
Date: Sun Dec 21 21:01:00 2008
New Revision: 728611
URL: http://svn.apache.org/viewvc?rev=728611&view=rev
Log:
Revert changes from revision 725570
Modified:
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=728611&r1=728610&r2=728611&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Sun Dec 21 21:01:00 2008
@@ -17,34 +17,32 @@
package org.apache.hama.mapred;
import java.io.IOException;
+import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.mapred.TableSplit;
-import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hama.Constants;
import org.apache.hama.DenseMatrix;
+import org.apache.hama.DenseVector;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.SubMatrix;
import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockPosition;
import org.apache.hama.io.VectorWritable;
/**
* A Map/Reduce helper class for blocking a DenseMatrix into a block-formatted matrix
*/
public class BlockingMapRed {
+
static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
/** Parameter name for the path of the matrix to be blocked */
public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
@@ -57,139 +55,126 @@
*/
public static void initJob(String matrixPath, JobConf job) {
job.setMapperClass(BlockingMapper.class);
+ job.setReducerClass(BlockingReducer.class);
FileInputFormat.addInputPaths(job, matrixPath);
- job.setInputFormat(MyInputFormat.class);
+ job.setInputFormat(VectorInputFormat.class);
job.setMapOutputKeyClass(BlockID.class);
job.setMapOutputValueClass(VectorWritable.class);
job.setOutputFormat(NullOutputFormat.class);
job.set(BLOCKING_MATRIX, matrixPath);
- job.set(VectorInputFormat.COLUMN_LIST, Constants.BLOCK_POSITION);
+ job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
}
/**
* Abstract base class used to configure the blocking Map/Reduce job.
*/
public static abstract class BlockingMapRedBase extends MapReduceBase {
+
protected DenseMatrix matrix;
+ protected int mBlockNum;
+ protected int mBlockRowSize;
+ protected int mBlockColSize;
+
+ protected int mRows;
+ protected int mColumns;
@Override
public void configure(JobConf job) {
try {
matrix = new DenseMatrix(new HamaConfiguration(), job.get(
BLOCKING_MATRIX, ""));
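+ // getBlockSize() is the number of blocks per dimension, so each block
+ // covers (rows / mBlockNum) x (columns / mBlockNum) cells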
+ mBlockNum = matrix.getBlockSize();
+ mBlockRowSize = matrix.getRows() / mBlockNum;
+ mBlockColSize = matrix.getColumns() / mBlockNum;
+
+ mRows = matrix.getRows();
+ mColumns = matrix.getColumns();
} catch (IOException e) {
LOG.warn("Loading the matrix for blocking failed: " + e.getMessage());
}
}
+
}
/**
* Mapper Class
*/
public static class BlockingMapper extends BlockingMapRedBase implements
- Mapper<BlockID, BlockPosition, BlockID, VectorWritable> {
+ Mapper<IntWritable, VectorWritable, BlockID, VectorWritable> {
@Override
- public void map(BlockID key, BlockPosition value,
+ public void map(IntWritable key, VectorWritable value,
OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
throws IOException {
- int startRow = value.getStartRow();
- int endRow = value.getEndRow();
- int startColumn = value.getStartColumn();
- int endColumn = value.getEndColumn();
-
- matrix.setBlock(key.getRow(), key.getColumn(), matrix.subMatrix(startRow,
- endRow, startColumn, endColumn));
+ int startColumn;
+ int endColumn;
+ int blkRow = key.get() / mBlockRowSize;
+ DenseVector dv = value.getDenseVector();
+
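+ // Walk across the row: emit one sub-vector per block column, keyed by
+ // the (block row, block column) BlockID it belongs to.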
+ int i = 0;
+ do {
+ startColumn = i * mBlockColSize;
+ endColumn = startColumn + mBlockColSize - 1;
+ if(endColumn >= mColumns) // the last sub vector
+ endColumn = mColumns - 1;
+ output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
+ dv.subVector(startColumn, endColumn)));
+
+ i++;
+ } while(endColumn < (mColumns-1));
}
- }
- static class MyInputFormat extends TableInputFormatBase implements
- InputFormat<BlockID, BlockPosition>, JobConfigurable {
- static final Log LOG = LogFactory.getLog(MyInputFormat.class);
- private TableRecordReader tableRecordReader;
-
- /**
- * Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
- */
- protected static class TableRecordReader extends TableRecordReaderBase
- implements RecordReader<BlockID, BlockPosition> {
-
- /**
- * @return IntWritable
- *
- * @see org.apache.hadoop.mapred.RecordReader#createKey()
- */
- public BlockID createKey() {
- return new BlockID();
- }
+ }
- /**
- * @return BlockWritable
- *
- * @see org.apache.hadoop.mapred.RecordReader#createValue()
- */
- public BlockPosition createValue() {
- return new BlockPosition();
- }
+ /**
+ * Reducer Class
+ */
+ public static class BlockingReducer extends BlockingMapRedBase implements
+ Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
- /**
- * @param key BlockID as input key.
- * @param value BlockWritable as input value
- *
- * Converts Scanner.next() to BlockID, BlockWritable
- *
- * @return true if there was more data
- * @throws IOException
- */
- public boolean next(BlockID key, BlockPosition value) throws IOException {
- RowResult result = this.scanner.next();
- boolean hasMore = result != null && result.size() > 0;
- if (hasMore) {
- byte[] row = result.getRow();
- BlockID bID = new BlockID(row);
- key.set(bID.getRow(), bID.getColumn());
- Writables.copyWritable(
- new BlockPosition(result.get(Constants.BLOCK_POSITION).getValue()),
- value);
+ @Override
+ public void reduce(BlockID key, Iterator<VectorWritable> values,
+ OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
+ throws IOException {
+ // Note: all the sub-vectors are grouped by {@link
+ // org.apache.hama.io.BlockID}
+
+ // the block's base offset in the original matrix
+ int colBase = key.getColumn() * mBlockColSize;
+ int rowBase = key.getRow() * mBlockRowSize;
+
+ // the block's size : rows & columns
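+ // (edge blocks may be smaller when the matrix dimensions are not an
+ // exact multiple of the block size)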
+ int smRows = mBlockRowSize;
+ if((rowBase + mBlockRowSize - 1) >= mRows)
+ smRows = mRows - rowBase;
+ int smCols = mBlockColSize;
+ if((colBase + mBlockColSize - 1) >= mColumns)
+ smCols = mColumns - colBase;
+
+ // construct the matrix
+ SubMatrix subMatrix = new SubMatrix(smRows, smCols);
+
+ // i, j is the current offset in the sub-matrix
+ int i = 0, j = 0;
+ while (values.hasNext()) {
+ VectorWritable vw = values.next();
+ // check the size is suitable
+ if (vw.size() != smCols)
+ throw new IOException("Block Column Size mismatched.");
+ i = vw.row - rowBase;
+ if (i >= smRows || i < 0)
+ throw new IOException("Block Row Size mismatched.");
+
+ // put the subVector to the subMatrix
+ for (j = 0; j < smCols; j++) {
+ subMatrix.set(i, j, vw.get(colBase + j));
}
- return hasMore;
- }
- }
-
- /**
- * Builds a TableRecordReader. If no TableRecordReader was provided, uses
- * the default.
- *
- * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
- * JobConf, Reporter)
- */
- public RecordReader<BlockID, BlockPosition> getRecordReader(
- InputSplit split, JobConf job, Reporter reporter) throws IOException {
- TableSplit tSplit = (TableSplit) split;
- TableRecordReader trr = this.tableRecordReader;
- // if no table record reader was provided use default
- if (trr == null) {
- trr = new TableRecordReader();
}
- trr.setStartRow(tSplit.getStartRow());
- trr.setEndRow(tSplit.getEndRow());
- trr.setHTable(this.table);
- trr.setInputColumns(this.inputColumns);
- trr.setRowFilter(this.rowFilter);
- trr.init();
- return trr;
- }
- /**
- * Allows subclasses to set the {@link TableRecordReader}.
- *
- * @param tableRecordReader to provide other {@link TableRecordReader}
- * implementations.
- */
- protected void setTableRecordReader(TableRecordReader tableRecordReader) {
- this.tableRecordReader = tableRecordReader;
+ matrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
}
}
+
}
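For context, a minimal driver sketch for the code above, assuming Hadoop's old mapred API used throughout this diff; the matrix path "matrixA" and the BlockingJobDriver class are hypothetical, not part of this commit:

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hama.mapred.BlockingMapRed;

public class BlockingJobDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(BlockingJobDriver.class);
    job.setJobName("block matrixA");

    // Wires in BlockingMapper, BlockingReducer, VectorInputFormat and the
    // hama.blocking.matrix parameter, as set up in initJob() above.
    BlockingMapRed.initJob("matrixA", job);

    JobClient.runJob(job);
  }
}

On the block arithmetic: an 8 x 8 matrix with getBlockSize() == 2 is cut into four 4 x 4 sub-matrices; the mapper sends row 5 to block row 5 / 4 = 1, and the reducer reassembles each block and stores it back via matrix.setBlock().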