You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/01/15 04:11:27 UTC
svn commit: r734602 - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/
src/java/org/apache/hama/algebra/ src/java/org/apache/hama/io/
src/java/org/apache/hama/mapred/ src/java/org/apache/hama/util/ src/test/...
Author: edwardyoon
Date: Wed Jan 14 19:11:26 2009
New Revision: 734602
URL: http://svn.apache.org/viewvc?rev=734602&view=rev
Log:
Improving speed of matrix multiplication
Added:
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java
Removed:
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/examples/org/apache/hama/examples/AbstractExample.java
incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Jan 14 19:11:26 2009
@@ -33,6 +33,7 @@
IMPROVEMENTS
+ HAMA-129: Improving speed of matrix multiplication (edwardyoon)
HAMA-142: Trunk doesn't work for large matrices (edwardyoon)
HAMA-143: Improve of random_mapred() (edwardyoon)
HAMA-138: To order, left pad with zeroes to integer key (edwardyoon)
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/AbstractExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/AbstractExample.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/AbstractExample.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/AbstractExample.java Wed Jan 14 19:11:26 2009
@@ -26,7 +26,7 @@
public abstract class AbstractExample {
public static final HamaConfiguration conf = new HamaConfiguration();
- protected static List<String> ARGS;
+ public static List<String> ARGS;
public static void parseArgs(String[] args) {
List<String> other_args = new ArrayList<String>();
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java Wed Jan 14 19:11:26 2009
@@ -39,17 +39,16 @@
DenseMatrix a = new DenseMatrix(conf, matrixA, false);
DenseMatrix b = new DenseMatrix(conf, matrixB, false);
-
+ Matrix c;
+
if (ARGS.size() > 2) {
- a.blocking_mapred(Integer.parseInt(ARGS.get(2)));
- b.blocking_mapred(Integer.parseInt(ARGS.get(2)));
+ c = a.mult(b, Integer.parseInt(ARGS.get(2)));
+ } else {
+ c = a.mult(b);
}
- Matrix c = a.mult(b);
for (int i = 0; i < 2; i++) {
- for (int j = 0; j < 2; j++) {
- System.out.println("c(" + i + ", " + j + ") : " + c.get(i, j));
- }
+ System.out.println("c(" + 0 + ", " + i + ") : " + c.get(0, i));
}
System.out.println("...");
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Wed Jan 14 19:11:26 2009
@@ -73,13 +73,16 @@
protected void create() throws IOException {
// It should run only when table doesn't exist.
if (!admin.tableExists(matrixPath)) {
- this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN));
+ this.tableDesc.addFamily(new HColumnDescriptor(
+ Bytes.toBytes(Constants.COLUMN), 3, CompressionType.NONE, false, false,
+ Integer.MAX_VALUE, HConstants.FOREVER, false));
this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
- this.tableDesc.addFamily(new HColumnDescriptor(Bytes
- .toBytes(Constants.BLOCK), 1, CompressionType.NONE, false, false,
- Integer.MAX_VALUE, HConstants.FOREVER, false));
-
+ // It's a temporary data.
+ this.tableDesc.addFamily(new HColumnDescriptor(
+ Bytes.toBytes(Constants.BLOCK), 1, CompressionType.NONE, false, false,
+ Integer.MAX_VALUE, HConstants.FOREVER, false));
+
LOG.info("Initializing the matrix storage.");
this.admin.createTable(this.tableDesc);
LOG.info("Create Matrix " + matrixPath);
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Wed Jan 14 19:11:26 2009
@@ -28,10 +28,8 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
-import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -39,8 +37,8 @@
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hama.algebra.BlockCyclicMultiplyMap;
-import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
+import org.apache.hama.algebra.BlockMultiplyMap;
+import org.apache.hama.algebra.BlockMultiplyReduce;
import org.apache.hama.algebra.RowCyclicAdditionMap;
import org.apache.hama.algebra.RowCyclicAdditionReduce;
import org.apache.hama.algebra.SIMDMultiplyMap;
@@ -383,30 +381,53 @@
}
public Matrix mult(Matrix B) throws IOException {
- Matrix result = new DenseMatrix(config);
-
+ Matrix result = new DenseMatrix(config);
+
JobConf jobConf = new JobConf(config);
jobConf.setJobName("multiplication MR job : " + result.getPath());
jobConf.setNumMapTasks(config.getNumMapTasks());
jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+ SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
+ SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
+ jobConf);
+ SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
+ jobConf);
+ JobManager.execute(jobConf, result);
+ return result;
+ }
- if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
- BlockCyclicMultiplyMap.initJob(this.getBlockedMatrixPath(),
- ((DenseMatrix) B).getBlockedMatrixPath(), this.getBlockedMatrixSize(),
- BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
- jobConf);
- BlockCyclicMultiplyReduce.initJob(result.getPath(),
- BlockCyclicMultiplyReduce.class, jobConf);
- } else {
- SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
- SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
- jobConf);
- SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
- jobConf);
- }
+ /**
+ * A * B
+ *
+ * @param B
+ * @param blocks the number of blocks
+ * @return C
+ * @throws IOException
+ */
+ public Matrix mult(Matrix B, int blocks) throws IOException {
+ Matrix collectionTable = new DenseMatrix(config);
+ LOG.info("Collect Blocks");
+ collectBlocks(this, collectionTable, blocks, true);
+ collectBlocks(B, collectionTable, blocks, false);
+
+ Matrix result = new DenseMatrix(config);
+
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("multiplication MR job : " + result.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+ BlockMultiplyMap.initJob(collectionTable.getPath(),
+ BlockMultiplyMap.class, BlockID.class, BlockWritable.class,
+ jobConf);
+ BlockMultiplyReduce.initJob(result.getPath(),
+ BlockMultiplyReduce.class, jobConf);
JobManager.execute(jobConf, result);
+ // Should be collectionTable removed?
return result;
}
@@ -448,6 +469,9 @@
return this.getClass().getSimpleName();
}
+ /**
+ * Gets the sub matrix
+ */
public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
int columnSize = (j1 - j0) + 1;
SubMatrix result = new SubMatrix((i1 - i0) + 1, columnSize);
@@ -474,37 +498,23 @@
return result;
}
- public boolean isBlocked() throws IOException {
- return (table.get(Constants.METADATA, Constants.BLOCK_PATH) == null) ? false
- : true;
- }
-
- public SubMatrix getBlock(int i, int j) throws IOException {
- return new SubMatrix(table.get(new BlockID(i, j).getBytes(),
- Bytes.toBytes(Constants.BLOCK)).getValue());
- }
-
- public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
- BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
- update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
- table.commit(update);
- }
-
/**
- * Using a map/reduce job to block a dense matrix.
+ * Collect Blocks
*
+ * @param resource
+ * @param collectionTable
* @param blockNum
+ * @param bool
* @throws IOException
*/
- public void blocking_mapred(int blockNum) throws IOException {
+ public void collectBlocks(Matrix resource, Matrix collectionTable,
+ int blockNum, boolean bool) throws IOException {
double blocks = Math.pow(blockNum, 0.5);
if (!String.valueOf(blocks).endsWith(".0"))
throw new IOException("can't divide.");
int block_size = (int) blocks;
- Matrix blockedMatrix = new DenseMatrix(config);
- blockedMatrix.setDimension(block_size, block_size);
- this.setBlockedMatrixPath(blockedMatrix.getPath(), block_size);
+ collectionTable.setDimension(block_size, block_size);
JobConf jobConf = new JobConf(config);
jobConf.setJobName("Blocking MR job" + getPath());
@@ -512,25 +522,8 @@
jobConf.setNumMapTasks(config.getNumMapTasks());
jobConf.setNumReduceTasks(config.getNumReduceTasks());
- BlockingMapRed.initJob(this.getPath(), blockedMatrix.getPath(),
- block_size, this.getRows(), this.getColumns(), jobConf);
+ BlockingMapRed.initJob(resource.getPath(), collectionTable.getPath(),
+ bool, block_size, this.getRows(), this.getColumns(), jobConf);
JobManager.execute(jobConf);
}
-
- public String getBlockedMatrixPath() throws IOException {
- return Bytes.toString(table.get(Constants.METADATA,
- Constants.BLOCK_PATH).getValue());
- }
-
- protected void setBlockedMatrixPath(String path, int size) throws IOException {
- BatchUpdate update = new BatchUpdate(Constants.METADATA);
- update.put(Constants.BLOCK_PATH, Bytes.toBytes(path));
- update.put(Constants.BLOCK_SIZE, Bytes.toBytes(size));
- table.commit(update);
- }
-
- public int getBlockedMatrixSize() throws IOException {
- return Bytes.toInt(table.get(Constants.METADATA,
- Constants.BLOCK_SIZE).getValue());
- }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java Wed Jan 14 19:11:26 2009
@@ -209,5 +209,15 @@
return data;
}
+ public String toString() {
+ String result = "";
+ for (int i = 0; i < this.getRows(); i++) {
+ for (int j = 0; j < this.getColumns(); j++) {
+ result += this.get(i, j) + "\t";
+ }
+ result += "\n";
+ }
+ return result;
+ }
}
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java?rev=734602&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java Wed Jan 14 19:11:26 2009
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.mapred.BlockInputFormat;
+import org.apache.log4j.Logger;
+
+public class BlockMultiplyMap extends MapReduceBase implements
+ Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
+ static final Logger LOG = Logger.getLogger(BlockMultiplyMap.class);
+
+ public static void initJob(String matrix_a,
+ Class<BlockMultiplyMap> map, Class<BlockID> outputKeyClass,
+ Class<BlockWritable> outputValueClass, JobConf jobConf) {
+
+ jobConf.setMapOutputValueClass(outputValueClass);
+ jobConf.setMapOutputKeyClass(outputKeyClass);
+ jobConf.setMapperClass(map);
+
+ jobConf.setInputFormat(BlockInputFormat.class);
+ FileInputFormat.addInputPaths(jobConf, matrix_a);
+
+ jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
+ }
+
+ @Override
+ public void map(BlockID key, BlockWritable value,
+ OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
+ throws IOException {
+
+ SubMatrix c = value.get(0).mult(value.get(1));
+ output.collect(key, new BlockWritable(c));
+ }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java?rev=734602&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java Wed Jan 14 19:11:26 2009
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.log4j.Logger;
+
+public class BlockMultiplyReduce extends MapReduceBase implements
+ Reducer<BlockID, BlockWritable, IntWritable, VectorUpdate> {
+ static final Logger LOG = Logger.getLogger(BlockMultiplyReduce.class);
+
+ /**
+ * Use this before submitting a BlockCyclicMultiplyReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table
+ * @param reducer
+ * @param job
+ */
+ public static void initJob(String table,
+ Class<BlockMultiplyReduce> reducer, JobConf job) {
+ job.setOutputFormat(VectorOutputFormat.class);
+ job.setReducerClass(reducer);
+ job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(BatchUpdate.class);
+ }
+
+ @Override
+ public void reduce(BlockID key, Iterator<BlockWritable> values,
+ OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+ throws IOException {
+
+ SubMatrix s = null;
+ while (values.hasNext()) {
+ SubMatrix b = values.next().getMatrices().next();
+ if (s == null) {
+ s = b;
+ } else {
+ s = s.add(b);
+ }
+ }
+
+ int startRow = key.getRow() * s.getRows();
+ int startColumn = key.getColumn() * s.getColumns();
+
+ for (int i = 0; i < s.getRows(); i++) {
+ VectorUpdate update = new VectorUpdate(i + startRow);
+ for (int j = 0; j < s.getColumns(); j++) {
+ update.put(j + startColumn, s.get(i, j));
+ }
+ output.collect(new IntWritable(key.getRow()), update);
+ }
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java Wed Jan 14 19:11:26 2009
@@ -34,7 +34,8 @@
public static final int PAD_SIZE = 15;
private int row;
private int column;
-
+ private int seq = -1;
+
public BlockID() {
}
@@ -61,12 +62,18 @@
try {
this.row = Integer.parseInt(keys[1]);
- this.column = Integer.parseInt(keys[2]);
+ String[] columns = keys[2].split("[-]");
+ this.column = Integer.parseInt(columns[0]);
} catch (ArrayIndexOutOfBoundsException e) {
throw new ArrayIndexOutOfBoundsException(rKey + "\n" + e);
}
}
+ public BlockID(int row, int column, int seq) {
+ set(row, column);
+ this.seq = seq;
+ }
+
public void set(int row, int column) {
this.row = row;
this.column = column;
@@ -101,7 +108,11 @@
buf.append("0");
}
- return buf.toString() + "," + row + "," + column;
+ if(seq > -1) {
+ return buf.toString() + "," + row + "," + column + "-" + seq;
+ } else {
+ return buf.toString() + "," + row + "," + column;
+ }
}
@Override
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Wed Jan 14 19:11:26 2009
@@ -19,59 +19,90 @@
*/
package org.apache.hama.io;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
import org.apache.hama.SubMatrix;
public class BlockWritable implements Writable {
- private SubMatrix matrix;
+ static final Log LOG = LogFactory.getLog(BlockWritable.class);
+ private List<SubMatrix> matrices;
public BlockWritable() {
- this.matrix = new SubMatrix(0, 0);
- }
-
- public BlockWritable(SubMatrix c) {
- this.matrix = c;
+ this.matrices = new ArrayList<SubMatrix>();
}
- public BlockWritable(byte[] bytes) throws IOException {
- this.matrix = new SubMatrix(bytes);
+ public BlockWritable(SubMatrix subMatrix) {
+ this.matrices = new ArrayList<SubMatrix>();
+ this.matrices.add(subMatrix);
}
public void readFields(DataInput in) throws IOException {
-
- int rows = in.readInt();
- int columns = in.readInt();
- this.matrix = new SubMatrix(rows, columns);
-
- for(int i = 0; i < rows; i++) {
- for(int j = 0; j < columns; j++) {
- this.matrix.set(i, j, in.readDouble());
+ this.matrices.clear();
+ int size = in.readInt();
+
+ for (int x = 0; x < size; x++) {
+ int rows = in.readInt();
+ int columns = in.readInt();
+
+ SubMatrix matrix = new SubMatrix(rows, columns);
+
+ for (int i = 0; i < rows; i++) {
+ for (int j = 0; j < columns; j++) {
+ matrix.set(i, j, in.readDouble());
+ }
}
+
+ this.matrices.add(matrix);
}
-
- //this.matrix = new SubMatrix(Bytes.readByteArray(in));
}
public void write(DataOutput out) throws IOException {
- //Bytes.writeByteArray(out, this.matrix.getBytes());
-
- out.writeInt(this.matrix.getRows());
- out.writeInt(this.matrix.getColumns());
-
- for(int i = 0; i < this.matrix.getRows(); i++) {
- for(int j = 0; j < this.matrix.getColumns(); j++) {
- out.writeDouble(this.matrix.get(i, j));
+ Iterator<SubMatrix> it = this.matrices.iterator();
+
+ int size = this.matrices.size();
+ out.writeInt(size);
+
+ while (it.hasNext()) {
+ SubMatrix matrix = it.next();
+
+ out.writeInt(matrix.getRows());
+ out.writeInt(matrix.getColumns());
+
+ for (int i = 0; i < matrix.getRows(); i++) {
+ for (int j = 0; j < matrix.getColumns(); j++) {
+ out.writeDouble(matrix.get(i, j));
+ }
}
}
}
- public SubMatrix get() {
- return this.matrix;
+ public void set(byte[] key, byte[] value) throws IOException {
+ int index = 0;
+ if (new String(key).equals(Constants.BLOCK + "b")) {
+ index = 1;
+ }
+
+ this.matrices.add(index, new SubMatrix(value));
}
-}
+ public Iterator<SubMatrix> getMatrices() {
+ return this.matrices.iterator();
+ }
+
+ public SubMatrix get(int index) {
+ return this.matrices.get(index);
+ }
+
+ public int size() {
+ return this.matrices.size();
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Wed Jan 14 19:11:26 2009
@@ -20,10 +20,12 @@
package org.apache.hama.mapred;
import java.io.IOException;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.util.Writables;
@@ -34,7 +36,6 @@
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.Constants;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
@@ -43,20 +44,11 @@
static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
private TableRecordReader tableRecordReader;
- public static int getRepeatCount() {
- return TableRecordReader.repeatCount;
- }
-
- public static int getRepeat() {
- return repeat;
- }
-
/**
* Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
*/
protected static class TableRecordReader extends TableRecordReaderBase
implements RecordReader<BlockID, BlockWritable> {
- private static int repeatCount = 0;
/**
* @return IntWritable
@@ -99,21 +91,17 @@
boolean hasMore = result != null && result.size() > 0;
- // Scanner will be restarted.
- if(!hasMore && repeatCount < repeat - 1) {
- this.init();
- repeatCount++;
- result = this.scanner.next();
- hasMore = result != null && result.size() > 0;
- }
-
if (hasMore) {
byte[] row = result.getRow();
BlockID bID = new BlockID(row);
lastRow = row;
key.set(bID.getRow(), bID.getColumn());
- byte[] rs = result.get(Constants.BLOCK).getValue();
- Writables.copyWritable(new BlockWritable(rs), value);
+
+ BlockWritable block = new BlockWritable();
+ for(Map.Entry<byte[], Cell> e : result.entrySet()) {
+ block.set(e.getKey(), e.getValue().getValue());
+ }
+ Writables.copyWritable(block, value);
}
return hasMore;
}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java?rev=734602&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java Wed Jan 14 19:11:26 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hama.Constants;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+
+public class BlockOutputFormat extends
+ FileOutputFormat<BlockID, BlockWritable> {
+
+ /** JobConf parameter that specifies the output table */
+ public static final String OUTPUT_TABLE = "hama.mapred.output";
+ public static final String COLUMN = "hama.mapred.output.column";
+ private final static Log LOG = LogFactory.getLog(VectorOutputFormat.class);
+
+ /**
+ * Convert Reduce output (key, value) to (IntWritable, VectorUpdate)
+ * and write to an HBase table
+ */
+ protected static class TableRecordWriter implements
+ RecordWriter<BlockID, BlockWritable> {
+ private HTable m_table;
+ private BatchUpdate update;
+ private String column;
+
+ /**
+ * Instantiate a TableRecordWriter with the HBase HClient for writing.
+ *
+ * @param table
+ * @param col
+ */
+ public TableRecordWriter(HTable table, String col) {
+ m_table = table;
+ column = col;
+ }
+
+ public void close(@SuppressWarnings("unused")
+ Reporter reporter) throws IOException {
+ m_table.flushCommits();
+ }
+
+ /** {@inheritDoc} */
+ public void write(BlockID key, BlockWritable value) throws IOException {
+ Iterator<SubMatrix> it = value.getMatrices();
+ update = new BatchUpdate(key.getBytes());
+ update.put(Bytes.toBytes(Constants.BLOCK + column), it.next().getBytes());
+ m_table.commit(new BatchUpdate(update));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ @SuppressWarnings("unchecked")
+ public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+ String name, Progressable progress) throws IOException {
+
+ // expecting exactly one path
+ String column = job.get(COLUMN);
+ String tableName = job.get(OUTPUT_TABLE);
+ HTable table = null;
+ try {
+ table = new HTable(new HBaseConfiguration(job), tableName);
+ } catch (IOException e) {
+ LOG.error(e);
+ throw e;
+ }
+ return new TableRecordWriter(table, column);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+ String tableName = job.get(OUTPUT_TABLE);
+ if (tableName == null) {
+ throw new IOException("Must specify table name");
+ }
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Wed Jan 14 19:11:26 2009
@@ -29,13 +29,11 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hama.Constants;
-import org.apache.hama.DenseMatrix;
import org.apache.hama.DenseVector;
-import org.apache.hama.HamaConfiguration;
import org.apache.hama.SubMatrix;
import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
import org.apache.hama.io.VectorWritable;
/**
@@ -45,74 +43,71 @@
static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
- /** Parameter of the path of the matrix to be blocked * */
- public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
- public static final String BLOCKED_MATRIX = "hama.blocked.matrix";
+ /** Configuration key: number of blocks per dimension. */
public static final String BLOCK_SIZE = "hama.blocking.size";
+ /** Configuration key: number of rows of the matrix being blocked. */
public static final String ROWS = "hama.blocking.rows";
+ /** Configuration key: number of columns of the matrix being blocked. */
public static final String COLUMNS = "hama.blocking.columns";
+ /**
+ * Configuration key: boolean selecting the output column ("a" if true,
+ * "b" if false) and the block replication pattern used by the reducer.
+ */
+ public static final String MATRIX_POS = "a.ore.b";
/**
- * Initialize a job to blocking a table
+ * Initializes a job that splits a matrix into blocks and writes them to a
+ * collection table.
*
- * @param matrixPath
- * @param string
- * @param j
- * @param i
- * @param block_size
- * @param job
+ * @param matrixPath input path naming the matrix to be blocked
+ * @param collectionTable output table that receives the blocks
+ * @param bool if true, blocks are written under column "a"; otherwise "b"
+ * @param block_size number of blocks per dimension
+ * @param i number of rows of the matrix
+ * @param j number of columns of the matrix
+ * @param job job configuration to populate
*/
- public static void initJob(String matrixPath, String string, int block_size, int i, int j, JobConf job) {
+ public static void initJob(String matrixPath, String collectionTable,
+ boolean bool, int block_size, int i, int j, JobConf job) {
job.setMapperClass(BlockingMapper.class);
job.setReducerClass(BlockingReducer.class);
FileInputFormat.addInputPaths(job, matrixPath);
job.setInputFormat(VectorInputFormat.class);
+
job.setMapOutputKeyClass(BlockID.class);
job.setMapOutputValueClass(VectorWritable.class);
- job.setOutputFormat(NullOutputFormat.class);
-
- job.set(BLOCKING_MATRIX, matrixPath);
- job.set(BLOCKED_MATRIX, string);
+
+ job.setOutputFormat(BlockOutputFormat.class);
+ job.setOutputKeyClass(BlockID.class);
+ job.setOutputValueClass(BlockWritable.class);
job.set(BLOCK_SIZE, String.valueOf(block_size));
job.set(ROWS, String.valueOf(i));
job.set(COLUMNS, String.valueOf(j));
-
+ job.setBoolean(MATRIX_POS, bool);
job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+ job.set(BlockOutputFormat.COLUMN, bool ? "a" : "b");
+ job.set(BlockOutputFormat.OUTPUT_TABLE, collectionTable);
}
/**
* Abstract Blocking Map/Reduce Class to configure the job.
+ * configure() parses the blocking parameters that initJob() stored in the
+ * JobConf; subclasses (e.g. BlockingReducer) use the resulting fields.
*/
public static abstract class BlockingMapRedBase extends MapReduceBase {
-
- protected DenseMatrix matrix;
- protected DenseMatrix blockedMatrix;
+ /** Number of blocks per dimension, read from BLOCK_SIZE. */
protected int mBlockNum;
+ /** Rows per block: mRows / mBlockNum (integer division). */
protected int mBlockRowSize;
+ /** Columns per block: mColumns / mBlockNum (integer division). */
protected int mBlockColSize;
+ /** Rows of the matrix being blocked, read from ROWS. */
protected int mRows;
+ /** Columns of the matrix being blocked, read from COLUMNS. */
protected int mColumns;
+
+ /** Value of MATRIX_POS; defaults to true when the key is absent. */
protected boolean matrixPos;
@Override
public void configure(JobConf job) {
- try {
- matrix = new DenseMatrix(new HamaConfiguration(), job.get(
- BLOCKING_MATRIX, ""));
- blockedMatrix = new DenseMatrix(new HamaConfiguration(), job.get(
- BLOCKED_MATRIX, ""));
-
+ // NOTE(review): Integer.parseInt throws NumberFormatException when a key is
+ // unset (the default here is ""), and a BLOCK_SIZE of 0 makes the divisions
+ // below throw ArithmeticException.
mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
mRows = Integer.parseInt(job.get(ROWS, ""));
mColumns = Integer.parseInt(job.get(COLUMNS, ""));
mBlockRowSize = mRows / mBlockNum;
mBlockColSize = mColumns / mBlockNum;
- } catch (IOException e) {
- LOG.warn("Load matrix_blocking failed : " + e.getMessage());
- }
+
+ matrixPos = job.getBoolean(MATRIX_POS, true);
}
-
}
/**
@@ -142,21 +137,19 @@
i++;
} while(endColumn < (mColumns-1));
}
-
}
/**
* Reducer Class
*/
public static class BlockingReducer extends BlockingMapRedBase implements
- Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
+ Reducer<BlockID, VectorWritable, BlockID, BlockWritable> {
@Override
public void reduce(BlockID key, Iterator<VectorWritable> values,
- OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
+ OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
throws IOException {
- // Note: all the sub-vectors are grouped by {@link
- // org.apache.hama.io.BlockID}
+ // Note: all the sub-vectors are grouped by {@link org.apache.hama.io.BlockID}
// the block's base offset in the original matrix
int colBase = key.getColumn() * mBlockColSize;
@@ -189,9 +182,22 @@
subMatrix.set(i, j, vw.get(colBase + j));
}
}
+ BlockWritable outValue = new BlockWritable(subMatrix);
- blockedMatrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
+ // It'll used for only matrix multiplication.
+ if(matrixPos) {
+ for (int x = 0; x < mBlockNum; x++) {
+ int r = (key.getRow() * mBlockNum) * mBlockNum;
+ int seq = (x * mBlockNum) + key.getColumn() + r;
+ output.collect(new BlockID(key.getRow(), x, seq), outValue);
+ }
+ } else {
+ for (int x = 0; x < mBlockNum; x++) {
+ int seq = (x * mBlockNum * mBlockNum) +
+ (key.getColumn() * mBlockNum) + key.getRow();
+ output.collect(new BlockID(x, key.getColumn(), seq), outValue);
+ }
+ }
}
}
-
-}
+}
\ No newline at end of file
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java Wed Jan 14 19:11:26 2009
@@ -40,19 +40,14 @@
protected byte[][] inputColumns;
protected HTable table;
protected RowFilterInterface rowFilter;
- protected static int repeat;
/**
* space delimited list of columns
*/
public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
- public static final String REPEAT_NUM = "hama.mapred.repeat";
public void configure(JobConf job) {
Path[] tableNames = FileInputFormat.getInputPaths(job);
- if(job.get(REPEAT_NUM) != null) {
- setRepeat(Integer.parseInt(job.get(REPEAT_NUM)));
- }
String colArg = job.get(COLUMN_LIST);
String[] colNames = colArg.split(" ");
byte[][] m_cols = new byte[colNames.length][];
@@ -67,10 +62,6 @@
}
}
- private void setRepeat(int parseInt) {
- repeat = parseInt;
- }
-
public void validateInput(JobConf job) throws IOException {
// expecting exactly one path
Path[] tableNames = FileInputFormat.getInputPaths(job);
@@ -136,7 +127,6 @@
return splits;
}
-
/**
* @param inputColumns to be passed to the map task.
*/
Modified: incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java Wed Jan 14 19:11:26 2009
@@ -30,14 +30,16 @@
*/
public class JobManager {
+ /**
+ * Runs the job, then sets the result matrix's dimensions from the sizes of
+ * its first column and first row.
+ *
+ * @param jobConf the job to run
+ * @param result the matrix whose dimensions are updated; result.close() is
+ * called if an IOException occurs
+ * @throws IOException if running the job or updating the dimensions fails;
+ * the original exception is rethrown unchanged
+ */
public static void execute(JobConf jobConf, Matrix result) throws IOException {
- JobClient.runJob(jobConf);
- //long rows = rJob.getCounters().findCounter(
- // "org.apache.hadoop.mapred.Task$Counter", 8, "REDUCE_OUTPUT_RECORDS")
- // .getCounter();
- // TODO : Thinking about more efficient method.
- int rows = result.getColumn(0).size();
- int columns = result.getRow(0).size();
- result.setDimension(rows, columns);
+ try {
+ JobClient.runJob(jobConf);
+ // TODO : Thinking about more efficient method.
+ int rows = result.getColumn(0).size();
+ int columns = result.getRow(0).size();
+ result.setDimension(rows, columns);
+ } catch (IOException e) {
+ // Release the result, but never let a failure while closing hide the
+ // error that actually stopped the job.
+ try {
+ result.close();
+ } catch (IOException ignored) {
+ // Intentionally ignored: e is the primary failure and is rethrown below.
+ }
+ throw e;
+ }
}
/**
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Wed Jan 14 19:11:26 2009
@@ -83,18 +83,6 @@
}
/**
- * Map/Reduce Blocking Test
- *
- * @throws IOException
- * @throws ClassNotFoundException
- */
- public void testMRBlocking() throws IOException, ClassNotFoundException {
- assertEquals(((DenseMatrix) m2).isBlocked(), false);
- ((DenseMatrix) m2).blocking_mapred(4);
- assertEquals(((DenseMatrix) m2).isBlocked(), true);
- }
-
- /**
* Column vector test.
*
* @param rand
Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Wed Jan 14 19:11:26 2009
@@ -23,7 +23,6 @@
import org.apache.hama.DenseMatrix;
import org.apache.hama.HCluster;
-import org.apache.hama.Matrix;
import org.apache.log4j.Logger;
public class TestBlockMatrixMapReduce extends HCluster {
@@ -37,13 +36,10 @@
public void testBlockMatrixMapReduce() throws IOException,
ClassNotFoundException {
- Matrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
- Matrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
- // Partitioning 8 * 8 submatrix. It also the test submatrix() and scanner.
- ((DenseMatrix) m1).blocking_mapred(16);
- ((DenseMatrix) m2).blocking_mapred(16);
+ DenseMatrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
+ DenseMatrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
- Matrix c = m1.mult(m2);
+ DenseMatrix c = (DenseMatrix) m1.mult(m2, 16);
double[][] mem = new double[SIZE][SIZE];
for (int i = 0; i < SIZE; i++) {