You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by Apache Wiki <wi...@apache.org> on 2009/12/02 08:58:49 UTC
[Hama Wiki] Trivial Update of "MatMult" by udanax
Dear Wiki user,
You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.
The "MatMult" page has been changed by udanax.
http://wiki.apache.org/hama/MatMult?action=diff&rev1=13&rev2=14
--------------------------------------------------
}
}}}
+ == Multiplication of file matrices ==
+
+ {{{
+ public static void main(String[] args) throws IOException {
+ collectBlocksFromFile(path[0], true, collectionTable, conf); // path of matrix A
+ collectBlocksFromFile(path[1], false, collectionTable, conf); // path of matrix B
+
+ Matrix result = new DenseMatrix(conf, 4, 4);
+ Job job = new Job(conf, "multiplication MR job : " + result.getPath());
+
+ Scan scan = new Scan();
+ scan.addFamily(Bytes.toBytes(Constants.BLOCK));
+
+ TableMapReduceUtil.initTableMapperJob(collectionTable, scan,
+ BlockMultMap.class, BlockID.class, BytesWritable.class, job);
+ TableMapReduceUtil.initTableReducerJob(result.getPath(),
+ BlockMultReduce.class, job);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static void collectBlocksFromFile(Path path, boolean b,
+ String collectionTable, HamaConfiguration conf) throws IOException {
+ Job job = new Job(conf, "Blocking MR job" + path);
+ job.setMapperClass(MyMapper.class);
+ job.setMapOutputKeyClass(BlockID.class);
+ job.setMapOutputValueClass(MapWritable.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ SequenceFileInputFormat.addInputPath(job, path);
+
+ job.getConfiguration().set(MyMapper.BLOCK_SIZE, String.valueOf(2));
+ job.getConfiguration().set(MyMapper.ROWS, String.valueOf(4));
+ job.getConfiguration().set(MyMapper.COLUMNS, String.valueOf(4));
+ job.getConfiguration().setBoolean(MyMapper.MATRIX_POS, b);
+
+ TableMapReduceUtil.initTableReducerJob(collectionTable,
+ org.apache.hama.mapreduce.CollectBlocksReducer.class, job);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static class MyMapper extends
+ Mapper<IntWritable, MapWritable, BlockID, MapWritable> implements
+ Configurable {
+ private Configuration conf = null;
+ /** Parameter of the path of the matrix to be blocked * */
+ public static final String BLOCK_SIZE = "hama.blocking.size";
+ public static final String ROWS = "hama.blocking.rows";
+ public static final String COLUMNS = "hama.blocking.columns";
+ public static final String MATRIX_POS = "a.or.b";
+
+ private int mBlockNum;
+ private int mBlockRowSize;
+ private int mBlockColSize;
+ private int mRows;
+ private int mColumns;
+
+ public void map(IntWritable key, MapWritable value, Context context)
+ throws IOException, InterruptedException {
+ int startColumn, endColumn, blkRow = key.get() / mBlockRowSize, i = 0;
+ DenseVector dv = new DenseVector(key.get(), value);
+
+ do {
+ startColumn = i * mBlockColSize;
+ endColumn = startColumn + mBlockColSize - 1;
+ if (endColumn >= mColumns) // the last sub vector
+ endColumn = mColumns - 1;
+ context.write(new BlockID(blkRow, i), dv.subVector(startColumn,
+ endColumn).getEntries());
+
+ i++;
+ } while (endColumn < (mColumns - 1));
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+
+ mBlockNum = conf.getInt(BLOCK_SIZE, 2);
+ mRows = conf.getInt(ROWS, 4);
+ mColumns = conf.getInt(COLUMNS, 4);
+
+ mBlockRowSize = mRows / mBlockNum;
+ mBlockColSize = mColumns / mBlockNum;
+ }
+ }
+ }}}
+