You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/11/17 02:57:47 UTC
svn commit: r718158 - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/
src/java/org/apache/hama/algebra/ src/java/org/apache/hama/io/
src/java/org/apache/hama/mapred/ src/java/org/apache/hama/util/ src/test/...
Author: edwardyoon
Date: Sun Nov 16 17:57:47 2008
New Revision: 718158
URL: http://svn.apache.org/viewvc?rev=718158&view=rev
Log:
2D block multiplication
Added:
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockEntry.java
incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java
incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sun Nov 16 17:57:47 2008
@@ -4,9 +4,10 @@
NEW FEATURES
+ HAMA-83: 2D square blocking for dense matrix multiplication (edwardyoon)
HAMA-104: Add getNumMap/reduceTasks to HamaConfiguration (edwardyoon)
HAMA-92: Add subMatrix() to Matrix (edwardyoon)
- HAMA-83: Add a writable comparable for BlockID (edwardyoon)
+ HAMA-84: Add a writable comparable for BlockID (edwardyoon)
HAMA-81: Add subVector(int i0, int i1) to Vector (edwardyoon)
Hama-80: Add identity(int m, int n)
which returns identity matrix (edwardyoon)
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java Sun Nov 16 17:57:47 2008
@@ -40,9 +40,12 @@
parseArgs(args);
}
- Matrix a = DenseMatrix.random(conf, row, column);
- Matrix b = DenseMatrix.random(conf, row, column);
+ DenseMatrix a = DenseMatrix.random(conf, row, column);
+ DenseMatrix b = DenseMatrix.random(conf, row, column);
+ a.blocking(conf.getNumMapTasks());
+ b.blocking(conf.getNumMapTasks());
+
Matrix c = a.mult(b);
a.close();
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Sun Nov 16 17:57:47 2008
@@ -73,7 +73,8 @@
this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN));
this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
-
+ this.tableDesc.addFamily(new HColumnDescriptor(Constants.BLOCK));
+
LOG.info("Initializing the matrix storage.");
this.admin.createTable(this.tableDesc);
LOG.info("Create Matrix " + matrixPath);
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java Sun Nov 16 17:57:47 2008
@@ -22,10 +22,10 @@
import java.util.Iterator;
import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.VectorMapWritable;
+import org.apache.hama.io.MapWritable;
public abstract class AbstractVector {
- public VectorMapWritable<Integer, DoubleEntry> entries;
+ public MapWritable<Integer, DoubleEntry> entries;
public double get(int index) throws NullPointerException {
return this.entries.get(index).getValue();
@@ -34,7 +34,7 @@
public void set(int index, double value) {
// If entries are null, create new object
if(this.entries == null) {
- this.entries = new VectorMapWritable<Integer, DoubleEntry>();
+ this.entries = new MapWritable<Integer, DoubleEntry>();
}
this.entries.put(index, new DoubleEntry(value));
@@ -62,7 +62,7 @@
return (this.entries != null) ? this.entries.size() : 0;
}
- public VectorMapWritable<Integer, DoubleEntry> getEntries() {
+ public MapWritable<Integer, DoubleEntry> getEntries() {
return this.entries;
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Sun Nov 16 17:57:47 2008
@@ -86,4 +86,22 @@
/** default try times to generate a suitable tablename */
public static final int DEFAULT_TRY_TIMES = 10000000;
+
+ /** start row of block */
+ public static final String BLOCK_STARTROW = "attribute:startRow";
+
+ /** end row of block */
+ public static final String BLOCK_ENDROW = "attribute:endRow";
+
+ /** start column of block */
+ public static final String BLOCK_STARTCOLUMN = "attribute:startColumn";
+
+ /** end column of block */
+ public static final String BLOCK_ENDCOLUMN = "attribute:endColumn";
+
+ /** block dimension */
+ public static final String BLOCK = "block:";
+
+ /** block size */
+ public static final String BLOCK_SIZE = "attribute:blockSize";
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Sun Nov 16 17:57:47 2008
@@ -20,49 +20,52 @@
package org.apache.hama;
import java.io.IOException;
-import java.util.Map;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hama.algebra.BlockCyclicMultiplyMap;
+import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
import org.apache.hama.algebra.RowCyclicAdditionMap;
import org.apache.hama.algebra.RowCyclicAdditionReduce;
import org.apache.hama.algebra.SIMDMultiplyMap;
import org.apache.hama.algebra.SIMDMultiplyReduce;
+import org.apache.hama.io.BlockWritable;
import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.VectorMapWritable;
+import org.apache.hama.io.MapWritable;
import org.apache.hama.io.VectorUpdate;
import org.apache.hama.io.VectorWritable;
+import org.apache.hama.mapred.BlockCyclicReduce;
import org.apache.hama.mapred.RowCyclicReduce;
-import org.apache.hama.util.JobManager;
import org.apache.hama.util.BytesUtil;
+import org.apache.hama.util.JobManager;
import org.apache.hama.util.RandomVariable;
public class DenseMatrix extends AbstractMatrix implements Matrix {
static int tryPathLength = Constants.DEFAULT_PATH_LENGTH;
static final String TABLE_PREFIX = DenseMatrix.class.getSimpleName() + "_";
-
+
/**
- * Construct a raw matrix.
- * Just create a table in HBase, but didn't lay any schema ( such as
- * dimensions: i, j ) on it.
+ * Construct a raw matrix. This only creates a table in HBase; it does not lay
+ * any schema (such as dimensions: i, j) on it.
*
* @param conf configuration object
- * @throws IOException
- * throw the exception to let the user know what happend, if we
- * didn't create the matrix successfully.
+ * @throws IOException thrown to let the user know what happened
+ * if we didn't create the matrix successfully.
*/
public DenseMatrix(HamaConfiguration conf) throws IOException {
setConfiguration(conf);
-
+
tryToCreateTable();
-
+
closed = false;
}
@@ -71,28 +74,27 @@
*
* @param conf configuration object
* @param matrixName the name of the matrix
- * @param force if force is true, a new matrix will be created
- * no matter 'matrixName' has aliased to an existed matrix;
- * otherwise, just try to load an existed matrix alised
- * 'matrixName'.
- * @throws IOException
+ * @param force if force is true, a new matrix will be created even if
+ * 'matrixName' is already aliased to an existing matrix; otherwise,
+ * just try to load the existing matrix aliased as 'matrixName'.
+ * @throws IOException
*/
- public DenseMatrix(HamaConfiguration conf, String matrixName,
- boolean force) throws IOException {
+ public DenseMatrix(HamaConfiguration conf, String matrixName, boolean force)
+ throws IOException {
setConfiguration(conf);
// if force is set to true:
- // 1) if this matrixName has aliase to other matrix, we will remove
- // the old aliase, create a new matrix table, and aliase to it.
+ // 1) if this matrixName has aliase to other matrix, we will remove
+ // the old aliase, create a new matrix table, and aliase to it.
// 2) if this matrixName has no aliase to other matrix, we will create
- // a new matrix table, and alise to it.
+ // a new matrix table, and alise to it.
//
// if force is set to false, we just try to load an existed matrix alised
// as 'matrixname'.
-
+
boolean existed = hamaAdmin.matrixExists(matrixName);
-
+
if (force) {
- if(existed) {
+ if (existed) {
// remove the old aliase
hamaAdmin.delete(matrixName);
}
@@ -101,7 +103,7 @@
// save the new aliase relationship
save(matrixName);
} else {
- if(existed) {
+ if (existed) {
// try to get the actual path of the table
matrixPath = hamaAdmin.getPath(matrixName);
// load the matrix
@@ -109,33 +111,34 @@
// increment the reference
incrementAndGetRef();
} else {
- throw new IOException("Try to load non-existed matrix alised as " + matrixName);
+ throw new IOException("Try to load non-existed matrix alised as "
+ + matrixName);
}
}
closed = false;
}
-
+
/**
- * Load a matrix from an existed matrix table whose tablename is 'matrixpath'
- *
- * !! It is an internal used for map/reduce.
+ * Load a matrix from an existing matrix table whose table name is 'matrixpath'.
+ * !! This constructor is for internal use by map/reduce.
*
* @param conf configuration object
- * @param matrixpath
- * @throws IOException
- * @throws IOException
+ * @param matrixpath
+ * @throws IOException
+ * @throws IOException
*/
- public DenseMatrix(HamaConfiguration conf, String matrixpath) throws IOException {
+ public DenseMatrix(HamaConfiguration conf, String matrixpath)
+ throws IOException {
setConfiguration(conf);
matrixPath = matrixpath;
// load the matrix
table = new HTable(conf, matrixPath);
// TODO: now we don't increment the reference of the table
- // for it's an internal use for map/reduce.
- // if we want to increment the reference of the table,
- // we don't know where to call Matrix.close in Add & Mul map/reduce
- // process to decrement the reference. It seems difficulty.
+ // for it's an internal use for map/reduce.
+ // if we want to increment the reference of the table,
+ // we don't know where to call Matrix.close in Add & Mul map/reduce
+ // process to decrement the reference. It seems difficulty.
}
/**
@@ -145,17 +148,17 @@
* @param m the number of rows.
* @param n the number of columns.
* @param s fill the matrix with this scalar value.
- * @throws IOException
- * throw the exception to let the user know what happend, if we
- * didn't create the matrix successfully.
+ * @throws IOException thrown to let the user know what happened
+ * if we didn't create the matrix successfully.
*/
- public DenseMatrix(HamaConfiguration conf, int m, int n, double s) throws IOException {
+ public DenseMatrix(HamaConfiguration conf, int m, int n, double s)
+ throws IOException {
setConfiguration(conf);
-
+
tryToCreateTable();
closed = false;
-
+
for (int i = 0; i < m; i++) {
for (int j = 0; j < n; j++) {
set(i, j, s);
@@ -164,29 +167,31 @@
setDimension(m, n);
}
-
- /** try to create a new matrix with a new random name.
- * try times will be (Integer.MAX_VALUE - 4) * DEFAULT_TRY_TIMES;
+
+ /**
+ * try to create a new matrix with a new random name. try times will be
+ * (Integer.MAX_VALUE - 4) * DEFAULT_TRY_TIMES;
+ *
* @throws IOException
*/
private void tryToCreateTable() throws IOException {
int tryTimes = Constants.DEFAULT_TRY_TIMES;
do {
matrixPath = TABLE_PREFIX + RandomVariable.randMatrixPath(tryPathLength);
-
+
if (!admin.tableExists(matrixPath)) { // no table 'matrixPath' in hbase.
tableDesc = new HTableDescriptor(matrixPath);
create();
return;
}
-
+
tryTimes--;
- if(tryTimes <= 0) { // this loop has exhausted DEFAULT_TRY_TIMES.
+ if (tryTimes <= 0) { // this loop has exhausted DEFAULT_TRY_TIMES.
tryPathLength++;
tryTimes = Constants.DEFAULT_TRY_TIMES;
}
-
- } while(tryPathLength <= Constants.DEFAULT_MAXPATHLEN);
+
+ } while (tryPathLength <= Constants.DEFAULT_MAXPATHLEN);
// exhaustes the try times.
// throw out an IOException to let the user know what happened.
throw new IOException("Try too many times to create a table in hbase.");
@@ -201,11 +206,12 @@
* @return an m-by-n matrix with uniformly distributed random elements.
* @throws IOException
*/
- public static Matrix random(HamaConfiguration conf, int m, int n)
+ public static DenseMatrix random(HamaConfiguration conf, int m, int n)
throws IOException {
- Matrix rand = new DenseMatrix(conf);
+ DenseMatrix rand = new DenseMatrix(conf);
DenseVector vector = new DenseVector();
- LOG.info("Create the " + m + " * " + n + " random matrix : " + rand.getPath());
+ LOG.info("Create the " + m + " * " + n + " random matrix : "
+ + rand.getPath());
for (int i = 0; i < m; i++) {
vector.clear();
@@ -231,8 +237,9 @@
public static Matrix identity(HamaConfiguration conf, int m, int n)
throws IOException {
Matrix identity = new DenseMatrix(conf);
- LOG.info("Create the " + m + " * " + n + " identity matrix : " + identity.getPath());
-
+ LOG.info("Create the " + m + " * " + n + " identity matrix : "
+ + identity.getPath());
+
for (int i = 0; i < m; i++) {
DenseVector vector = new DenseVector();
for (int j = 0; j < n; j++) {
@@ -254,9 +261,11 @@
jobConf.setNumMapTasks(config.getNumMapTasks());
jobConf.setNumReduceTasks(config.getNumReduceTasks());
- RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(), RowCyclicAdditionMap.class,
- IntWritable.class, VectorWritable.class, jobConf);
- RowCyclicReduce.initJob(result.getPath(), RowCyclicAdditionReduce.class, jobConf);
+ RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(),
+ RowCyclicAdditionMap.class, IntWritable.class, VectorWritable.class,
+ jobConf);
+ RowCyclicReduce.initJob(result.getPath(), RowCyclicAdditionReduce.class,
+ jobConf);
JobManager.execute(jobConf, result);
return result;
@@ -276,11 +285,11 @@
byte[][] c = { columnKey };
Scanner scan = table.getScanner(c, HConstants.EMPTY_START_ROW);
- VectorMapWritable<Integer, DoubleEntry> trunk = new VectorMapWritable<Integer, DoubleEntry>();
+ MapWritable<Integer, DoubleEntry> trunk = new MapWritable<Integer, DoubleEntry>();
for (RowResult row : scan) {
- trunk.put(BytesUtil.bytesToInt(row.getRow()),
- new DoubleEntry(row.get(columnKey)));
+ trunk.put(BytesUtil.bytesToInt(row.getRow()), new DoubleEntry(row
+ .get(columnKey)));
}
return new DenseVector(trunk);
@@ -295,9 +304,20 @@
jobConf.setNumMapTasks(config.getNumMapTasks());
jobConf.setNumReduceTasks(config.getNumReduceTasks());
- SIMDMultiplyMap.initJob(this.getPath(), B.getPath(), SIMDMultiplyMap.class,
- IntWritable.class, VectorWritable.class, jobConf);
- RowCyclicReduce.initJob(result.getPath(), SIMDMultiplyReduce.class, jobConf);
+ if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
+ BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(),
+ BlockCyclicMultiplyMap.class, IntWritable.class, BlockWritable.class,
+ jobConf);
+ BlockCyclicReduce.initJob(result.getPath(),
+ BlockCyclicMultiplyReduce.class, jobConf);
+ } else {
+ SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
+ SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
+ jobConf);
+ RowCyclicReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
+ jobConf);
+ }
+
JobManager.execute(jobConf, result);
return result;
}
@@ -336,26 +356,123 @@
return this.getClass().getSimpleName();
}
+ // =========================================
+
public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
int columnSize = (j1 - j0) + 1;
- SubMatrix result = new SubMatrix((i1-i0) + 1, columnSize);
- byte[][] c = new byte[columnSize][];
- for (int i = 0; i < columnSize; i++) {
- c[i] = BytesUtil.getColumnIndex(j0 + i);
+ SubMatrix result = new SubMatrix((i1 - i0) + 1, columnSize);
+
+ for (int i = i0, ii = 0; i <= i1; i++, ii++) {
+ for (int j = j0, jj = 0; j <= j1; j++, jj++) {
+ Cell c = table
+ .get(BytesUtil.intToBytes(i), BytesUtil.getColumnIndex(j));
+ result.set(ii, jj, BytesUtil.bytesToDouble(c.getValue()));
+ }
}
- Scanner scan = table.getScanner(c, BytesUtil.intToBytes(i0), BytesUtil.intToBytes(i1 + 1));
+ return result;
+ }
+
+ /**
+ * The type of the Matrix to be blocked, must be dense.
+ *
+ * TODO: it should work on map/reduce
+ */
+ public void blocking(int blockNum) throws IOException {
+ setBlockPosition(blockNum);
+ setBlockSize(blockNum);
+
+ String[] columns = new String[] { Constants.BLOCK_STARTROW,
+ Constants.BLOCK_ENDROW, Constants.BLOCK_STARTCOLUMN,
+ Constants.BLOCK_ENDCOLUMN };
+ Scanner scan = table.getScanner(columns);
- int rKey = 0, cKey = 0;
for (RowResult row : scan) {
- for (Map.Entry<byte[], Cell> e : row.entrySet()) {
- result.set(rKey, cKey, BytesUtil.bytesToDouble(e.getValue().getValue()));
- cKey++;
+ String[] key = Bytes.toString(row.getRow()).split("[,]");
+ int blockR = Integer.parseInt(key[0]);
+ int blockC = Integer.parseInt(key[1]);
+ setBlock(blockR, blockC, blockMatrix(blockR, blockC));
+ }
+ }
+
+ public boolean isBlocked() throws IOException {
+ return (table.get(Constants.METADATA, Constants.BLOCK_SIZE) == null) ? false
+ : true;
+ }
+
+ public SubMatrix getBlock(int i, int j) throws IOException {
+ return BytesUtil.bytesToSubMatrix(table.get(String.valueOf(i),
+ Constants.BLOCK + j).getValue());
+ }
+
+ /**
+ * @return the size of block
+ * @throws IOException
+ */
+ public int getBlockSize() throws IOException {
+ return (isBlocked()) ? BytesUtil.bytesToInt(table.get(Constants.METADATA,
+ Constants.BLOCK_SIZE).getValue()) : -1;
+ }
+
+ protected void setBlockSize(int blockNum) throws IOException {
+ BatchUpdate update = new BatchUpdate(Constants.METADATA);
+ update.put(Constants.BLOCK_SIZE, BytesUtil.intToBytes(blockNum));
+ table.commit(update);
+ }
+
+ protected void setBlock(int i, int j, SubMatrix matrix) throws IOException {
+ BatchUpdate update = new BatchUpdate(String.valueOf(i));
+ update.put(Constants.BLOCK + j, BytesUtil.subMatrixToBytes(matrix));
+ table.commit(update);
+ }
+
+ protected void setBlockPosition(int blockNum) throws IOException {
+ int block_row_size = this.getRows() / blockNum;
+ int block_column_size = this.getColumns() / blockNum;
+
+ for (int i = 0; i < blockNum; i++) {
+ for (int j = 0; j < blockNum; j++) {
+ int startRow = i * block_row_size;
+ int endRow = (startRow + block_row_size) - 1;
+ int startColumn = j * block_column_size;
+ int endColumn = (startColumn + block_column_size) - 1;
+
+ BatchUpdate update = new BatchUpdate(getBlockKey(i, j));
+ update.put(Constants.BLOCK_STARTROW, BytesUtil.intToBytes(startRow));
+ update.put(Constants.BLOCK_ENDROW, BytesUtil.intToBytes(endRow));
+ update.put(Constants.BLOCK_STARTCOLUMN, BytesUtil
+ .intToBytes(startColumn));
+ update.put(Constants.BLOCK_ENDCOLUMN, BytesUtil.intToBytes(endColumn));
+ table.commit(update);
}
- rKey++;
- cKey = 0;
}
+ }
+ protected int[] getBlockPosition(int i, int j) throws IOException {
+ int[] result = new int[4];
+ result[0] = BytesUtil.bytesToInt(table.get(getBlockKey(i, j),
+ Constants.BLOCK_STARTROW).getValue());
+ result[1] = BytesUtil.bytesToInt(table.get(getBlockKey(i, j),
+ Constants.BLOCK_ENDROW).getValue());
+ result[2] = BytesUtil.bytesToInt(table.get(getBlockKey(i, j),
+ Constants.BLOCK_STARTCOLUMN).getValue());
+ result[3] = BytesUtil.bytesToInt(table.get(getBlockKey(i, j),
+ Constants.BLOCK_ENDCOLUMN).getValue());
return result;
}
+
+ protected String getBlockKey(int i, int j) {
+ return i + "," + j;
+ }
+
+ /**
+ * @param i
+ * @param j
+ * @return the sub matrix
+ * @throws IOException
+ */
+ protected SubMatrix blockMatrix(int i, int j) throws IOException {
+ int[] pos = getBlockPosition(i, j);
+ return subMatrix(pos[0], pos[1], pos[2], pos[3]);
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java Sun Nov 16 17:57:47 2008
@@ -26,7 +26,7 @@
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.VectorMapWritable;
+import org.apache.hama.io.MapWritable;
import org.apache.hama.util.BytesUtil;
import org.apache.log4j.Logger;
@@ -34,15 +34,15 @@
static final Logger LOG = Logger.getLogger(DenseVector.class);
public DenseVector() {
- this(new VectorMapWritable<Integer, DoubleEntry>());
+ this(new MapWritable<Integer, DoubleEntry>());
}
- public DenseVector(VectorMapWritable<Integer, DoubleEntry> m) {
+ public DenseVector(MapWritable<Integer, DoubleEntry> m) {
this.entries = m;
}
public DenseVector(RowResult row) {
- this.entries = new VectorMapWritable<Integer, DoubleEntry>();
+ this.entries = new MapWritable<Integer, DoubleEntry>();
for (Map.Entry<byte[], Cell> f : row.entrySet()) {
this.entries.put(BytesUtil.getColumnIndex(f.getKey()),
new DoubleEntry(f.getValue()));
Modified: incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java Sun Nov 16 17:57:47 2008
@@ -25,7 +25,8 @@
* A sub matrix is a matrix formed by selecting certain rows and columns from a
* bigger matrix. This is a in-memory operation only.
*/
-public class SubMatrix {
+public class SubMatrix implements java.io.Serializable {
+ private static final long serialVersionUID = 1L;
static final Logger LOG = Logger.getLogger(SubMatrix.class);
private double[][] matrix;
@@ -72,6 +73,23 @@
}
/**
+ * c = a+b
+ *
+ * @param b
+ * @return c
+ */
+ public SubMatrix add(SubMatrix b) {
+ double[][] C = new double[size()][size()];
+ for (int i = 0; i < size(); i++) {
+ for (int j = 0; j < size(); j++) {
+ C[i][j] += this.get(i, j) + b.get(i, j);
+ }
+ }
+
+ return new SubMatrix(C);
+ }
+
+ /**
* c = a*b
*
* @param b
@@ -90,6 +108,11 @@
return new SubMatrix(C);
}
+ /**
+ * TODO: SubMatrix should be able to get row, column size
+ *
+ * @return the length
+ */
public int size() {
return matrix.length;
}
@@ -97,4 +120,9 @@
public void close() {
matrix = null;
}
+
+ public double[][] getDoubles() {
+ double[][] result = matrix;
+ return result;
+ }
}
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,57 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.Matrix;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.mapred.BlockCyclicMap;
+import org.apache.log4j.Logger;
+
+public class BlockCyclicMultiplyMap extends
+ BlockCyclicMap<IntWritable, BlockWritable> {
+ static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
+ protected Matrix matrix_b;
+ public static final String MATRIX_B = "hama.multiplication.matrix.b";
+
+ public void configure(JobConf job) {
+ try {
+ matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
+ } catch (IOException e) {
+ LOG.warn("Load matrix_b failed : " + e.getMessage());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void initJob(String matrix_a, String matrix_b,
+ Class<BlockCyclicMultiplyMap> map, Class<IntWritable> outputKeyClass,
+ Class<BlockWritable> outputValueClass, JobConf jobConf) {
+
+ jobConf.setMapOutputValueClass(outputValueClass);
+ jobConf.setMapOutputKeyClass(outputKeyClass);
+ jobConf.setMapperClass(map);
+ jobConf.set(MATRIX_B, matrix_b);
+
+ initJob(matrix_a, map, jobConf);
+ }
+
+ @Override
+ public void map(IntWritable key, BlockWritable value,
+ OutputCollector<IntWritable, BlockWritable> output, Reporter reporter)
+ throws IOException {
+ for (int i = 0; i < value.size(); i++) {
+ SubMatrix a = value.get(i);
+ for (int j = 0; j < ((DenseMatrix) matrix_b).getBlockSize(); j++) {
+ SubMatrix b = ((DenseMatrix) matrix_b).getBlock(i, j);
+ SubMatrix c = a.mult(b);
+ output.collect(key, new BlockWritable(key.get(), j, c));
+ }
+ }
+ }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,59 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockEntry;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.BlockCyclicReduce;
+import org.apache.log4j.Logger;
+
+public class BlockCyclicMultiplyReduce extends
+ BlockCyclicReduce<IntWritable, BlockWritable> {
+ static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyReduce.class);
+
+ @Override
+ public void reduce(IntWritable key, Iterator<BlockWritable> values,
+ OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+ throws IOException {
+ int row = key.get();
+ Map<Integer, SubMatrix> sum = new HashMap<Integer, SubMatrix>();
+
+ while (values.hasNext()) {
+ BlockWritable b = values.next();
+ for (Map.Entry<Integer, BlockEntry> e : b.entrySet()) {
+ int j = e.getKey();
+ SubMatrix value = e.getValue().getValue();
+ if (sum.containsKey(j)) {
+ sum.put(j, sum.get(j).add(value));
+ } else {
+ sum.put(j, value);
+ }
+ }
+ }
+
+ for (Map.Entry<Integer, SubMatrix> e : sum.entrySet()) {
+ int column = e.getKey();
+ SubMatrix mat = e.getValue();
+
+ int startRow = row * mat.size();
+ int startColumn = column * mat.size();
+
+ // TODO: the sub matrix may not be a regular square
+ for (int i = 0; i < mat.size(); i++) {
+ VectorUpdate update = new VectorUpdate(i + startRow);
+ for (int j = 0; j < mat.size(); j++) {
+ update.put(j + startColumn, mat.get(i, j));
+ }
+ output.collect(key, update);
+ }
+ }
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockEntry.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockEntry.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockEntry.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockEntry.java Sun Nov 16 17:57:47 2008
@@ -60,11 +60,10 @@
}
/**
- * @return the current VectorEntry's value
+ * @return the current BlockEntry's value
* @throws IOException
- * @throws ClassNotFoundException
*/
- public SubMatrix getValue() throws IOException, ClassNotFoundException {
+ public SubMatrix getValue() throws IOException {
return BytesUtil.bytesToSubMatrix(this.values[0]);
}
Added: incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockMapWritable.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,170 @@
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * A {@link Writable} map from integer block index to value, kept sorted by
+ * index in a {@link TreeMap}.
+ * <p>
+ * On the wire each entry is the block-index key bytes, a one-byte code naming
+ * the value's class, then the value's own {@link Writable} form. Only classes
+ * registered in the static code tables can be written or read back. The type
+ * parameter {@code K} is not used by this class; keys are always
+ * {@link Integer}.
+ */
+public class BlockMapWritable<K, V> implements Map<Integer, V>, Writable,
+    Configurable {
+  private AtomicReference<Configuration> conf = new AtomicReference<Configuration>();
+
+  // Static maps of code to class and vice versa. Includes types used in hama
+  // only.
+  static final Map<Byte, Class<?>> CODE_TO_CLASS = new HashMap<Byte, Class<?>>();
+  static final Map<Class<?>, Byte> CLASS_TO_CODE = new HashMap<Class<?>, Byte>();
+
+  static {
+    // Codes are handed out in registration order and are written to the
+    // stream, so reordering these lines changes the serialized bytes.
+    // NOTE(review): byte[] is registered but is not a Writable, so such a
+    // value cannot pass the Writable cast in write().
+    byte code = 0;
+    addToMap(HStoreKey.class, code++);
+    addToMap(ImmutableBytesWritable.class, code++);
+    addToMap(Text.class, code++);
+    addToMap(BlockEntry.class, code++);
+    addToMap(byte[].class, code++);
+  }
+
+  /** Registers {@code clazz} under {@code code} in both lookup tables. */
+  @SuppressWarnings("boxing")
+  private static void addToMap(final Class<?> clazz, final byte code) {
+    CLASS_TO_CODE.put(clazz, code);
+    CODE_TO_CLASS.put(code, clazz);
+  }
+
+  private Map<Integer, V> instance = new TreeMap<Integer, V>();
+
+  /** @return the conf */
+  public Configuration getConf() {
+    return conf.get();
+  }
+
+  /** @param conf the conf to set */
+  public void setConf(Configuration conf) {
+    this.conf.set(conf);
+  }
+
+  /** {@inheritDoc} */
+  public void clear() {
+    instance.clear();
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsKey(Object key) {
+    return instance.containsKey(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsValue(Object value) {
+    return instance.containsValue(value);
+  }
+
+  /** {@inheritDoc} */
+  public Set<Entry<Integer, V>> entrySet() {
+    return instance.entrySet();
+  }
+
+  /** {@inheritDoc} */
+  public V get(Object key) {
+    return instance.get(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean isEmpty() {
+    return instance.isEmpty();
+  }
+
+  /** {@inheritDoc} */
+  public Set<Integer> keySet() {
+    return instance.keySet();
+  }
+
+  /** {@inheritDoc} */
+  public int size() {
+    return instance.size();
+  }
+
+  /** {@inheritDoc} */
+  public Collection<V> values() {
+    return instance.values();
+  }
+
+  // Writable
+
+  /**
+   * @param id the one-byte type code read from the stream
+   * @return the Class registered for the specified id, or null if none is
+   */
+  protected Class<?> getClass(byte id) {
+    return CODE_TO_CLASS.get(id);
+  }
+
+  /**
+   * @param clazz the value class to look up
+   * @return the id for the specified Class
+   * @throws NullPointerException if the class is not registered
+   */
+  protected byte getId(Class<?> clazz) {
+    Byte b = CLASS_TO_CODE.get(clazz);
+    if (b == null) {
+      throw new NullPointerException("Nothing for : " + clazz);
+    }
+    return b;
+  }
+
+  @Override
+  public String toString() {
+    return this.instance.toString();
+  }
+
+  /**
+   * Writes the entry count followed by each entry as key bytes, type code
+   * and value.
+   *
+   * @param out the stream to write to
+   * @throws IOException if writing fails
+   */
+  public void write(DataOutput out) throws IOException {
+    // Write out the number of entries in the map
+    out.writeInt(this.instance.size());
+
+    // Then write out each key/value pair
+    for (Map.Entry<Integer, V> e : instance.entrySet()) {
+      Bytes.writeByteArray(out, BytesUtil.getBlockIndex(e.getKey()));
+      out.writeByte(getId(e.getValue().getClass()));
+      ((Writable) e.getValue()).write(out);
+    }
+  }
+
+  /**
+   * Replaces the current contents with the entries read from {@code in}.
+   *
+   * @param in the stream to read from
+   * @throws IOException if reading fails or the stream carries a type code
+   *           that is not registered in this class
+   */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    // First clear the map. Otherwise we will just accumulate
+    // entries every time this method is called.
+    this.instance.clear();
+
+    // Read the number of entries in the map
+    int entries = in.readInt();
+
+    // Then read each key/value pair
+    for (int i = 0; i < entries; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      byte code = in.readByte();
+      Class<?> valueClass = getClass(code);
+      if (valueClass == null) {
+        // Fail with a readable message instead of a NullPointerException
+        // from inside the reflective instantiation.
+        throw new IOException("Unknown value type code: " + code);
+      }
+      Writable value = (Writable) ReflectionUtils.newInstance(valueClass,
+          getConf());
+      value.readFields(in);
+      this.instance.put(BytesUtil.getBlockIndex(key), (V) value);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void putAll(Map<? extends Integer, ? extends V> m) {
+    this.instance.putAll(m);
+  }
+
+  /** {@inheritDoc} */
+  public V remove(Object key) {
+    return this.instance.remove(key);
+  }
+
+  /** {@inheritDoc} */
+  public V put(Integer key, V value) {
+    return this.instance.put(key, value);
+  }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,136 @@
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * A row of sub-matrix blocks: a row index plus a map from block column index
+ * to {@link BlockEntry}.
+ * <p>
+ * The {@link Map} view is read-only; every mutating {@link Map} method throws
+ * {@link UnsupportedOperationException}.
+ */
+public class BlockWritable implements Writable, Map<Integer, BlockEntry> {
+
+  /** Row index; stays null until set by a constructor or by readFields. */
+  public Integer row;
+  public BlockMapWritable<Integer, BlockEntry> entries;
+
+  /** Creates an instance with an empty entry map and no row index. */
+  public BlockWritable() {
+    this(new BlockMapWritable<Integer, BlockEntry>());
+  }
+
+  /**
+   * @param entries the entry map to wrap (kept by reference, not copied)
+   */
+  public BlockWritable(BlockMapWritable<Integer, BlockEntry> entries) {
+    this.entries = entries;
+  }
+
+  /**
+   * Creates a row holding a single block.
+   *
+   * @param i the row index
+   * @param j the block column index
+   * @param mult the sub-matrix stored at (i, j)
+   * @throws IOException if the sub-matrix cannot be wrapped in a BlockEntry
+   */
+  public BlockWritable(int i, int j, SubMatrix mult) throws IOException {
+    this.row = i;
+    BlockMapWritable<Integer, BlockEntry> single = new BlockMapWritable<Integer, BlockEntry>();
+    single.put(j, new BlockEntry(mult));
+    this.entries = single;
+  }
+
+  /** {@inheritDoc} */
+  public int size() {
+    return this.entries.size();
+  }
+
+  /**
+   * @param key the block column index
+   * @return the sub-matrix stored under {@code key}
+   * @throws IOException if the stored entry cannot be decoded
+   * @throws NullPointerException if there is no entry for {@code key}
+   */
+  public SubMatrix get(int key) throws IOException {
+    return this.entries.get(key).getValue();
+  }
+
+  /** Unsupported: this map is read-only. */
+  public BlockEntry put(Integer key, BlockEntry value) {
+    throw new UnsupportedOperationException("BlockWritable is read-only!");
+  }
+
+  /** {@inheritDoc} */
+  public BlockEntry get(Object key) {
+    return this.entries.get(key);
+  }
+
+  /** Unsupported: this map is read-only. */
+  public BlockEntry remove(Object key) {
+    throw new UnsupportedOperationException("BlockWritable is read-only!");
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsKey(Object key) {
+    return entries.containsKey(key);
+  }
+
+  /** Unsupported. */
+  public boolean containsValue(Object value) {
+    throw new UnsupportedOperationException("Don't support containsValue!");
+  }
+
+  /** {@inheritDoc} */
+  public boolean isEmpty() {
+    return entries.isEmpty();
+  }
+
+  /** Unsupported: this map is read-only. */
+  public void clear() {
+    throw new UnsupportedOperationException("BlockWritable is read-only!");
+  }
+
+  /** @return a sorted copy of the block column indices */
+  public Set<Integer> keySet() {
+    return new TreeSet<Integer>(entries.keySet());
+  }
+
+  /** @return an unmodifiable view of the underlying entries */
+  public Set<Map.Entry<Integer, BlockEntry>> entrySet() {
+    return Collections.unmodifiableSet(this.entries.entrySet());
+  }
+
+  /** @return a copy of the stored block entries */
+  public Collection<BlockEntry> values() {
+    ArrayList<BlockEntry> result = new ArrayList<BlockEntry>();
+    for (Writable w : entries.values()) {
+      result.add((BlockEntry) w);
+    }
+    return result;
+  }
+
+  /**
+   * Reads the row index, then the entry map.
+   *
+   * @param in the stream to read from
+   * @throws IOException if reading fails
+   */
+  public void readFields(final DataInput in) throws IOException {
+    this.row = BytesUtil.bytesToInt(Bytes.readByteArray(in));
+    this.entries.readFields(in);
+  }
+
+  /**
+   * Writes the row index, then the entry map.
+   *
+   * @param out the stream to write to
+   * @throws IOException if writing fails
+   */
+  public void write(final DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, BytesUtil.intToBytes(this.row));
+    this.entries.write(out);
+  }
+
+  /** Unsupported. */
+  public void putAll(Map<? extends Integer, ? extends BlockEntry> m) {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  /**
+   *
+   * The inner class for an entry of row.
+   *
+   */
+  public static class Entries implements Map.Entry<byte[], BlockEntry> {
+
+    private final byte[] column;
+    private final BlockEntry entry;
+
+    Entries(byte[] column, BlockEntry entry) {
+      this.column = column;
+      this.entry = entry;
+    }
+
+    /** Unsupported: entries are read-only. */
+    public BlockEntry setValue(BlockEntry c) {
+      throw new UnsupportedOperationException("BlockWritable is read-only!");
+    }
+
+    /** @return the column key bytes (the stored array itself, not a copy) */
+    public byte[] getKey() {
+      return column;
+    }
+
+    /** @return the block entry */
+    public BlockEntry getValue() {
+      return entry;
+    }
+  }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,190 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * A {@link Writable} map from integer column index to value, kept sorted by
+ * index in a {@link TreeMap}.
+ * <p>
+ * Each entry is serialized as the column-index key bytes, a one-byte code
+ * identifying the value's class, and the value's own {@link Writable} form.
+ * Only classes registered in the static code tables can be written or read
+ * back. The type parameter {@code K} is not used by this class; keys are
+ * always {@link Integer}.
+ */
+public class MapWritable<K, V> implements Map<Integer, V>, Writable,
+    Configurable {
+  private AtomicReference<Configuration> conf = new AtomicReference<Configuration>();
+
+  // Static maps of code to class and vice versa. Includes types used in hama
+  // only.
+  static final Map<Byte, Class<?>> CODE_TO_CLASS = new HashMap<Byte, Class<?>>();
+  static final Map<Class<?>, Byte> CLASS_TO_CODE = new HashMap<Class<?>, Byte>();
+
+  static {
+    // Codes follow registration order and are written to the stream, so
+    // reordering these lines changes the serialized bytes.
+    // NOTE(review): byte[] is registered but is not a Writable, so such a
+    // value cannot pass the Writable cast in write().
+    byte code = 0;
+    addToMap(HStoreKey.class, code++);
+    addToMap(ImmutableBytesWritable.class, code++);
+    addToMap(Text.class, code++);
+    addToMap(DoubleEntry.class, code++);
+    addToMap(BlockEntry.class, code++);
+    addToMap(byte[].class, code++);
+  }
+
+  /** Registers {@code clazz} under {@code code} in both lookup tables. */
+  @SuppressWarnings("boxing")
+  private static void addToMap(final Class<?> clazz, final byte code) {
+    CLASS_TO_CODE.put(clazz, code);
+    CODE_TO_CLASS.put(code, clazz);
+  }
+
+  private Map<Integer, V> instance = new TreeMap<Integer, V>();
+
+  /** @return the conf */
+  public Configuration getConf() {
+    return conf.get();
+  }
+
+  /** @param conf the conf to set */
+  public void setConf(Configuration conf) {
+    this.conf.set(conf);
+  }
+
+  /** {@inheritDoc} */
+  public void clear() {
+    instance.clear();
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsKey(Object key) {
+    return instance.containsKey(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsValue(Object value) {
+    return instance.containsValue(value);
+  }
+
+  /** {@inheritDoc} */
+  public Set<Entry<Integer, V>> entrySet() {
+    return instance.entrySet();
+  }
+
+  /** {@inheritDoc} */
+  public V get(Object key) {
+    return instance.get(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean isEmpty() {
+    return instance.isEmpty();
+  }
+
+  /** {@inheritDoc} */
+  public Set<Integer> keySet() {
+    return instance.keySet();
+  }
+
+  /** {@inheritDoc} */
+  public int size() {
+    return instance.size();
+  }
+
+  /** {@inheritDoc} */
+  public Collection<V> values() {
+    return instance.values();
+  }
+
+  // Writable
+
+  /**
+   * @param id the one-byte type code read from the stream
+   * @return the Class registered for the specified id, or null if none is
+   */
+  protected Class<?> getClass(byte id) {
+    return CODE_TO_CLASS.get(id);
+  }
+
+  /**
+   * @param clazz the value class to look up
+   * @return the id for the specified Class
+   * @throws NullPointerException if the class is not registered
+   */
+  protected byte getId(Class<?> clazz) {
+    Byte b = CLASS_TO_CODE.get(clazz);
+    if (b == null) {
+      throw new NullPointerException("Nothing for : " + clazz);
+    }
+    return b;
+  }
+
+  @Override
+  public String toString() {
+    return this.instance.toString();
+  }
+
+  /**
+   * Writes the entry count followed by each entry as key bytes, type code
+   * and value.
+   *
+   * @param out the stream to write to
+   * @throws IOException if writing fails
+   */
+  public void write(DataOutput out) throws IOException {
+    // Write out the number of entries in the map
+    out.writeInt(this.instance.size());
+
+    // Then write out each key/value pair
+    for (Map.Entry<Integer, V> e : instance.entrySet()) {
+      Bytes.writeByteArray(out, BytesUtil.getColumnIndex(e.getKey()));
+      out.writeByte(getId(e.getValue().getClass()));
+      ((Writable) e.getValue()).write(out);
+    }
+  }
+
+  /**
+   * Replaces the current contents with the entries read from {@code in}.
+   *
+   * @param in the stream to read from
+   * @throws IOException if reading fails or the stream carries a type code
+   *           that is not registered in this class
+   */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    // First clear the map. Otherwise we will just accumulate
+    // entries every time this method is called.
+    this.instance.clear();
+
+    // Read the number of entries in the map
+    int entries = in.readInt();
+
+    // Then read each key/value pair
+    for (int i = 0; i < entries; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      byte code = in.readByte();
+      Class<?> valueClass = getClass(code);
+      if (valueClass == null) {
+        // Fail with a readable message instead of a NullPointerException
+        // from inside the reflective instantiation.
+        throw new IOException("Unknown value type code: " + code);
+      }
+      Writable value = (Writable) ReflectionUtils.newInstance(valueClass,
+          getConf());
+      value.readFields(in);
+      this.instance.put(BytesUtil.getColumnIndex(key), (V) value);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void putAll(Map<? extends Integer, ? extends V> m) {
+    this.instance.putAll(m);
+  }
+
+  /** {@inheritDoc} */
+  public V remove(Object key) {
+    return this.instance.remove(key);
+  }
+
+  /** {@inheritDoc} */
+  public V put(Integer key, V value) {
+    return this.instance.put(key, value);
+  }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java Sun Nov 16 17:57:47 2008
@@ -38,13 +38,13 @@
public class VectorWritable implements Writable, Map<Integer, DoubleEntry> {
public Integer row;
- public VectorMapWritable<Integer, DoubleEntry> entries;
+ public MapWritable<Integer, DoubleEntry> entries;
public VectorWritable() {
- this(new VectorMapWritable<Integer, DoubleEntry>());
+ this(new MapWritable<Integer, DoubleEntry>());
}
- public VectorWritable(VectorMapWritable<Integer, DoubleEntry> entries) {
+ public VectorWritable(MapWritable<Integer, DoubleEntry> entries) {
this.entries = entries;
}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,36 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.Matrix;
+import org.apache.hama.io.BlockWritable;
+import org.apache.log4j.Logger;
+
+/**
+ * Base class for mappers that consume a matrix as (row index, row of blocks)
+ * pairs supplied by {@link BlockInputFormat}.
+ *
+ * @param <K> map output key type
+ * @param <V> map output value type
+ */
+public abstract class BlockCyclicMap<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Mapper<IntWritable, BlockWritable, K, V> {
+  static final Logger LOG = Logger.getLogger(BlockCyclicMap.class);
+  public static Matrix MATRIX_B;
+
+  /**
+   * Configures {@code job} to read {@code matrixA} through
+   * {@link BlockInputFormat} and to run the given mapper over it.
+   *
+   * @param matrixA input path(s) handed to the job's input format
+   * @param mapper the mapper class to run
+   * @param job the job configuration to update
+   */
+  public static void initJob(String matrixA,
+      Class<? extends BlockCyclicMap> mapper, JobConf job) {
+    // Input side: format, column list and source.
+    job.setInputFormat(BlockInputFormat.class);
+    job.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
+    FileInputFormat.addInputPaths(job, matrixA);
+
+    // Processing side.
+    job.setMapperClass(mapper);
+  }
+
+  /**
+   * Processes one row of blocks.
+   *
+   * @param key the row index
+   * @param value the blocks of that row
+   * @param output collector for the map output
+   * @param reporter progress reporter
+   * @throws IOException if the map step fails
+   */
+  public abstract void map(IntWritable key, BlockWritable value,
+      OutputCollector<K, V> output, Reporter reporter) throws IOException;
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,47 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+
+/**
+ * Base class for reducers that emit (row index, {@link VectorUpdate}) pairs
+ * through {@link VectorOutputFormat}.
+ *
+ * @param <K> reduce input key type
+ * @param <V> reduce input value type
+ */
+public abstract class BlockCyclicReduce<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Reducer<K, V, IntWritable, VectorUpdate> {
+  /**
+   * Sets up {@code job} to run the given reducer and write its output to
+   * {@code table}. Call this before submitting the job.
+   * <p>
+   * NOTE(review): the output value class is registered as BatchUpdate while
+   * {@code reduce} collects VectorUpdate; confirm this is what
+   * VectorOutputFormat expects.
+   *
+   * @param table name stored under {@link VectorOutputFormat#OUTPUT_TABLE}
+   * @param reducer the reducer class to run
+   * @param job the job configuration to update
+   */
+  public static void initJob(String table,
+      Class<? extends BlockCyclicReduce> reducer, JobConf job) {
+    // Output destination.
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+
+    // Output record types.
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+
+    job.setReducerClass(reducer);
+  }
+
+  /**
+   * Reduces all values collected for one key.
+   *
+   * @param key the reduce input key
+   * @param values the values grouped under {@code key}
+   * @param output collector for the row updates
+   * @param reporter progress reporter
+   * @throws IOException if the reduce step fails
+   */
+  public abstract void reduce(K key, Iterator<V> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException;
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,55 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+
+/**
+ * A {@link BlockInputFormatBase} that configures itself from the job: the
+ * first input path names the table and {@link #COLUMN_LIST} names the columns
+ * to scan.
+ */
+public class BlockInputFormat extends BlockInputFormatBase implements
+    JobConfigurable {
+  private static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
+
+  /**
+   * space delimited list of columns
+   */
+  public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
+
+  /**
+   * Reads the column list and the table name from {@code job} and opens the
+   * table. A failure to open the table is logged, not rethrown, so the table
+   * is left unset in that case.
+   *
+   * @param job the job configuration
+   */
+  public void configure(JobConf job) {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    String[] colNames = job.get(COLUMN_LIST).split(" ");
+    byte[][] columns = new byte[colNames.length][];
+    for (int i = 0; i < columns.length; i++) {
+      columns[i] = Bytes.toBytes(colNames[i]);
+    }
+    setInputColums(columns);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+    } catch (Exception e) {
+      // Pass the throwable separately so the stack trace is logged too.
+      LOG.error("Unable to open input table", e);
+    }
+  }
+
+  /**
+   * Checks that the job names exactly one table and at least one column.
+   *
+   * @param job the job configuration
+   * @throws IOException if the input specification is invalid
+   */
+  public void validateInput(JobConf job) throws IOException {
+    // expecting exactly one path; zero paths must be rejected too, because
+    // configure() reads the first element
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length != 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,250 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Base {@link InputFormat} that scans an HBase table and hands each row to
+ * the mapper as an (IntWritable row index, {@link BlockWritable}) pair.
+ * Subclasses supply the table, the input columns and optionally a row filter
+ * or a custom record reader through the protected setters.
+ */
+public abstract class BlockInputFormatBase implements
+    InputFormat<IntWritable, BlockWritable> {
+  static final Logger LOG = Logger.getLogger(BlockInputFormatBase.class);
+  private byte[][] inputColumns;
+  private HTable table;
+  private TableRecordReader tableRecordReader;
+  private RowFilterInterface rowFilter;
+
+  /**
+   * Iterate over an HBase table data, return (IntWritable, BlockWritable)
+   * pairs
+   */
+  protected static class TableRecordReader implements
+      RecordReader<IntWritable, BlockWritable> {
+    private byte[] startRow;
+    private byte[] endRow;
+    private RowFilterInterface trrRowFilter;
+    private Scanner scanner;
+    private HTable htable;
+    private byte[][] trrInputColumns;
+
+    /**
+     * Build the scanner. Not done in constructor to allow for extension.
+     *
+     * @throws IOException
+     */
+    public void init() throws IOException {
+      boolean hasEndRow = (endRow != null) && (endRow.length > 0);
+      if (!hasEndRow) {
+        // Open-ended scan: only the optional row filter applies.
+        this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+            trrRowFilter);
+      } else if (trrRowFilter == null) {
+        this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+            endRow);
+      } else {
+        // Combine the end-row bound with the caller's filter; a row must
+        // pass both.
+        final Set<RowFilterInterface> rowFiltersSet = new HashSet<RowFilterInterface>();
+        rowFiltersSet.add(new StopRowFilter(endRow));
+        rowFiltersSet.add(trrRowFilter);
+        this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+            new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+                rowFiltersSet));
+      }
+    }
+
+    /**
+     * @param htable the {@link HTable} to scan.
+     */
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    /**
+     * @param inputColumns the columns to be placed in {@link BlockWritable}.
+     */
+    public void setInputColumns(final byte[][] inputColumns) {
+      this.trrInputColumns = inputColumns;
+    }
+
+    /**
+     * @param startRow the first row in the split
+     */
+    public void setStartRow(final byte[] startRow) {
+      this.startRow = startRow;
+    }
+
+    /**
+     *
+     * @param endRow the last row in the split
+     */
+    public void setEndRow(final byte[] endRow) {
+      this.endRow = endRow;
+    }
+
+    /**
+     * @param rowFilter the {@link RowFilterInterface} to be used.
+     */
+    public void setRowFilter(RowFilterInterface rowFilter) {
+      this.trrRowFilter = rowFilter;
+    }
+
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      // The scanner only exists once init() has run.
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
+    }
+
+    /**
+     * @return IntWritable
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public IntWritable createKey() {
+      return new IntWritable();
+    }
+
+    /**
+     * @return BlockWritable
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public BlockWritable createValue() {
+      return new BlockWritable();
+    }
+
+    /** {@inheritDoc} */
+    public long getPos() {
+      // This should be the ordinal tuple in the range;
+      // not clear how to calculate...
+      return 0;
+    }
+
+    /** {@inheritDoc} */
+    public float getProgress() {
+      // Depends on the total number of tuples and getPos
+      return 0;
+    }
+
+    /**
+     * @param key IntWritable as input key.
+     * @param value BlockWritable as input value
+     *
+     * Converts Scanner.next() to IntWritable, BlockWritable
+     *
+     * @return true if there was more data
+     * @throws IOException
+     */
+    public boolean next(IntWritable key, BlockWritable value)
+        throws IOException {
+      RowResult result = this.scanner.next();
+      if (result == null || result.size() <= 0) {
+        return false;
+      }
+      key.set(BytesUtil.bytesToInt(result.getRow()));
+      Writables.copyWritable(result, value);
+      return true;
+    }
+  }
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+   * default.
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader<IntWritable, BlockWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return trr;
+  }
+
+  /**
+   * Splits the table's row range into equal-width intervals of row indices,
+   * using the row count stored in the table's metadata cell. The last split
+   * is open-ended.
+   *
+   * @param job the job configuration (not read here)
+   * @param numSplits requested number of splits; currently ignored, see the
+   *          note in the body
+   * @return the splits; an empty array when the stored row count is not
+   *         positive
+   * @throws IOException if the metadata cell is missing or cannot be read
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    // NOTE(review): the requested split count is overridden with a fixed
+    // value; kept as in the original, confirm whether this is intentional.
+    numSplits = 2;
+    Cell meta = this.table.get(Constants.METADATA, Constants.METADATA_ROWS);
+    if (meta == null) {
+      throw new IOException("Row count metadata not found in table");
+    }
+
+    int rows = BytesUtil.bytesToInt(meta.getValue());
+    if (rows <= 0) {
+      // Nothing to scan; also avoids dividing by a zero split count below.
+      return new InputSplit[0];
+    }
+    if (rows < numSplits) {
+      numSplits = rows;
+    }
+
+    int interval = rows / numSplits;
+    InputSplit[] splits = new InputSplit[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      byte[] start = BytesUtil.intToBytes(i * interval);
+      // Each split ends where the next one starts; the last has no end row.
+      byte[] end = ((i + 1) < numSplits) ? BytesUtil.intToBytes((i + 1)
+          * interval) : HConstants.EMPTY_START_ROW;
+      splits[i] = new TableSplit(this.table.getTableName(), start, end);
+    }
+    return splits;
+  }
+
+  /**
+   * @param inputColumns to be passed in {@link BlockWritable} to the map task.
+   */
+  protected void setInputColums(byte[][] inputColumns) {
+    this.inputColumns = inputColumns;
+  }
+
+  /**
+   * Allows subclasses to set the {@link HTable}.
+   *
+   * @param table to get the data from
+   */
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader to provide other {@link TableRecordReader}
+   *          implementations.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+
+  /**
+   * Allows subclasses to set the {@link RowFilterInterface} to be used.
+   *
+   * @param rowFilter
+   */
+  protected void setRowFilter(RowFilterInterface rowFilter) {
+    this.rowFilter = rowFilter;
+  }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java Sun Nov 16 17:57:47 2008
@@ -33,7 +33,7 @@
public class VectorInputFormat extends VectorInputFormatBase implements
JobConfigurable {
- private final Log LOG = LogFactory.getLog(VectorInputFormat.class);
+ private static final Log LOG = LogFactory.getLog(VectorInputFormat.class);
/**
* space delimited list of columns
Modified: incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java Sun Nov 16 17:57:47 2008
@@ -99,7 +99,7 @@
return Bytes.toBytes(Constants.COLUMN + String.valueOf(integer));
}
- public static byte[] subMatrixToBytes(Object obj) throws IOException {
+ public static byte[] subMatrixToBytes(SubMatrix obj) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
@@ -110,13 +110,27 @@
return data;
}
- public static SubMatrix bytesToSubMatrix(byte[] value) throws IOException,
- ClassNotFoundException {
+ public static SubMatrix bytesToSubMatrix(byte[] value) throws IOException {
ByteArrayInputStream bos = new ByteArrayInputStream(value);
ObjectInputStream oos = new ObjectInputStream(bos);
- Object obj = oos.readObject();
+ Object obj = null;
+ try {
+ obj = oos.readObject();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
oos.close();
bos.close();
return (SubMatrix) obj;
}
+
+ public static byte[] getBlockIndex(int integer) {
+ return Bytes.toBytes(Constants.BLOCK + String.valueOf(integer));
+ }
+
+ /**
+  * Extracts the block index from a key: the decimal number that follows the
+  * first ':' in the key text (or the whole text when it has no ':').
+  *
+  * @param key the encoded key bytes
+  * @return the block index
+  * @throws NumberFormatException if the text after ':' is not an integer
+  */
+ public static int getBlockIndex(byte[] key) {
+   // Decode with the same helper class that encodes the key, rather than
+   // with the platform default charset.
+   String cKey = Bytes.toString(key);
+   return Integer.parseInt(cKey.substring(cKey.indexOf(":") + 1));
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java Sun Nov 16 17:57:47 2008
@@ -23,23 +23,21 @@
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.hama.Matrix;
/**
* A map/reduce job manager
*/
public class JobManager {
-
+ /**
+ * Runs the given map/reduce job and then sets the dimension of the result
+ * matrix. The row count is the size of column 0 and the column count is the
+ * size of row 0 of {@code result}.
+ *
+ * @param jobConf configuration of the job to run
+ * @param result matrix whose dimension is set after the job has run
+ * @throws IOException if running the job fails or, presumably, if reading
+ * the result matrix fails
+ */
public static void execute(JobConf jobConf, Matrix result) throws IOException {
- RunningJob rJob = JobClient.runJob(jobConf);
- // TODO : When HADOOP-4043 done, we should change this.
- long rows = rJob.getCounters().findCounter(
- "org.apache.hadoop.mapred.Task$Counter", 8, "REDUCE_OUTPUT_RECORDS")
- .getCounter();
+ JobClient.runJob(jobConf);
+ // Earlier approach, kept for reference: take the row count from the
+ // REDUCE_OUTPUT_RECORDS counter of the finished job.
+ //long rows = rJob.getCounters().findCounter(
+ // "org.apache.hadoop.mapred.Task$Counter", 8, "REDUCE_OUTPUT_RECORDS")
+ // .getCounter();
// TODO : Thinking about more efficient method.
+ int rows = result.getColumn(0).size();
int columns = result.getRow(0).size();
- result.setDimension((int) rows, columns);
+ result.setDimension(rows, columns);
}
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Sun Nov 16 17:57:47 2008
@@ -44,13 +44,13 @@
private static HamaConfiguration conf;
private static HBaseAdmin admin;
private static HamaAdmin hamaAdmin;
-
+
public static Test suite() {
TestSetup setup = new TestSetup(new TestSuite(TestDenseMatrix.class)) {
protected void setUp() throws Exception {
HCluster hCluster = new HCluster();
hCluster.setUp();
-
+
conf = hCluster.getConf();
admin = new HBaseAdmin(conf);
hamaAdmin = new HamaAdminImpl(conf, admin);
@@ -75,6 +75,27 @@
m2.close();
}
+ /**
+  * Checks that {@code blocking(2)} marks m1 as blocked with block size 2, and
+  * that {@code blockMatrix(1, 0)}, {@code getBlock(1, 0)} and the sub-matrix
+  * cut at {@code getBlockPosition(1, 0)} all hold the same entries.
+  */
+ public void testBlocking() throws IOException, ClassNotFoundException {
+   DenseMatrix dense = (DenseMatrix) m1;
+   assertFalse(dense.isBlocked());
+   dense.blocking(2);
+   assertTrue(dense.isBlocked());
+
+   int[] pos = dense.getBlockPosition(1, 0);
+   double[][] a = dense.blockMatrix(1, 0).getDoubles();
+   LOG.info(pos[0] + ", " + pos[1] + ", " + pos[2] + ", " + pos[3]);
+   double[][] b = dense.subMatrix(pos[0], pos[1], pos[2], pos[3]).getDoubles();
+   double[][] c = dense.getBlock(1, 0).getDoubles();
+   assertEquals(2, dense.getBlockSize());
+   assertEquals(5, c.length);
+
+   for (int i = 0; i < a.length; i++) {
+     // Bound the column index by this row's length, not by the row count.
+     for (int j = 0; j < a[i].length; j++) {
+       // The sub-matrix read straight from m1 serves as the expected value.
+       assertEquals(b[i][j], a[i][j]);
+       assertEquals(b[i][j], c[i][j]);
+     }
+   }
+ }
+
/**
* Column vector test.
*
@@ -90,7 +111,7 @@
x++;
}
}
-
+
public void testGetSetAttribute() throws IOException {
m1.setRowAttribute(0, "row1");
assertEquals(m1.getRowAttribute(0), "row1");
@@ -108,10 +129,10 @@
assertEquals(a.get(i, j), m1.get(i + 2, j + 2));
}
}
-
+
SubMatrix b = m2.subMatrix(0, 2, 0, 2);
SubMatrix c = a.mult(b);
-
+
double[][] C = new double[3][3];
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 3; j++) {
@@ -120,7 +141,7 @@
}
}
}
-
+
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 3; j++) {
assertEquals(C[i][j], c.get(i, j));
@@ -181,7 +202,7 @@
public void testLoadSave() throws IOException {
String path1 = m1.getPath();
- // save m1 to aliase1
+ // save m1 to aliase1
m1.save(aliase1);
// load matrix m1 using aliase1
DenseMatrix loadTest = new DenseMatrix(conf, aliase1, false);
@@ -191,11 +212,11 @@
assertEquals(m1.get(i, j), loadTest.get(i, j));
}
}
-
+
assertEquals(path1, loadTest.getPath());
// close loadTest, it just disconnect to the table but didn't delete it.
loadTest.close();
-
+
// try to close m1 & load matrix m1 using aliase1 again.
m1.close();
DenseMatrix loadTest2 = new DenseMatrix(conf, aliase1, false);
@@ -209,23 +230,23 @@
// it will do the gc!
loadTest2.close();
assertEquals(false, admin.tableExists(path1));
-
+
// if we try to load non-existed matrix using aliase name, it should fail.
DenseMatrix loadTest3 = null;
try {
loadTest3 = new DenseMatrix(conf, aliase1, false);
fail("Try to load a non-existed matrix should fail!");
} catch (IOException e) {
-
+
} finally {
- if(loadTest3!=null)
+ if (loadTest3 != null)
loadTest3.close();
}
}
-
+
public void testForceCreate() throws IOException {
String path2 = m2.getPath();
- // save m2 to aliase2
+ // save m2 to aliase2
m2.save(aliase2);
// load matrix m2 using aliase2
DenseMatrix loadTest = new DenseMatrix(conf, aliase2, false);
@@ -235,16 +256,16 @@
assertEquals(m2.get(i, j), loadTest.get(i, j));
}
}
-
+
assertEquals(path2, loadTest.getPath());
-
+
// force to create matrix loadTest2 using aliasename 'aliase2'
DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true);
String loadPath2 = loadTest2.getPath();
assertFalse(path2.equals(loadPath2));
assertEquals(loadPath2, hamaAdmin.getPath(aliase2));
assertFalse(path2.equals(hamaAdmin.getPath(aliase2)));
-
+
// try to close m2 & loadTest, it table will be deleted finally
m2.close();
assertEquals(true, admin.tableExists(path2));
@@ -280,8 +301,8 @@
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
- assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String
- .valueOf(C[i][j]).substring(0, 14));
+ assertEquals(String.valueOf(result.get(i, j)).substring(0, 14),
+ String.valueOf(C[i][j]).substring(0, 14));
}
}
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java Sun Nov 16 17:57:47 2008
@@ -19,6 +19,7 @@
*/
package org.apache.hama;
+import java.io.IOException;
import java.util.Iterator;
import junit.extensions.TestSetup;
@@ -113,9 +114,11 @@
/**
* Test get/set methods
+ * @throws IOException presumably raised when reading column 0 of m1 -- TODO confirm
*/
- public void testGetSet() {
+ public void testGetSet() throws IOException {
assertEquals(v1.get(0), values[0][0]);
+ assertEquals(m1.getColumn(0).size(), 2);
}
/**
Added: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=718158&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Sun Nov 16 17:57:47 2008
@@ -0,0 +1,66 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.HCluster;
+import org.apache.hama.Matrix;
+import org.apache.hama.algebra.BlockCyclicMultiplyMap;
+import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
+import org.apache.hama.io.BlockWritable;
+import org.apache.log4j.Logger;
+
+/**
+ * Runs the block-cyclic multiplication map/reduce job on two random matrices
+ * and compares its output with a product computed directly in the test.
+ */
+public class TestBlockMatrixMapReduce extends HCluster {
+  static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
+  /** Output matrix of the job most recently started by miniMRJob. */
+  static Matrix c;
+  static final int SIZE = 20;
+  /** Largest absolute difference accepted between an expected and an actual entry. */
+  static final double DELTA = 0.001;
+
+  /** constructor */
+  public TestBlockMatrixMapReduce() {
+    super();
+  }
+
+  /**
+   * Multiplies two random SIZE x SIZE matrices with the map/reduce job and
+   * checks every entry of the output against a locally computed product.
+   */
+  public void testBlockMatrixMapReduce() throws IOException, ClassNotFoundException {
+    Matrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
+    Matrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
+    ((DenseMatrix) m1).blocking(2);
+    ((DenseMatrix) m2).blocking(2);
+
+    miniMRJob(m1.getPath(), m2.getPath());
+
+    // Reference product, accumulated entry by entry inside the test.
+    double[][] expected = new double[SIZE][SIZE];
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        for (int k = 0; k < SIZE; k++) {
+          expected[i][k] += m1.get(i, j) * m2.get(j, k);
+        }
+      }
+    }
+
+    // Compare numerically: truncated string forms can differ for nearly equal
+    // values, and substring(0, 5) throws when the text is shorter than 5 chars.
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        assertEquals(expected[i][j], c.get(i, j), DELTA);
+      }
+    }
+  }
+
+  /**
+   * Configures and runs the multiplication job with 2 map and 2 reduce tasks,
+   * storing the freshly created output matrix in the static field c.
+   *
+   * @param pathA path of the first input matrix
+   * @param pathB path of the second input matrix
+   * @throws IOException if the job cannot be set up or run
+   */
+  private void miniMRJob(String pathA, String pathB) throws IOException {
+    c = new DenseMatrix(conf);
+    String output = c.getPath();
+
+    JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class);
+    jobConf.setJobName("test MR job");
+
+    BlockCyclicMultiplyMap.initJob(pathA, pathB, BlockCyclicMultiplyMap.class,
+        IntWritable.class, BlockWritable.class, jobConf);
+    BlockCyclicReduce.initJob(output, BlockCyclicMultiplyReduce.class, jobConf);
+
+    jobConf.setNumMapTasks(2);
+    jobConf.setNumReduceTasks(2);
+
+    JobClient.runJob(jobConf);
+  }
+}
Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=718158&r1=718157&r2=718158&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Sun Nov 16 17:57:47 2008
@@ -37,9 +37,6 @@
*/
public class TestMatrixMapReduce extends HCluster {
static final Logger LOG = Logger.getLogger(TestMatrixMapReduce.class);
- String pathA;
- String pathB;
- String output;
/** constructor */
public TestMatrixMapReduce() {
@@ -48,28 +45,26 @@
public void testMatrixMapReduce() throws IOException {
+ // Build two 1x2 input matrices, then pass their paths to the job helper.
Matrix matrixA = new DenseMatrix(conf);
- pathA = matrixA.getPath();
matrixA.set(0, 0, 1);
matrixA.set(0, 1, 0);
matrixA.setDimension(1, 2);
Matrix matrixB = new DenseMatrix(conf);
- pathB = matrixB.getPath();
matrixB.set(0, 0, 1);
matrixB.set(0, 1, 1);
matrixB.setDimension(1, 2);
- miniMRJob();
+ // The paths are now passed as arguments instead of through instance fields.
+ miniMRJob(matrixA.getPath(), matrixB.getPath());
}
- private void miniMRJob() throws IOException {
+ private void miniMRJob(String string, String string2) throws IOException {
Matrix c = new DenseMatrix(conf);
- output = c.getPath();
+ String output = c.getPath();
JobConf jobConf = new JobConf(conf, TestMatrixMapReduce.class);
jobConf.setJobName("test MR job");
- RowCyclicAdditionMap.initJob(pathA, pathB, RowCyclicAdditionMap.class, IntWritable.class,
+ RowCyclicAdditionMap.initJob(string, string2, RowCyclicAdditionMap.class, IntWritable.class,
VectorWritable.class, jobConf);
RowCyclicReduce.initJob(output, RowCyclicAdditionReduce.class, jobConf);