You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/12/02 10:03:53 UTC
svn commit: r886077 - in /incubator/hama/trunk: ./
src/java/org/apache/hama/graph/ src/java/org/apache/hama/mapreduce/
src/test/org/apache/hama/examples/
Author: edwardyoon
Date: Wed Dec 2 09:03:53 2009
New Revision: 886077
URL: http://svn.apache.org/viewvc?rev=886077&view=rev
Log:
Add file multiplication example
Added:
incubator/hama/trunk/src/java/org/apache/hama/graph/Walker.java
incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=886077&r1=886076&r2=886077&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Dec 2 09:03:53 2009
@@ -42,6 +42,7 @@
IMPROVEMENTS
+ HAMA-232: Add file multiplication example (edwardyoon)
HAMA-226: Add benchmarking tool of eigenvalue operation (edwardyoon)
HAMA-223: Add simple admin tool for manage the temporary tables (edwardyoon)
HAMA-202: Replacement for deprecated API of Map/Reduce (edwardyoon)
Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java?rev=886077&r1=886076&r2=886077&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java Wed Dec 2 09:03:53 2009
@@ -214,7 +214,6 @@
return heartbeatResponse;
}
- @Override
public void run() {
try {
startCleanupThreads();
Added: incubator/hama/trunk/src/java/org/apache/hama/graph/Walker.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/Walker.java?rev=886077&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/Walker.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/Walker.java Wed Dec 2 09:03:53 2009
@@ -0,0 +1,5 @@
+package org.apache.hama.graph;
+
+public class Walker {
+  // No members are declared yet in this revision.
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java?rev=886077&r1=886076&r2=886077&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java Wed Dec 2 09:03:53 2009
@@ -30,7 +30,6 @@
// the block's base offset in the original matrix
int colBase = key.getColumn() * mBlockColSize;
int rowBase = key.getRow() * mBlockRowSize;
-
// the block's size : rows & columns
int smRows = mBlockRowSize;
Added: incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java?rev=886077&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java Wed Dec 2 09:03:53 2009
@@ -0,0 +1,226 @@
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.matrix.Matrix;
+import org.apache.hama.matrix.algebra.BlockMultMap;
+import org.apache.hama.matrix.algebra.BlockMultReduce;
+import org.apache.hama.util.RandomVariable;
+
+public class TestFileMatrixBlockMult extends HamaCluster {
+ final static Log LOG = LogFactory.getLog(TestFileMatrixBlockMult.class
+ .getName());
+ private HamaConfiguration conf;
+ private Path[] path = new Path[2];
+ private String collectionTable;
+
+ /**
+ * @throws UnsupportedEncodingException can only originate from the superclass constructor, the sole call made here
+ */
+ public TestFileMatrixBlockMult() throws UnsupportedEncodingException {
+ super();
+ }
+
+  /** Runs the base-class setup, then creates a "collect_"-prefixed HBase table with the BLOCK family. */
+  public void setUp() throws Exception {
+    super.setUp();
+    conf = getConf();
+    HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+    collectionTable = "collect_" + RandomVariable.randMatrixPath();
+    HTableDescriptor tableSchema = new HTableDescriptor(collectionTable);
+    tableSchema.addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK)));
+    hbaseAdmin.createTable(tableSchema);
+  }
+
+  /** Writes two identical 4x4 matrix sequence files and checks the first key reads back as 0. */
+  public void createFiles() throws IOException {
+    Configuration conf = new Configuration();
+    LocalFileSystem fs = new LocalFileSystem();
+    fs.setConf(conf);
+    fs.getRawFileSystem().setConf(conf);
+    for (int i = 0; i < 2; i++) {
+      path[i] = new Path(System.getProperty("test.build.data", ".") + "/tmp"
+          + i + ".seq");
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path[i],
+          IntWritable.class, MapWritable.class, CompressionType.BLOCK);
+      try {
+        MapWritable value = new MapWritable();
+        value.put(new IntWritable(0), new DoubleWritable(0.5));
+        value.put(new IntWritable(1), new DoubleWritable(0.1));
+        value.put(new IntWritable(2), new DoubleWritable(0.5));
+        value.put(new IntWritable(3), new DoubleWritable(0.1));
+        for (int row = 0; row < 4; row++) { // every row holds the same entries
+          writer.append(new IntWritable(row), value);
+        }
+      } finally {
+        writer.close(); // release the file even when an append fails
+      }
+    }
+
+    FileSystem xfs = path[0].getFileSystem(conf);
+    SequenceFile.Reader reader1 = new SequenceFile.Reader(xfs, path[0], conf);
+    try {
+      IntWritable key = new IntWritable();
+      assertTrue(reader1.next(key, new MapWritable())); // a record must exist
+      assertEquals(0, key.get());
+    } finally {
+      reader1.close(); // the reader was previously left open
+    }
+  }
+
+  /** Blocks both input files, multiplies them with a MapReduce job and verifies the product. */
+  public void testFileMatrixMult() throws IOException {
+    createFiles();
+    collectBlocksFromFile(path[0], true, collectionTable, conf);
+    collectBlocksFromFile(path[1], false, collectionTable, conf);
+
+    Matrix result = new DenseMatrix(conf, 4, 4);
+    Job job = new Job(conf, "multiplication MR job : " + result.getPath());
+    Scan scan = new Scan();
+    scan.addFamily(Bytes.toBytes(Constants.BLOCK));
+    TableMapReduceUtil.initTableMapperJob(collectionTable, scan,
+        BlockMultMap.class, BlockID.class, BytesWritable.class, job);
+    TableMapReduceUtil.initTableReducerJob(result.getPath(),
+        BlockMultReduce.class, job);
+
+    try {
+      // A failed job must fail the test instead of falling through to verification.
+      assertTrue(job.waitForCompletion(true));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+      throw new IOException("Interrupted while waiting for multiplication job", e);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Multiplication job class not found", e);
+    }
+    verifyMultResult(result);
+  }
+
+  /** Asserts every entry of {@code result} is within 1e-6 of the locally computed product. */
+  private void verifyMultResult(Matrix result) throws IOException {
+    double[][] a = new double[][] { { 0.5, 0.1, 0.5, 0.1 },
+        { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 } };
+    double[][] b = new double[][] { { 0.5, 0.1, 0.5, 0.1 },
+        { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 }, { 0.5, 0.1, 0.5, 0.1 } };
+    double[][] c = new double[4][4];
+
+    for (int i = 0; i < 4; i++) {
+      for (int j = 0; j < 4; j++) {
+        for (int k = 0; k < 4; k++) {
+          c[i][k] += a[i][j] * b[j][k];
+        }
+      }
+    }
+    for (int i = 0; i < result.getRows(); i++) {
+      for (int j = 0; j < result.getColumns(); j++) {
+        // Two-sided check: "gap < eps || gap < -eps" accepted any result that was too large.
+        assertTrue(Math.abs(c[i][j] - result.get(i, j)) < 0.000001);
+      }
+    }
+  }
+
+  /** Runs the MR job that maps the sequence file at {@code path} into blocks in the table. */
+  private static void collectBlocksFromFile(Path path, boolean b,
+      String collectionTable, HamaConfiguration conf) throws IOException {
+    Job job = new Job(conf, "Blocking MR job" + path);
+    job.setMapperClass(MyMapper.class);
+    job.setMapOutputKeyClass(BlockID.class);
+    job.setMapOutputValueClass(MapWritable.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    SequenceFileInputFormat.addInputPath(job, path);
+    job.getConfiguration().set(MyMapper.BLOCK_SIZE, String.valueOf(2));
+    job.getConfiguration().set(MyMapper.ROWS, String.valueOf(4));
+    job.getConfiguration().set(MyMapper.COLUMNS, String.valueOf(4));
+    job.getConfiguration().setBoolean(MyMapper.MATRIX_POS, b); // stored under "a.or.b"
+    TableMapReduceUtil.initTableReducerJob(collectionTable,
+        org.apache.hama.mapreduce.CollectBlocksReducer.class, job);
+
+    try {
+      assertTrue("Blocking job failed for " + path, job.waitForCompletion(true));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // preserve the interrupt status
+      throw new IOException("Interrupted while blocking " + path, e);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Job class not found while blocking " + path, e);
+    }
+  }
+
+  /** Emits, for each input row, one (BlockID, sub-vector entries) pair per block column. */
+  public static class MyMapper extends
+      Mapper<IntWritable, MapWritable, BlockID, MapWritable> implements
+      Configurable {
+    private Configuration conf = null;
+    /** Config key whose value setConf uses as the number of blocks per dimension. */
+    public static final String BLOCK_SIZE = "hama.blocking.size";
+    public static final String ROWS = "hama.blocking.rows";
+    public static final String COLUMNS = "hama.blocking.columns";
+    public static final String MATRIX_POS = "a.or.b";
+
+    private int mBlockNum;
+    private int mBlockRowSize;
+    private int mBlockColSize;
+    private int mRows;
+    private int mColumns;
+
+    @Override
+    public void map(IntWritable key, MapWritable value, Context context)
+        throws IOException, InterruptedException {
+      int startColumn, endColumn, blkRow = key.get() / mBlockRowSize, i = 0;
+      DenseVector dv = new DenseVector(key.get(), value);
+      do {
+        startColumn = i * mBlockColSize;
+        endColumn = startColumn + mBlockColSize - 1;
+        if (endColumn >= mColumns) { // the last sub vector
+          endColumn = mColumns - 1;
+        }
+        context.write(new BlockID(blkRow, i), dv.subVector(startColumn,
+            endColumn).getEntries());
+        i++;
+      } while (endColumn < (mColumns - 1));
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /** Stores {@code conf} and derives the per-block row and column sizes from it. */
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      mBlockNum = conf.getInt(BLOCK_SIZE, 2);
+      mRows = conf.getInt(ROWS, 4);
+      mColumns = conf.getInt(COLUMNS, 4);
+      mBlockRowSize = mRows / mBlockNum;
+      mBlockColSize = mColumns / mBlockNum;
+    }
+  }
+}