You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/12/16 04:21:48 UTC
svn commit: r726934 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/
src/java/org/apache/hama/algebra/ src/java/org/apache/hama/io/
src/java/org/apache/hama/mapred/ src/test/org/apache/hama/
src/test/org/apache/hama/mapred/
Author: edwardyoon
Date: Mon Dec 15 19:21:47 2008
New Revision: 726934
URL: http://svn.apache.org/viewvc?rev=726934&view=rev
Log:
Refactor mapred and I/O packages
Removed:
incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockPositionMapWritable.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Dec 15 19:21:47 2008
@@ -33,6 +33,7 @@
IMPROVEMENTS
+ HAMA-135, HAMA-137: Refactor mapred, I/O package (edwardyoon)
HAMA-134: We don't need to fill C with zeros (edwardyoon)
HAMA-131: Add argument for the number of blocks (edwardyoon)
HAMA-113: Random matrix generator on map/reduce (edwardyoon)
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Mon Dec 15 19:21:47 2008
@@ -99,10 +99,7 @@
/** end column of block */
public static final String BLOCK_ENDCOLUMN = "attribute:endColumn";
-
- public static final String BLOCK_POSITION = Constants.BLOCK_STARTROW
- + " " + Constants.BLOCK_ENDROW + " " + Constants.BLOCK_STARTCOLUMN
- + " " + Constants.BLOCK_ENDCOLUMN;
+ public static final String BLOCK_POSITION = "attribute:blockPosition";
/** block dimension */
public static final String BLOCK = "block:";
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Mon Dec 15 19:21:47 2008
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -47,6 +48,7 @@
import org.apache.hama.algebra.SIMDMultiplyMap;
import org.apache.hama.algebra.SIMDMultiplyReduce;
import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockPosition;
import org.apache.hama.io.BlockWritable;
import org.apache.hama.io.DoubleEntry;
import org.apache.hama.io.MapWritable;
@@ -393,7 +395,7 @@
if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(),
- BlockCyclicMultiplyMap.class, IntWritable.class, BlockWritable.class,
+ BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
jobConf);
BlockCyclicMultiplyReduce.initJob(result.getPath(),
BlockCyclicMultiplyReduce.class, jobConf);
@@ -474,8 +476,8 @@
}
public SubMatrix getBlock(int i, int j) throws IOException {
- return new SubMatrix(table.get(String.valueOf(i), Constants.BLOCK + j)
- .getValue());
+ return new SubMatrix(table.get(new BlockID(i, j).getBytes(),
+ Bytes.toBytes(Constants.BLOCK)).getValue());
}
/**
@@ -494,8 +496,8 @@
}
public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
- BatchUpdate update = new BatchUpdate(String.valueOf(i));
- update.put(Constants.BLOCK + j, matrix.getBytes());
+ BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
+ update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
table.commit(update);
}
@@ -519,11 +521,8 @@
endColumn = this.getColumns() - 1;
BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
- update.put(Constants.BLOCK_STARTROW, BytesUtil.intToBytes(startRow));
- update.put(Constants.BLOCK_ENDROW, BytesUtil.intToBytes(endRow));
- update.put(Constants.BLOCK_STARTCOLUMN, BytesUtil
- .intToBytes(startColumn));
- update.put(Constants.BLOCK_ENDCOLUMN, BytesUtil.intToBytes(endColumn));
+ update.put(Constants.BLOCK_POSITION,
+ new BlockPosition(startRow, endRow, startColumn, endColumn).getBytes());
table.commit(update);
j++;
@@ -533,19 +532,10 @@
} while (endRow < (this.getRows() - 1));
}
- protected int[] getBlockPosition(int i, int j) throws IOException {
- RowResult rs = table.getRow(new BlockID(i, j).getBytes());
- int[] result = new int[4];
-
- result[0] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_STARTROW)
- .getValue());
- result[1] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_ENDROW).getValue());
- result[2] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_STARTCOLUMN)
- .getValue());
- result[3] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_ENDCOLUMN)
- .getValue());
-
- return result;
+ protected BlockPosition getBlockPosition(int i, int j) throws IOException {
+ byte[] rs = table.get(new BlockID(i, j).getBytes(),
+ Bytes.toBytes(Constants.BLOCK_POSITION)).getValue();
+ return new BlockPosition(rs);
}
/**
@@ -577,17 +567,17 @@
*/
public void blocking(int blockNum) throws IOException {
this.checkBlockNum(blockNum);
-
- String[] columns = new String[] { Constants.BLOCK_STARTROW,
- Constants.BLOCK_ENDROW, Constants.BLOCK_STARTCOLUMN,
- Constants.BLOCK_ENDCOLUMN };
+
+ String[] columns = new String[] { Constants.BLOCK_POSITION };
Scanner scan = table.getScanner(columns);
for (RowResult row : scan) {
BlockID bID = new BlockID(row.getRow());
- int[] pos = getBlockPosition(bID.getRow(), bID.getColumn());
- setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos[0], pos[1], pos[2],
- pos[3]));
+ BlockPosition pos =
+ new BlockPosition(row.get(Constants.BLOCK_POSITION).getValue());
+
+ setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos.getStartRow(),
+ pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn()));
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java Mon Dec 15 19:21:47 2008
@@ -33,7 +33,7 @@
* bigger matrix. This is a in-memory operation only.
*/
public class SubMatrix implements java.io.Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 3897536498367921547L;
static final Logger LOG = Logger.getLogger(SubMatrix.class);
private double[][] matrix;
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Mon Dec 15 19:21:47 2008
@@ -21,7 +21,6 @@
import java.io.IOException;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
@@ -33,22 +32,18 @@
import org.apache.hama.HamaConfiguration;
import org.apache.hama.SubMatrix;
import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockPosition;
import org.apache.hama.io.BlockWritable;
import org.apache.hama.mapred.BlockInputFormat;
import org.apache.log4j.Logger;
public class BlockCyclicMultiplyMap extends MapReduceBase implements
- Mapper<BlockID, BlockPosition, IntWritable, BlockWritable> {
+ Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
- protected DenseMatrix matrix_a;
- public static final String MATRIX_A = "hama.multiplication.matrix.a";
protected DenseMatrix matrix_b;
public static final String MATRIX_B = "hama.multiplication.matrix.b";
public void configure(JobConf job) {
try {
- matrix_a = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_A, ""));
matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
} catch (IOException e) {
LOG.warn("Load matrix_b failed : " + e.getMessage());
@@ -56,32 +51,30 @@
}
public static void initJob(String matrix_a, String matrix_b,
- Class<BlockCyclicMultiplyMap> map, Class<IntWritable> outputKeyClass,
+ Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
Class<BlockWritable> outputValueClass, JobConf jobConf) {
jobConf.setMapOutputValueClass(outputValueClass);
jobConf.setMapOutputKeyClass(outputKeyClass);
jobConf.setMapperClass(map);
- jobConf.set(MATRIX_A, matrix_a);
jobConf.set(MATRIX_B, matrix_b);
jobConf.setInputFormat(BlockInputFormat.class);
FileInputFormat.addInputPaths(jobConf, matrix_a);
- jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK_POSITION);
+ jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
}
@Override
- public void map(BlockID key, @SuppressWarnings("unused") BlockPosition value,
- OutputCollector<IntWritable, BlockWritable> output, Reporter reporter)
+ public void map(BlockID key, BlockWritable value,
+ OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
throws IOException {
int blockSize = matrix_b.getBlockSize();
- SubMatrix a = matrix_a.getBlock(key.getRow(), key.getColumn());
+ SubMatrix a = value.get();
for (int j = 0; j < blockSize; j++) {
SubMatrix b = matrix_b.getBlock(key.getColumn(), j);
SubMatrix c = a.mult(b);
- output.collect(new IntWritable(key.getRow()),
- new BlockWritable(key.getRow(), j, c));
+ output.collect(new BlockID(key.getRow(), j), new BlockWritable(c));
}
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java Mon Dec 15 19:21:47 2008
@@ -20,9 +20,7 @@
package org.apache.hama.algebra;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.io.IntWritable;
@@ -32,14 +30,14 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hama.SubMatrix;
-import org.apache.hama.io.BlockEntry;
+import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
import org.apache.hama.io.VectorUpdate;
import org.apache.hama.mapred.VectorOutputFormat;
import org.apache.log4j.Logger;
public class BlockCyclicMultiplyReduce extends MapReduceBase implements
- Reducer<IntWritable, BlockWritable, IntWritable, VectorUpdate> {
+ Reducer<BlockID, BlockWritable, IntWritable, VectorUpdate> {
static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyReduce.class);
/**
@@ -60,39 +58,29 @@
}
@Override
- public void reduce(IntWritable key, Iterator<BlockWritable> values,
+ public void reduce(BlockID key, Iterator<BlockWritable> values,
OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
throws IOException {
- int row = key.get();
- Map<Integer, SubMatrix> sum = new HashMap<Integer, SubMatrix>();
+ SubMatrix s = null;
while (values.hasNext()) {
- BlockWritable b = values.next();
- for (Map.Entry<Integer, BlockEntry> e : b.entrySet()) {
- int j = e.getKey();
- SubMatrix value = e.getValue().getValue();
- if (sum.containsKey(j)) {
- sum.put(j, sum.get(j).add(value));
- } else {
- sum.put(j, value);
- }
+ SubMatrix b = values.next().get();
+ if (s == null) {
+ s = b;
+ } else {
+ s = s.add(b);
}
}
- for (Map.Entry<Integer, SubMatrix> e : sum.entrySet()) {
- int column = e.getKey();
- SubMatrix mat = e.getValue();
-
- int startRow = row * mat.getRows();
- int startColumn = column * mat.getColumns();
-
- for (int i = 0; i < mat.getRows(); i++) {
- VectorUpdate update = new VectorUpdate(i + startRow);
- for (int j = 0; j < mat.getColumns(); j++) {
- update.put(j + startColumn, mat.get(i, j));
- }
- output.collect(key, update);
+ int startRow = key.getRow() * s.getRows();
+ int startColumn = key.getColumn() * s.getColumns();
+
+ for (int i = 0; i < s.getRows(); i++) {
+ VectorUpdate update = new VectorUpdate(i + startRow);
+ for (int j = 0; j < s.getColumns(); j++) {
+ update.put(j + startColumn, s.get(i, j));
}
+ output.collect(new IntWritable(key.getRow()), update);
}
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java Mon Dec 15 19:21:47 2008
@@ -32,7 +32,7 @@
/** A WritableComparable for BlockIDs. */
@SuppressWarnings("unchecked")
public class BlockID implements WritableComparable, java.io.Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 6434651179475226613L;
private int row;
private int column;
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java Mon Dec 15 19:21:47 2008
@@ -19,130 +19,105 @@
*/
package org.apache.hama.io;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-public class BlockPosition implements Writable, Map<Text, IntegerEntry> {
-
- public BlockID row;
- public BlockPositionMapWritable<Text, IntegerEntry> entries;
+public class BlockPosition implements Writable, java.io.Serializable {
+ private static final long serialVersionUID = 3717208691381491714L;
+ public int startRow;
+ public int endRow;
+ public int startColumn;
+ public int endColumn;
public BlockPosition() {
- this(new BlockPositionMapWritable<Text, IntegerEntry>());
- }
-
- public BlockPosition(BlockPositionMapWritable<Text, IntegerEntry> entries) {
- this.entries = entries;
}
- public int getIndex(String key) {
- return this.entries.get(new Text(key)).getValue();
- }
-
- public int size() {
- return this.entries.size();
+ public BlockPosition(byte[] bytes) throws IOException {
+ ByteArrayInputStream bos = new ByteArrayInputStream(bytes);
+ ObjectInputStream oos = new ObjectInputStream(bos);
+ Object obj = null;
+ try {
+ obj = oos.readObject();
+ this.startRow = ((BlockPosition)obj).startRow;
+ this.endRow = ((BlockPosition)obj).endRow;
+ this.startColumn = ((BlockPosition)obj).startColumn;
+ this.endColumn = ((BlockPosition)obj).endColumn;
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ oos.close();
+ bos.close();
}
- public IntegerEntry put(Text key, IntegerEntry value) {
- throw new UnsupportedOperationException("VectorWritable is read-only!");
+ public BlockPosition(int startRow, int endRow, int startColumn, int endColumn) {
+ this.startRow = startRow;
+ this.endRow = endRow;
+ this.startColumn = startColumn;
+ this.endColumn = endColumn;
}
- public IntegerEntry get(Object key) {
- return this.entries.get(key);
+ public void readFields(DataInput in) throws IOException {
+ this.startRow = in.readInt();
+ this.endRow = in.readInt();
+ this.startColumn = in.readInt();
+ this.endColumn = in.readInt();
}
- public IntegerEntry remove(Object key) {
- throw new UnsupportedOperationException("VectorWritable is read-only!");
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(startRow);
+ out.writeInt(endRow);
+ out.writeInt(startColumn);
+ out.writeInt(endColumn);
}
- public boolean containsKey(Object key) {
- return entries.containsKey(key);
+ public int getStartRow() {
+ return startRow;
}
- public boolean containsValue(Object value) {
- throw new UnsupportedOperationException("Don't support containsValue!");
+ public void setStartRow(int startRow) {
+ this.startRow = startRow;
}
- public boolean isEmpty() {
- return entries.isEmpty();
+ public int getEndRow() {
+ return endRow;
}
- public void clear() {
- throw new UnsupportedOperationException("VectorDatum is read-only!");
+ public void setEndRow(int endRow) {
+ this.endRow = endRow;
}
- public Set<Text> keySet() {
- Set<Text> result = new TreeSet<Text>();
- for (Text w : entries.keySet()) {
- result.add(w);
- }
- return result;
+ public int getStartColumn() {
+ return startColumn;
}
- public Set<Map.Entry<Text, IntegerEntry>> entrySet() {
- return Collections.unmodifiableSet(this.entries.entrySet());
+ public void setStartColumn(int startColumn) {
+ this.startColumn = startColumn;
}
- public Collection<IntegerEntry> values() {
- ArrayList<IntegerEntry> result = new ArrayList<IntegerEntry>();
- for (Writable w : entries.values()) {
- result.add((IntegerEntry) w);
- }
- return result;
+ public int getEndColumn() {
+ return endColumn;
}
- public void readFields(final DataInput in) throws IOException {
- this.row = new BlockID(Bytes.readByteArray(in));
- this.entries.readFields(in);
+ public void setEndColumn(int endColumn) {
+ this.endColumn = endColumn;
}
- public void write(final DataOutput out) throws IOException {
- Bytes.writeByteArray(out, this.row.getBytes());
- this.entries.write(out);
- }
-
- public void putAll(Map<? extends Text, ? extends IntegerEntry> m) {
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- /**
- *
- * The inner class for an entry of row.
- *
- */
- public static class Entries implements Map.Entry<byte[], IntegerEntry> {
-
- private final byte[] column;
- private final IntegerEntry entry;
-
- Entries(byte[] column, IntegerEntry entry) {
- this.column = column;
- this.entry = entry;
- }
-
- public IntegerEntry setValue(IntegerEntry c) {
- throw new UnsupportedOperationException("VectorWritable is read-only!");
- }
-
- public byte[] getKey() {
- byte[] key = column;
- return key;
- }
-
- public IntegerEntry getValue() {
- return entry;
- }
+ public byte[] getBytes() throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(this);
+ oos.flush();
+ oos.close();
+ bos.close();
+ byte[] data = bos.toByteArray();
+ return data;
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Mon Dec 15 19:21:47 2008
@@ -22,134 +22,35 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hama.SubMatrix;
-import org.apache.hama.util.BytesUtil;
-public class BlockWritable implements Writable, Map<Integer, BlockEntry> {
-
- public Integer row;
- public BlockMapWritable<Integer, BlockEntry> entries;
+public class BlockWritable implements Writable {
+ public SubMatrix matrix;
public BlockWritable() {
- this(new BlockMapWritable<Integer, BlockEntry>());
- }
-
- public BlockWritable(BlockMapWritable<Integer, BlockEntry> entries) {
- this.entries = entries;
- }
-
- public BlockWritable(int i, int j, SubMatrix mult) throws IOException {
- this.row = i;
- BlockMapWritable<Integer, BlockEntry> tr = new BlockMapWritable<Integer, BlockEntry>();
- tr.put(j, new BlockEntry(mult));
- this.entries = tr;
- }
-
- public int size() {
- return this.entries.size();
- }
-
- public SubMatrix get(int key) throws IOException {
- return this.entries.get(key).getValue();
- }
-
- public BlockEntry put(Integer key, BlockEntry value) {
- throw new UnsupportedOperationException("VectorWritable is read-only!");
- }
-
- public BlockEntry get(Object key) {
- return this.entries.get(key);
- }
-
- public BlockEntry remove(Object key) {
- throw new UnsupportedOperationException("VectorWritable is read-only!");
- }
-
- public boolean containsKey(Object key) {
- return entries.containsKey(key);
+ this.matrix = new SubMatrix(0, 0);
}
- public boolean containsValue(Object value) {
- throw new UnsupportedOperationException("Don't support containsValue!");
+ public BlockWritable(SubMatrix c) {
+ this.matrix = c;
}
- public boolean isEmpty() {
- return entries.isEmpty();
+ public BlockWritable(byte[] bytes) throws IOException {
+ this.matrix = new SubMatrix(bytes);
}
- public void clear() {
- throw new UnsupportedOperationException("VectorDatum is read-only!");
+ public void readFields(DataInput in) throws IOException {
+ this.matrix = new SubMatrix(Bytes.readByteArray(in));
}
- public Set<Integer> keySet() {
- Set<Integer> result = new TreeSet<Integer>();
- for (Integer w : entries.keySet()) {
- result.add(w);
- }
- return result;
+ public void write(DataOutput out) throws IOException {
+ Bytes.writeByteArray(out, this.matrix.getBytes());
}
- public Set<Map.Entry<Integer, BlockEntry>> entrySet() {
- return Collections.unmodifiableSet(this.entries.entrySet());
- }
-
- public Collection<BlockEntry> values() {
- ArrayList<BlockEntry> result = new ArrayList<BlockEntry>();
- for (Writable w : entries.values()) {
- result.add((BlockEntry) w);
- }
- return result;
- }
-
- public void readFields(final DataInput in) throws IOException {
- this.row = BytesUtil.bytesToInt(Bytes.readByteArray(in));
- this.entries.readFields(in);
- }
-
- public void write(final DataOutput out) throws IOException {
- Bytes.writeByteArray(out, BytesUtil.intToBytes(this.row));
- this.entries.write(out);
- }
-
- public void putAll(Map<? extends Integer, ? extends BlockEntry> m) {
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- /**
- *
- * The inner class for an entry of row.
- *
- */
- public static class Entries implements Map.Entry<byte[], BlockEntry> {
-
- private final byte[] column;
- private final BlockEntry entry;
-
- Entries(byte[] column, BlockEntry entry) {
- this.column = column;
- this.entry = entry;
- }
-
- public BlockEntry setValue(BlockEntry c) {
- throw new UnsupportedOperationException("VectorWritable is read-only!");
- }
-
- public byte[] getKey() {
- byte[] key = column;
- return key;
- }
-
- public BlockEntry getValue() {
- return entry;
- }
+ public SubMatrix get() {
+ return this.matrix;
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Mon Dec 15 19:21:47 2008
@@ -32,11 +32,12 @@
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockPosition;
+import org.apache.hama.io.BlockWritable;
public class BlockInputFormat extends TableInputFormatBase implements
- InputFormat<BlockID, BlockPosition>, JobConfigurable {
+ InputFormat<BlockID, BlockWritable>, JobConfigurable {
static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
private TableRecordReader tableRecordReader;
@@ -44,7 +45,7 @@
* Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
*/
protected static class TableRecordReader extends TableRecordReaderBase
- implements RecordReader<BlockID, BlockPosition> {
+ implements RecordReader<BlockID, BlockWritable> {
/**
* @return IntWritable
@@ -60,8 +61,8 @@
*
* @see org.apache.hadoop.mapred.RecordReader#createValue()
*/
- public BlockPosition createValue() {
- return new BlockPosition();
+ public BlockWritable createValue() {
+ return new BlockWritable();
}
/**
@@ -73,7 +74,7 @@
* @return true if there was more data
* @throws IOException
*/
- public boolean next(BlockID key, BlockPosition value)
+ public boolean next(BlockID key, BlockWritable value)
throws IOException {
RowResult result = this.scanner.next();
boolean hasMore = result != null && result.size() > 0;
@@ -81,7 +82,8 @@
byte[] row = result.getRow();
BlockID bID = new BlockID(row);
key.set(bID.getRow(), bID.getColumn());
- Writables.copyWritable(result, value);
+ byte[] rs = result.get(Constants.BLOCK).getValue();
+ Writables.copyWritable(new BlockWritable(rs), value);
}
return hasMore;
}
@@ -94,7 +96,7 @@
* @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
* JobConf, Reporter)
*/
- public RecordReader<BlockID, BlockPosition> getRecordReader(
+ public RecordReader<BlockID, BlockWritable> getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException {
TableSplit tSplit = (TableSplit) split;
TableRecordReader trr = this.tableRecordReader;
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Mon Dec 15 19:21:47 2008
@@ -20,11 +20,18 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hama.Constants;
@@ -52,7 +59,7 @@
job.setMapperClass(BlockingMapper.class);
FileInputFormat.addInputPaths(job, matrixPath);
- job.setInputFormat(BlockInputFormat.class);
+ job.setInputFormat(MyInputFormat.class);
job.setMapOutputKeyClass(BlockID.class);
job.setMapOutputValueClass(VectorWritable.class);
job.setOutputFormat(NullOutputFormat.class);
@@ -66,7 +73,7 @@
*/
public static abstract class BlockingMapRedBase extends MapReduceBase {
protected DenseMatrix matrix;
-
+
@Override
public void configure(JobConf job) {
try {
@@ -88,13 +95,101 @@
public void map(BlockID key, BlockPosition value,
OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
throws IOException {
- int startRow = value.getIndex(Constants.BLOCK_STARTROW);
- int endRow = value.getIndex(Constants.BLOCK_ENDROW);
- int startColumn = value.getIndex(Constants.BLOCK_STARTCOLUMN);
- int endColumn = value.getIndex(Constants.BLOCK_ENDCOLUMN);
+ int startRow = value.getStartRow();
+ int endRow = value.getEndRow();
+ int startColumn = value.getStartColumn();
+ int endColumn = value.getEndColumn();
+
+ matrix.setBlock(key.getRow(), key.getColumn(), matrix.subMatrix(startRow,
+ endRow, startColumn, endColumn));
+ }
+ }
+
+ static class MyInputFormat extends TableInputFormatBase implements
+ InputFormat<BlockID, BlockPosition>, JobConfigurable {
+ static final Log LOG = LogFactory.getLog(MyInputFormat.class);
+ private TableRecordReader tableRecordReader;
+
+ /**
+ * Iterates over HBase table data, returning (BlockID, BlockPosition) pairs
+ */
+ protected static class TableRecordReader extends TableRecordReaderBase
+ implements RecordReader<BlockID, BlockPosition> {
+
+ /**
+ * @return BlockID
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ public BlockID createKey() {
+ return new BlockID();
+ }
+
+ /**
+ * @return BlockPosition
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ public BlockPosition createValue() {
+ return new BlockPosition();
+ }
+
+ /**
+ * @param key BlockID as input key.
+ * @param value BlockPosition as input value
+ *
+ * Converts Scanner.next() to BlockID, BlockPosition
+ *
+ * @return true if there was more data
+ * @throws IOException
+ */
+ public boolean next(BlockID key, BlockPosition value) throws IOException {
+ RowResult result = this.scanner.next();
+ boolean hasMore = result != null && result.size() > 0;
+ if (hasMore) {
+ byte[] row = result.getRow();
+ BlockID bID = new BlockID(row);
+ key.set(bID.getRow(), bID.getColumn());
+ Writables.copyWritable(
+ new BlockPosition(result.get(Constants.BLOCK_POSITION).getValue()),
+ value);
+ }
+ return hasMore;
+ }
+ }
+
+ /**
+ * Builds a TableRecordReader. If no TableRecordReader was provided, uses
+ * the default.
+ *
+ * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+ * JobConf, Reporter)
+ */
+ public RecordReader<BlockID, BlockPosition> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ TableSplit tSplit = (TableSplit) split;
+ TableRecordReader trr = this.tableRecordReader;
+ // if no table record reader was provided use default
+ if (trr == null) {
+ trr = new TableRecordReader();
+ }
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+ trr.init();
+ return trr;
+ }
- matrix.setBlock(key.getRow(), key.getColumn(),
- matrix.subMatrix(startRow, endRow, startColumn, endColumn));
+ /**
+ * Allows subclasses to set the {@link TableRecordReader}.
+ *
+ * @param tableRecordReader to provide other {@link TableRecordReader}
+ * implementations.
+ */
+ protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
}
}
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Mon Dec 15 19:21:47 2008
@@ -28,6 +28,7 @@
import junit.framework.TestSuite;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hama.io.BlockPosition;
import org.apache.hama.io.DoubleEntry;
import org.apache.log4j.Logger;
@@ -78,20 +79,22 @@
/**
 * Checks that add(1, 1, 0.5) on m1 raises the entry read back through
 * get(1, 1) by 0.5 relative to the value read before the call.
 */
public void testEntryAdd() throws IOException {
double origin = m1.get(1, 1);
m1.add(1, 1, 0.5);
-
+
// NOTE(review): the observed value is passed first and no tolerance is
// given; JUnit's convention is (expected, actual) and doubles are usually
// compared with a delta. Confirm this is intended.
assertEquals(m1.get(1, 1), origin + 0.5);
}
-
+
public void testBlocking() throws IOException, ClassNotFoundException {
assertEquals(((DenseMatrix) m1).isBlocked(), false);
((DenseMatrix) m1).blocking(4);
assertEquals(((DenseMatrix) m1).isBlocked(), true);
- int[] pos = ((DenseMatrix) m1).getBlockPosition(1, 0);
- double[][] b = ((DenseMatrix) m1).subMatrix(pos[0], pos[1], pos[2], pos[3]).getDoubleArray();
+ BlockPosition pos = ((DenseMatrix) m1).getBlockPosition(1, 0);
+ double[][] b = ((DenseMatrix) m1).subMatrix(pos.getStartRow(),
+ pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn())
+ .getDoubleArray();
double[][] c = ((DenseMatrix) m1).getBlock(1, 0).getDoubleArray();
assertEquals(((DenseMatrix) m1).getBlockSize(), 2);
assertEquals(c.length, 5);
-
+
for (int i = 0; i < b.length; i++) {
for (int j = 0; j < b.length; j++) {
assertEquals(b[i][j], c[i][j]);
@@ -109,19 +112,21 @@
assertEquals(((DenseMatrix) m2).isBlocked(), false);
((DenseMatrix) m2).blocking_mapred(4);
assertEquals(((DenseMatrix) m2).isBlocked(), true);
- int[] pos = ((DenseMatrix) m2).getBlockPosition(1, 0);
- double[][] b = ((DenseMatrix) m2).subMatrix(pos[0], pos[1], pos[2], pos[3]).getDoubleArray();
+ BlockPosition pos = ((DenseMatrix) m2).getBlockPosition(1, 0);
+ double[][] b = ((DenseMatrix) m2).subMatrix(pos.getStartRow(),
+ pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn())
+ .getDoubleArray();
double[][] c = ((DenseMatrix) m2).getBlock(1, 0).getDoubleArray();
assertEquals(((DenseMatrix) m2).getBlockSize(), 2);
assertEquals(c.length, 5);
-
+
for (int i = 0; i < b.length; i++) {
for (int j = 0; j < b.length; j++) {
assertEquals(b[i][j], c[i][j]);
}
}
}
-
+
/**
* Column vector test.
*
@@ -137,7 +142,7 @@
x++;
}
}
-
+
public void testGetSetAttribute() throws IOException {
m1.setRowLabel(0, "row1");
assertEquals(m1.getRowLabel(0), "row1");
@@ -228,11 +233,11 @@
public void testSetColumn() throws IOException {
Vector v = new DenseVector();
double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
-
+
for (int i = 0; i < SIZE; i++) {
v.set(i, entries[i]);
}
-
+
m1.setColumn(SIZE + 1, v);
Iterator<DoubleEntry> it = m1.getColumn(SIZE + 1).iterator();
@@ -242,7 +247,7 @@
i++;
}
}
-
+
public void testLoadSave() throws IOException {
String path1 = m1.getPath();
// save m1 to aliase1
@@ -344,8 +349,8 @@
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
- assertEquals(String.valueOf(result.get(i, j)).substring(0, 14),
- String.valueOf(C[i][j]).substring(0, 14));
+ assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String
+ .valueOf(C[i][j]).substring(0, 14));
}
}
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=726934&r1=726933&r2=726934&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Mon Dec 15 19:21:47 2008
@@ -21,7 +21,6 @@
import java.io.IOException;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hama.DenseMatrix;
@@ -29,6 +28,7 @@
import org.apache.hama.Matrix;
import org.apache.hama.algebra.BlockCyclicMultiplyMap;
import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
+import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
import org.apache.log4j.Logger;
@@ -36,19 +36,21 @@
static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
static Matrix c;
static final int SIZE = 20;
+
/** Creates the test case; only delegates to the superclass constructor. */
public TestBlockMatrixMapReduce() {
super();
}
- public void testBlockMatrixMapReduce() throws IOException, ClassNotFoundException {
+ public void testBlockMatrixMapReduce() throws IOException,
+ ClassNotFoundException {
Matrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
Matrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
((DenseMatrix) m1).blocking_mapred(4);
((DenseMatrix) m2).blocking_mapred(4);
miniMRJob(m1.getPath(), m2.getPath());
-
+
double[][] C = new double[SIZE][SIZE];
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
@@ -60,8 +62,8 @@
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
- assertEquals(String.valueOf(C[i][j]).substring(0, 5),
- String.valueOf(c.get(i, j)).substring(0, 5));
+ assertEquals(String.valueOf(C[i][j]).substring(0, 5), String.valueOf(
+ c.get(i, j)).substring(0, 5));
}
}
}
@@ -69,13 +71,15 @@
private void miniMRJob(String string, String string2) throws IOException {
c = new DenseMatrix(conf);
String output = c.getPath();
-
+
JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class);
jobConf.setJobName("test MR job");
- BlockCyclicMultiplyMap.initJob(string, string2, BlockCyclicMultiplyMap.class, IntWritable.class,
- BlockWritable.class, jobConf);
- BlockCyclicMultiplyReduce.initJob(output, BlockCyclicMultiplyReduce.class, jobConf);
+ BlockCyclicMultiplyMap.initJob(string, string2,
+ BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
+ jobConf);
+ BlockCyclicMultiplyReduce.initJob(output, BlockCyclicMultiplyReduce.class,
+ jobConf);
jobConf.setNumMapTasks(2);
jobConf.setNumReduceTasks(2);