You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/10/28 09:49:07 UTC
svn commit: r830463 - in /incubator/hama/trunk: ./
src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/
src/java/org/apache/hama/mapreduce/ src/java/org/apache/hama/matrix/
src/java/org/apache/hama/matrix/algebra/ src/test/org/apache/hama/mapr...
Author: edwardyoon
Date: Wed Oct 28 08:49:06 2009
New Revision: 830463
URL: http://svn.apache.org/viewvc?rev=830463&view=rev
Log:
Replacement of Block Multiplication Map/Reduce
Added:
incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java
incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java
incubator/hama/trunk/src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java
Removed:
incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixReduce.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyMap.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyReduce.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=830463&r1=830462&r2=830463&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Oct 28 08:49:06 2009
@@ -42,6 +42,7 @@
IMPROVEMENTS
+ HAMA-217: Replacement of Block Multiplication Map/Reduce (edwardyoon)
HAMA-205: Replacement of NormMap/Reduce (edwardyoon)
HAMA-207: Replacement of Mat-Mat addition Map/Reduce (edwardyoon)
HAMA-208: Replacement of vector-matrix multiplication Map/Reduce (edwardyoon)
Added: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java?rev=830463&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java Wed Oct 28 08:49:06 2009
@@ -0,0 +1,63 @@
+package org.apache.hama.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.util.BytesUtil;
+
+public class CollectBlocksMapper extends TableMapper<BlockID, MapWritable>
+ implements Configurable {
+ private Configuration conf = null;
+ /** Parameter of the path of the matrix to be blocked * */
+ public static final String BLOCK_SIZE = "hama.blocking.size";
+ public static final String ROWS = "hama.blocking.rows";
+ public static final String COLUMNS = "hama.blocking.columns";
+ public static final String MATRIX_POS = "a.ore.b";
+
+ private int mBlockNum;
+ private int mBlockRowSize;
+ private int mBlockColSize;
+ private int mRows;
+ private int mColumns;
+
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ int startColumn, endColumn, blkRow = BytesUtil.getRowIndex(key.get())
+ / mBlockRowSize, i = 0;
+ DenseVector dv = new DenseVector(BytesUtil.getRowIndex(key.get()), value);
+
+ do {
+ startColumn = i * mBlockColSize;
+ endColumn = startColumn + mBlockColSize - 1;
+ if (endColumn >= mColumns) // the last sub vector
+ endColumn = mColumns - 1;
+ context.write(new BlockID(blkRow, i), dv.subVector(startColumn, endColumn).getEntries());
+
+ i++;
+ } while (endColumn < (mColumns - 1));
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+
+ mBlockNum = Integer.parseInt(conf.get(BLOCK_SIZE, ""));
+ mRows = Integer.parseInt(conf.get(ROWS, ""));
+ mColumns = Integer.parseInt(conf.get(COLUMNS, ""));
+
+ mBlockRowSize = mRows / mBlockNum;
+ mBlockColSize = mColumns / mBlockNum;
+ }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java?rev=830463&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java Wed Oct 28 08:49:06 2009
@@ -0,0 +1,108 @@
+package org.apache.hama.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.matrix.SubMatrix;
+
+public class CollectBlocksReducer extends
+ TableReducer<BlockID, MapWritable, Writable> implements Configurable {
+ private Configuration conf = null;
+ private int mBlockNum;
+ private int mBlockRowSize;
+ private int mBlockColSize;
+ private int mRows;
+ private int mColumns;
+ private boolean matrixPos;
+
+ public void reduce(BlockID key, Iterable<MapWritable> values,
+ Context context) throws IOException, InterruptedException {
+ // the block's base offset in the original matrix
+ int colBase = key.getColumn() * mBlockColSize;
+ int rowBase = key.getRow() * mBlockRowSize;
+
+
+ // the block's size : rows & columns
+ int smRows = mBlockRowSize;
+ if ((rowBase + mBlockRowSize - 1) >= mRows)
+ smRows = mRows - rowBase;
+ int smCols = mBlockColSize;
+ if ((colBase + mBlockColSize - 1) >= mColumns)
+ smCols = mColumns - colBase;
+
+ // construct the matrix
+ SubMatrix subMatrix = new SubMatrix(smRows, smCols);
+ // i, j is the current offset in the sub-matrix
+ int i = 0, j = 0;
+ for (MapWritable value : values) {
+ DenseVector vw = new DenseVector(value);
+ // check the size is suitable
+ if (vw.size() != smCols)
+ throw new IOException("Block Column Size dismatched.");
+ i = vw.getRow() - rowBase;
+
+ if (i >= smRows || i < 0)
+ throw new IOException("Block Row Size dismatched.");
+
+ // put the subVector to the subMatrix
+ for (j = 0; j < smCols; j++) {
+ subMatrix.set(i, j, vw.get(colBase + j));
+ }
+ }
+ //BlockWritable outValue = new BlockWritable(subMatrix);
+
+ // It'll used for only matrix multiplication.
+ if (matrixPos) {
+ for (int x = 0; x < mBlockNum; x++) {
+ int r = (key.getRow() * mBlockNum) * mBlockNum;
+ int seq = (x * mBlockNum) + key.getColumn() + r;
+ BlockID bkID = new BlockID(key.getRow(), x, seq);
+ Put put = new Put(bkID.getBytes());
+ put.add(Bytes.toBytes(Constants.BLOCK_FAMILY),
+ Bytes.toBytes("a"),
+ subMatrix.getBytes());
+ context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
+ }
+ } else {
+ for (int x = 0; x < mBlockNum; x++) {
+ int seq = (x * mBlockNum * mBlockNum) + (key.getColumn() * mBlockNum)
+ + key.getRow();
+ BlockID bkID = new BlockID(x, key.getColumn(), seq);
+ Put put = new Put(bkID.getBytes());
+ put.add(Bytes.toBytes(Constants.BLOCK_FAMILY),
+ Bytes.toBytes("b"),
+ subMatrix.getBytes());
+ context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
+ }
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+
+ mBlockNum = Integer.parseInt(conf.get(CollectBlocksMapper.BLOCK_SIZE, ""));
+ mRows = Integer.parseInt(conf.get(CollectBlocksMapper.ROWS, ""));
+ mColumns = Integer.parseInt(conf.get(CollectBlocksMapper.COLUMNS, ""));
+
+ mBlockRowSize = mRows / mBlockNum;
+ mBlockColSize = mColumns / mBlockNum;
+
+ matrixPos = conf.getBoolean(CollectBlocksMapper.MATRIX_POS, true);
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java?rev=830463&r1=830462&r2=830463&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java Wed Oct 28 08:49:06 2009
@@ -24,8 +24,6 @@
import java.util.NavigableMap;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
@@ -33,7 +31,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.util.BytesUtil;
import org.apache.log4j.Logger;
/**
Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java?rev=830463&r1=830462&r2=830463&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java Wed Oct 28 08:49:06 2009
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
@@ -56,17 +57,16 @@
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockWritable;
import org.apache.hama.io.DoubleEntry;
import org.apache.hama.io.Pair;
import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.mapred.CollectBlocksMapper;
import org.apache.hama.mapred.DummyMapper;
import org.apache.hama.mapred.VectorInputFormat;
+import org.apache.hama.mapreduce.CollectBlocksMapper;
import org.apache.hama.mapreduce.RandomMatrixMapper;
import org.apache.hama.mapreduce.RandomMatrixReducer;
-import org.apache.hama.matrix.algebra.BlockMultiplyMap;
-import org.apache.hama.matrix.algebra.BlockMultiplyReduce;
+import org.apache.hama.matrix.algebra.BlockMultMap;
+import org.apache.hama.matrix.algebra.BlockMultReduce;
import org.apache.hama.matrix.algebra.DenseMatrixVectorMultMap;
import org.apache.hama.matrix.algebra.DenseMatrixVectorMultReduce;
import org.apache.hama.matrix.algebra.JacobiEigenValue;
@@ -451,12 +451,11 @@
Scan scan = new Scan();
scan.addFamily(Constants.COLUMNFAMILY);
job.getConfiguration().set(MatrixAdditionMap.MATRIX_SUMMANDS, B.getPath());
- job.getConfiguration().set(MatrixAdditionMap.MATRIX_ALPHAS, Double
- .toString(alpha));
-
+ job.getConfiguration().set(MatrixAdditionMap.MATRIX_ALPHAS,
+ Double.toString(alpha));
+
TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
- MatrixAdditionMap.class, IntWritable.class,
- MapWritable.class, job);
+ MatrixAdditionMap.class, IntWritable.class, MapWritable.class, job);
TableMapReduceUtil.initTableReducerJob(result.getPath(),
MatrixAdditionReduce.class, job);
try {
@@ -466,7 +465,7 @@
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
-
+
return result;
}
@@ -505,12 +504,13 @@
Scan scan = new Scan();
scan.addFamily(Constants.COLUMNFAMILY);
- job.getConfiguration().set(MatrixAdditionMap.MATRIX_SUMMANDS, summandList.toString());
- job.getConfiguration().set(MatrixAdditionMap.MATRIX_ALPHAS, alphaList.toString());
-
+ job.getConfiguration().set(MatrixAdditionMap.MATRIX_SUMMANDS,
+ summandList.toString());
+ job.getConfiguration().set(MatrixAdditionMap.MATRIX_ALPHAS,
+ alphaList.toString());
+
TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
- MatrixAdditionMap.class, IntWritable.class,
- MapWritable.class, job);
+ MatrixAdditionMap.class, IntWritable.class, MapWritable.class, job);
TableMapReduceUtil.initTableReducerJob(result.getPath(),
MatrixAdditionReduce.class, job);
try {
@@ -520,7 +520,7 @@
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
-
+
return result;
}
@@ -548,7 +548,7 @@
DenseMatrix result = new DenseMatrix(config, this.getRows(), columns);
List<Job> jobId = new ArrayList<Job>();
-
+
for (int i = 0; i < this.getRows(); i++) {
Job job = new Job(config, "multiplication MR job : " + result.getPath()
+ " " + i);
@@ -560,8 +560,8 @@
job.getConfiguration().setInt(DenseMatrixVectorMultMap.ITH_ROW, i);
TableMapReduceUtil.initTableMapperJob(B.getPath(), scan,
- DenseMatrixVectorMultMap.class, IntWritable.class,
- MapWritable.class, job);
+ DenseMatrixVectorMultMap.class, IntWritable.class, MapWritable.class,
+ job);
TableMapReduceUtil.initTableReducerJob(result.getPath(),
DenseMatrixVectorMultReduce.class, job);
try {
@@ -582,7 +582,7 @@
e.printStackTrace();
}
}
-
+
return result;
}
@@ -599,7 +599,8 @@
String collectionTable = "collect_" + RandomVariable.randMatrixPath();
HTableDescriptor desc = new HTableDescriptor(collectionTable);
- desc.addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK)));
+ desc
+ .addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK_FAMILY)));
this.admin.createTable(desc);
LOG.info("Collect Blocks");
@@ -609,18 +610,24 @@
DenseMatrix result = new DenseMatrix(config, this.getRows(), this
.getColumns());
- JobConf jobConf = new JobConf(config);
- jobConf.setJobName("multiplication MR job : " + result.getPath());
+ Job job = new Job(config, "multiplication MR job : " + result.getPath());
+
+ Scan scan = new Scan();
+ scan.addFamily(Bytes.toBytes(Constants.BLOCK_FAMILY));
- jobConf.setNumMapTasks(config.getNumMapTasks());
- jobConf.setNumReduceTasks(config.getNumReduceTasks());
+ TableMapReduceUtil.initTableMapperJob(collectionTable, scan,
+ BlockMultMap.class, BlockID.class, BytesWritable.class, job);
+ TableMapReduceUtil.initTableReducerJob(result.getPath(),
+ BlockMultReduce.class, job);
- BlockMultiplyMap.initJob(collectionTable, BlockMultiplyMap.class,
- BlockID.class, BlockWritable.class, jobConf);
- BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class,
- jobConf);
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
- JobClient.runJob(jobConf);
hamaAdmin.delete(collectionTable);
return result;
}
@@ -687,8 +694,7 @@
Scan scan = new Scan();
for (int j = j0, jj = 0; j <= j1; j++, jj++) {
- scan.addColumn(Constants.COLUMNFAMILY, Bytes
- .toBytes(String.valueOf(j)));
+ scan.addColumn(Constants.COLUMNFAMILY, Bytes.toBytes(String.valueOf(j)));
}
scan.setStartRow(BytesUtil.getRowIndex(i0));
scan.setStopRow(BytesUtil.getRowIndex(i1 + 1));
@@ -700,8 +706,8 @@
while (it.hasNext()) {
rs = it.next();
for (int j = j0, jj = 0; j <= j1; j++, jj++) {
- byte[] vv = rs.getValue(Constants.COLUMNFAMILY, Bytes
- .toBytes(String.valueOf(j)));
+ byte[] vv = rs.getValue(Constants.COLUMNFAMILY, Bytes.toBytes(String
+ .valueOf(j)));
System.out.println(BytesUtil.bytesToDouble(vv));
result.set(i, jj, vv);
}
@@ -727,21 +733,32 @@
throw new IOException("can't divide.");
int block_size = (int) blocks;
- JobConf jobConf = new JobConf(config);
- jobConf.setJobName("Blocking MR job" + getPath());
+ Job job = new Job(config, "Blocking MR job" + getPath());
- jobConf.setNumMapTasks(config.getNumMapTasks());
- jobConf.setNumReduceTasks(config.getNumReduceTasks());
- jobConf.setMapperClass(CollectBlocksMapper.class);
- jobConf.setInputFormat(VectorInputFormat.class);
- jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+ Scan scan = new Scan();
+ scan.addFamily(Constants.COLUMNFAMILY);
- org.apache.hadoop.mapred.FileInputFormat.addInputPaths(jobConf, path);
+ job.getConfiguration().set(CollectBlocksMapper.BLOCK_SIZE,
+ String.valueOf(block_size));
+ job.getConfiguration().set(CollectBlocksMapper.ROWS,
+ String.valueOf(this.getRows()));
+ job.getConfiguration().set(CollectBlocksMapper.COLUMNS,
+ String.valueOf(this.getColumns()));
+ job.getConfiguration().setBoolean(CollectBlocksMapper.MATRIX_POS, bool);
- CollectBlocksMapper.initJob(collectionTable, bool, block_size, this
- .getRows(), this.getColumns(), jobConf);
+ TableMapReduceUtil.initTableMapperJob(path, scan,
+ org.apache.hama.mapreduce.CollectBlocksMapper.class, BlockID.class,
+ MapWritable.class, job);
+ TableMapReduceUtil.initTableReducerJob(collectionTable,
+ org.apache.hama.mapreduce.CollectBlocksReducer.class, job);
- JobClient.runJob(jobConf);
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
}
/**
@@ -914,7 +931,7 @@
e1 = BytesUtil.bytesToDouble(table.get(get).getValue(
Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
Bytes.toBytes(String.valueOf(i))));
-
+
get = new Get(BytesUtil.getRowIndex(pivot_col));
e2 = BytesUtil.bytesToDouble(table.get(get).getValue(
Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
@@ -943,15 +960,15 @@
Get get = null;
if (row + 2 < size) {
get = new Get(BytesUtil.getRowIndex(row));
-
+
double max = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY),
+ Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY),
Bytes.toBytes(String.valueOf(m))));
double val;
for (int i = row + 2; i < size; i++) {
get = new Get(BytesUtil.getRowIndex(row));
val = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY),
+ Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY),
Bytes.toBytes(String.valueOf(i))));
if (Math.abs(val) > Math.abs(max)) {
m = i;
@@ -968,26 +985,28 @@
int update(int row, double value, int state) throws IOException {
Get get = new Get(BytesUtil.getRowIndex(row));
double e = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
+ Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
Bytes.toBytes(JacobiEigenValue.EI_VAL)));
int changed = BytesUtil.bytesToInt(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
+ Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
Bytes.toBytes("changed")));
double y = e;
e += value;
VectorUpdate vu = new VectorUpdate(row);
vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EI_VAL, e);
-
+
if (changed == 1 && (Math.abs(y - e) < .0000001)) { // y == e) {
changed = 0;
- vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
-
+ vu.put(JacobiEigenValue.EI_COLUMNFAMILY,
+ JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
+
state--;
} else if (changed == 0 && (Math.abs(y - e) > .0000001)) {
changed = 1;
- vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
-
+ vu.put(JacobiEigenValue.EI_COLUMNFAMILY,
+ JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
+
state++;
}
table.put(vu.getPut());
@@ -1011,7 +1030,7 @@
for (int j = 0; j < E[i].length; j++) {
get = new Get(BytesUtil.getRowIndex(i));
ev = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
+ Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
Bytes.toBytes(String.valueOf(j))));
success &= ((Math.abs(ev - E[i][j]) < .0000001));
if (!success)
Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java?rev=830463&r1=830462&r2=830463&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java Wed Oct 28 08:49:06 2009
@@ -50,6 +50,11 @@
this.initMap(row);
}
+ public DenseVector(int row, Result m) {
+ this.initMap(m);
+ this.entries.put(Constants.ROWCOUNT, new IntWritable(row));
+ }
+
public DenseVector(int row, MapWritable m) {
this.entries = m;
this.entries.put(Constants.ROWCOUNT, new IntWritable(row));
Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java?rev=830463&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java Wed Oct 28 08:49:06 2009
@@ -0,0 +1,25 @@
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hama.Constants;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.SubMatrix;
+
+public class BlockMultMap extends TableMapper<BlockID, BytesWritable> {
+ private byte[] COLUMN = Bytes.toBytes(Constants.BLOCK_FAMILY);
+
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ SubMatrix a = new SubMatrix(value.getValue(COLUMN, Bytes.toBytes("a")));
+ SubMatrix b = new SubMatrix(value.getValue(COLUMN, Bytes.toBytes("b")));
+
+ SubMatrix c = a.mult(b);
+ context.write(new BlockID(key.get()), new BytesWritable(c.getBytes()));
+ }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java?rev=830463&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java Wed Oct 28 08:49:06 2009
@@ -0,0 +1,43 @@
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.matrix.SubMatrix;
+import org.apache.hama.util.BytesUtil;
+
+public class BlockMultReduce extends
+ TableReducer<BlockID, BytesWritable, Writable> {
+
+ @Override
+ public void reduce(BlockID key, Iterable<BytesWritable> values,
+ Context context) throws IOException, InterruptedException {
+ SubMatrix s = null;
+ for (BytesWritable value : values) {
+ SubMatrix b = new SubMatrix(value.getBytes());
+ if (s == null) {
+ s = b;
+ } else {
+ s = s.add(b);
+ }
+ }
+
+ int startRow = key.getRow() * s.getRows();
+ int startColumn = key.getColumn() * s.getColumns();
+
+ for (int i = 0; i < s.getRows(); i++) {
+ VectorUpdate update = new VectorUpdate(i + startRow);
+ for (int j = 0; j < s.getColumns(); j++) {
+ update.put(j + startColumn, s.get(i, j));
+ }
+
+ context.write(new ImmutableBytesWritable(BytesUtil.getRowIndex(key
+ .getRow())), update.getPut());
+ }
+ }
+}
Added: incubator/hama/trunk/src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java?rev=830463&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java Wed Oct 28 08:49:06 2009
@@ -0,0 +1,60 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hama.HamaCluster;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.log4j.Logger;
+
+public class TestBlockMatrixMapReduce extends HamaCluster {
+ static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
+ static final int SIZE = 32;
+
+ /** constructor */
+ public TestBlockMatrixMapReduce() {
+ super();
+ }
+
+ public void testBlockMatrixMapReduce() throws IOException,
+ ClassNotFoundException {
+ DenseMatrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
+ DenseMatrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
+
+ DenseMatrix c = (DenseMatrix) m1.mult(m2, 16);
+
+ double[][] mem = new double[SIZE][SIZE];
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ for (int k = 0; k < SIZE; k++) {
+ mem[i][k] += m1.get(i, j) * m2.get(j, k);
+ }
+ }
+ }
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ double gap = (mem[i][j] - c.get(i, j));
+ assertTrue(gap < 0.000001 || gap < -0.000001);
+ }
+ }
+ }
+}