You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/12/30 12:52:48 UTC
svn commit: r730103 - in /incubator/hama/trunk/src: java/org/apache/hama/
java/org/apache/hama/algebra/ java/org/apache/hama/io/
java/org/apache/hama/mapred/ test/org/apache/hama/
test/org/apache/hama/mapred/
Author: edwardyoon
Date: Tue Dec 30 03:52:47 2008
New Revision: 730103
URL: http://svn.apache.org/viewvc?rev=730103&view=rev
Log:
Fix OOME
Modified:
incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Tue Dec 30 03:52:47 2008
@@ -1,228 +1,233 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.util.BytesUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Methods of the matrix classes
- */
-public abstract class AbstractMatrix implements Matrix {
- static final Logger LOG = Logger.getLogger(AbstractMatrix.class);
-
- protected HamaConfiguration config;
- protected HBaseAdmin admin;
- // a matrix just need a table path to point to the table which stores matrix.
- // let HamaAdmin manage Matrix Name space.
- protected String matrixPath;
- protected HTable table;
- protected HTableDescriptor tableDesc;
- protected HamaAdmin hamaAdmin;
-
- protected boolean closed = true;
-
- /**
- * Sets the job configuration
- *
- * @param conf configuration object
- * @throws MasterNotRunningException
- */
- public void setConfiguration(HamaConfiguration conf)
- throws MasterNotRunningException {
- this.config = conf;
- this.admin = new HBaseAdmin(config);
-
- hamaAdmin = new HamaAdminImpl(conf, admin);
- }
-
- /**
- * Create matrix space
- */
- protected void create() throws IOException {
- // It should run only when table doesn't exist.
- if (!admin.tableExists(matrixPath)) {
- this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN));
- this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
- this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
- this.tableDesc.addFamily(new HColumnDescriptor(Constants.BLOCK));
-
- LOG.info("Initializing the matrix storage.");
- this.admin.createTable(this.tableDesc);
- LOG.info("Create Matrix " + matrixPath);
-
- // connect to the table.
- table = new HTable(config, matrixPath);
- // Record the matrix type in METADATA_TYPE
- BatchUpdate update = new BatchUpdate(Constants.METADATA);
- update.put(Constants.METADATA_TYPE, Bytes.toBytes(this.getClass()
- .getSimpleName()));
-
- table.commit(update);
-
- // the new matrix's reference is 1.
- setReference(1);
- }
- }
-
- public HTable getHTable() {
- return this.table;
- }
-
- /** {@inheritDoc} */
- public int getRows() throws IOException {
- Cell rows = null;
- rows = table.get(Constants.METADATA, Constants.METADATA_ROWS);
- return (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
- }
-
- /** {@inheritDoc} */
- public int getColumns() throws IOException {
- Cell columns = table.get(Constants.METADATA, Constants.METADATA_COLUMNS);
- return BytesUtil.bytesToInt(columns.getValue());
- }
-
- /** {@inheritDoc} */
- public void set(int i, int j, double value) throws IOException {
- VectorUpdate update = new VectorUpdate(i);
- update.put(j, value);
- table.commit(update.getBatchUpdate());
- }
-
- /** {@inheritDoc} */
- public void add(int i, int j, double value) throws IOException {
- VectorUpdate update = new VectorUpdate(i);
- update.put(j, value + this.get(i, j));
- table.commit(update.getBatchUpdate());
- }
-
- /** {@inheritDoc} */
- public void setDimension(int rows, int columns) throws IOException {
- VectorUpdate update = new VectorUpdate(Constants.METADATA);
- update.put(Constants.METADATA_ROWS, rows);
- update.put(Constants.METADATA_COLUMNS, columns);
-
- table.commit(update.getBatchUpdate());
- }
-
- public String getRowLabel(int row) throws IOException {
- Cell rows = null;
- rows = table.get(BytesUtil.getRowIndex(row), Bytes
- .toBytes(Constants.ATTRIBUTE + "string"));
-
- return (rows != null) ? Bytes.toString(rows.getValue()) : null;
- }
-
- public void setRowLabel(int row, String name) throws IOException {
- VectorUpdate update = new VectorUpdate(row);
- update.put(Constants.ATTRIBUTE + "string", name);
- table.commit(update.getBatchUpdate());
- }
-
- public String getColumnLabel(int column) throws IOException {
- Cell rows = null;
- rows = table.get(Constants.CINDEX, (Constants.ATTRIBUTE + column));
- return (rows != null) ? Bytes.toString(rows.getValue()) : null;
- }
-
- public void setColumnLabel(int column, String name) throws IOException {
- VectorUpdate update = new VectorUpdate(Constants.CINDEX);
- update.put(column, name);
- table.commit(update.getBatchUpdate());
- }
-
- /** {@inheritDoc} */
- public String getPath() {
- return matrixPath;
- }
-
- protected void setReference(int reference) throws IOException {
- BatchUpdate update = new BatchUpdate(Constants.METADATA);
- update.put(Constants.METADATA_REFERENCE, Bytes.toBytes(reference));
- table.commit(update);
- }
-
- protected int incrementAndGetRef() throws IOException {
- int reference = 1;
- Cell rows = null;
- rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
- if (rows != null) {
- reference = Bytes.toInt(rows.getValue());
- reference++;
- }
- setReference(reference);
- return reference;
- }
-
- protected int decrementAndGetRef() throws IOException {
- int reference = 0;
- Cell rows = null;
- rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
- if (rows != null) {
- reference = Bytes.toInt(rows.getValue());
- if (reference > 0) // reference==0, we need not to decrement it.
- reference--;
- }
- setReference(reference);
- return reference;
- }
-
- protected boolean hasAliaseName() throws IOException {
- Cell rows = null;
- rows = table.get(Constants.METADATA, Constants.ALIASENAME);
- return (rows != null) ? true : false;
- }
-
- public void close() throws IOException {
- if (closed) // have been closed
- return;
- int reference = decrementAndGetRef();
- if (reference <= 0) { // no reference again.
- if (!hasAliaseName()) { // the table has not been aliased, we delete the
- // table.
- if (admin.isTableEnabled(matrixPath)) {
- admin.disableTable(matrixPath);
- admin.deleteTable(matrixPath);
- }
- }
- }
- closed = true;
- }
-
- public boolean save(String aliasename) throws IOException {
- // mark & update the aliase name in "alise:name" meta column.
- // ! one matrix has only one aliasename now.
- BatchUpdate update = new BatchUpdate(Constants.METADATA);
- update.put(Constants.ALIASENAME, Bytes.toBytes(aliasename));
- table.commit(update);
- return hamaAdmin.save(this, aliasename);
- }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
+import org.apache.hadoop.hbase.HConstants;
+
+/**
+ * Methods of the matrix classes
+ */
+public abstract class AbstractMatrix implements Matrix {
+ static final Logger LOG = Logger.getLogger(AbstractMatrix.class);
+
+ protected HamaConfiguration config;
+ protected HBaseAdmin admin;
+ // A matrix just needs a table path that points to the table which stores the matrix.
+ // Let HamaAdmin manage the matrix name space.
+ protected String matrixPath;
+ protected HTable table;
+ protected HTableDescriptor tableDesc;
+ protected HamaAdmin hamaAdmin;
+
+ protected boolean closed = true;
+
+ /**
+ * Sets the job configuration
+ *
+ * @param conf configuration object
+ * @throws MasterNotRunningException
+ */
+ public void setConfiguration(HamaConfiguration conf)
+ throws MasterNotRunningException {
+ this.config = conf;
+ this.admin = new HBaseAdmin(config);
+
+ hamaAdmin = new HamaAdminImpl(conf, admin);
+ }
+
+ /**
+ * Create matrix space
+ */
+ protected void create() throws IOException {
+ // It should run only when table doesn't exist.
+ if (!admin.tableExists(matrixPath)) {
+ this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN));
+ this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
+ this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
+ this.tableDesc.addFamily(new HColumnDescriptor(
+ Bytes.toBytes(Constants.BLOCK), 1, CompressionType.NONE,
+ false, false, Integer.MAX_VALUE, HConstants.FOREVER, false
+ ));
+
+ LOG.info("Initializing the matrix storage.");
+ this.admin.createTable(this.tableDesc);
+ LOG.info("Create Matrix " + matrixPath);
+
+ // connect to the table.
+ table = new HTable(config, matrixPath);
+ // Record the matrix type in METADATA_TYPE
+ BatchUpdate update = new BatchUpdate(Constants.METADATA);
+ update.put(Constants.METADATA_TYPE, Bytes.toBytes(this.getClass()
+ .getSimpleName()));
+
+ table.commit(update);
+
+ // the new matrix's reference is 1.
+ setReference(1);
+ }
+ }
+
+ public HTable getHTable() {
+ return this.table;
+ }
+
+ /** {@inheritDoc} */
+ public int getRows() throws IOException {
+ Cell rows = null;
+ rows = table.get(Constants.METADATA, Constants.METADATA_ROWS);
+ return (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+ }
+
+ /** {@inheritDoc} */
+ public int getColumns() throws IOException {
+ Cell columns = table.get(Constants.METADATA, Constants.METADATA_COLUMNS);
+ return BytesUtil.bytesToInt(columns.getValue());
+ }
+
+ /** {@inheritDoc} */
+ public void set(int i, int j, double value) throws IOException {
+ VectorUpdate update = new VectorUpdate(i);
+ update.put(j, value);
+ table.commit(update.getBatchUpdate());
+ }
+
+ /** {@inheritDoc} */
+ public void add(int i, int j, double value) throws IOException {
+ VectorUpdate update = new VectorUpdate(i);
+ update.put(j, value + this.get(i, j));
+ table.commit(update.getBatchUpdate());
+ }
+
+ /** {@inheritDoc} */
+ public void setDimension(int rows, int columns) throws IOException {
+ VectorUpdate update = new VectorUpdate(Constants.METADATA);
+ update.put(Constants.METADATA_ROWS, rows);
+ update.put(Constants.METADATA_COLUMNS, columns);
+
+ table.commit(update.getBatchUpdate());
+ }
+
+ public String getRowLabel(int row) throws IOException {
+ Cell rows = null;
+ rows = table.get(BytesUtil.getRowIndex(row), Bytes
+ .toBytes(Constants.ATTRIBUTE + "string"));
+
+ return (rows != null) ? Bytes.toString(rows.getValue()) : null;
+ }
+
+ public void setRowLabel(int row, String name) throws IOException {
+ VectorUpdate update = new VectorUpdate(row);
+ update.put(Constants.ATTRIBUTE + "string", name);
+ table.commit(update.getBatchUpdate());
+ }
+
+ public String getColumnLabel(int column) throws IOException {
+ Cell rows = null;
+ rows = table.get(Constants.CINDEX, (Constants.ATTRIBUTE + column));
+ return (rows != null) ? Bytes.toString(rows.getValue()) : null;
+ }
+
+ public void setColumnLabel(int column, String name) throws IOException {
+ VectorUpdate update = new VectorUpdate(Constants.CINDEX);
+ update.put(column, name);
+ table.commit(update.getBatchUpdate());
+ }
+
+ /** {@inheritDoc} */
+ public String getPath() {
+ return matrixPath;
+ }
+
+ protected void setReference(int reference) throws IOException {
+ BatchUpdate update = new BatchUpdate(Constants.METADATA);
+ update.put(Constants.METADATA_REFERENCE, Bytes.toBytes(reference));
+ table.commit(update);
+ }
+
+ protected int incrementAndGetRef() throws IOException {
+ int reference = 1;
+ Cell rows = null;
+ rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
+ if (rows != null) {
+ reference = Bytes.toInt(rows.getValue());
+ reference++;
+ }
+ setReference(reference);
+ return reference;
+ }
+
+ protected int decrementAndGetRef() throws IOException {
+ int reference = 0;
+ Cell rows = null;
+ rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
+ if (rows != null) {
+ reference = Bytes.toInt(rows.getValue());
+ if (reference > 0) // reference==0, we need not to decrement it.
+ reference--;
+ }
+ setReference(reference);
+ return reference;
+ }
+
+ protected boolean hasAliaseName() throws IOException {
+ Cell rows = null;
+ rows = table.get(Constants.METADATA, Constants.ALIASENAME);
+ return (rows != null) ? true : false;
+ }
+
+ public void close() throws IOException {
+ if (closed) // have been closed
+ return;
+ int reference = decrementAndGetRef();
+ if (reference <= 0) { // no reference again.
+ if (!hasAliaseName()) { // the table has not been aliased, we delete the
+ // table.
+ if (admin.isTableEnabled(matrixPath)) {
+ admin.disableTable(matrixPath);
+ admin.deleteTable(matrixPath);
+ }
+ }
+ }
+ closed = true;
+ }
+
+ public boolean save(String aliasename) throws IOException {
+ // mark & update the aliase name in "alise:name" meta column.
+ // ! one matrix has only one aliasename now.
+ BatchUpdate update = new BatchUpdate(Constants.METADATA);
+ update.put(Constants.ALIASENAME, Bytes.toBytes(aliasename));
+ table.commit(update);
+ return hamaAdmin.save(this, aliasename);
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Tue Dec 30 03:52:47 2008
@@ -87,15 +87,8 @@
/** default try times to generate a suitable tablename */
public static final int DEFAULT_TRY_TIMES = 10000000;
- /**
- * block position column to store
- * {@link org.apache.hama.io.BlockPosition} object
- */
- public static final String BLOCK_POSITION = "attribute:blockPosition";
-
/** block data column */
public static final String BLOCK = "block:";
- /** block size */
- public static final String BLOCK_SIZE = "attribute:blockSize";
+ public static final String BLOCK_PATH = "attribute:blockPath";
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Tue Dec 30 03:52:47 2008
@@ -49,7 +49,6 @@
import org.apache.hama.algebra.SIMDMultiplyMap;
import org.apache.hama.algebra.SIMDMultiplyReduce;
import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockPosition;
import org.apache.hama.io.BlockWritable;
import org.apache.hama.io.DoubleEntry;
import org.apache.hama.io.MapWritable;
@@ -395,7 +394,8 @@
jobConf.setNumReduceTasks(config.getNumReduceTasks());
if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
- BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(),
+ BlockCyclicMultiplyMap.initJob(this.getBlockedMatrixPath(),
+ ((DenseMatrix) B).getBlockedMatrixPath(),
BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
jobConf);
BlockCyclicMultiplyReduce.initJob(result.getPath(),
@@ -477,7 +477,7 @@
}
public boolean isBlocked() throws IOException {
- return (table.get(Constants.METADATA, Constants.BLOCK_SIZE) == null) ? false
+ return (table.get(Constants.METADATA, Constants.BLOCK_PATH) == null) ? false
: true;
}
@@ -486,64 +486,12 @@
Bytes.toBytes(Constants.BLOCK)).getValue());
}
- /**
- * @return the size of block
- * @throws IOException
- */
- public int getBlockSize() throws IOException {
- return (isBlocked()) ? BytesUtil.bytesToInt(table.get(Constants.METADATA,
- Constants.BLOCK_SIZE).getValue()) : -1;
- }
-
- protected void setBlockSize(int blockNum) throws IOException {
- BatchUpdate update = new BatchUpdate(Constants.METADATA);
- update.put(Constants.BLOCK_SIZE, BytesUtil.intToBytes(blockNum));
- table.commit(update);
- }
-
public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
table.commit(update);
}
-
- protected void setBlockPosition(int blockNum) throws IOException {
- int block_row_size = this.getRows() / blockNum;
- int block_column_size = this.getColumns() / blockNum;
-
- int startRow, endRow, startColumn, endColumn;
- int i = 0, j = 0;
- do {
- startRow = i * block_row_size;
- endRow = (startRow + block_row_size) - 1;
- if (endRow >= this.getRows())
- endRow = this.getRows() - 1;
-
- j = 0;
- do {
- startColumn = j * block_column_size;
- endColumn = (startColumn + block_column_size) - 1;
- if (endColumn >= this.getColumns())
- endColumn = this.getColumns() - 1;
-
- BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
- update.put(Constants.BLOCK_POSITION, new BlockPosition(startRow,
- endRow, startColumn, endColumn).getBytes());
- table.commit(update);
-
- j++;
- } while (endColumn < (this.getColumns() - 1));
-
- i++;
- } while (endRow < (this.getRows() - 1));
- }
-
- protected BlockPosition getBlockPosition(int i, int j) throws IOException {
- byte[] rs = table.get(new BlockID(i, j).getBytes(),
- Bytes.toBytes(Constants.BLOCK_POSITION)).getValue();
- return new BlockPosition(rs);
- }
-
+
/**
* Using a map/reduce job to block a dense matrix.
*
@@ -551,51 +499,34 @@
* @throws IOException
*/
public void blocking_mapred(int blockNum) throws IOException {
- this.checkBlockNum(blockNum);
+ double blocks = Math.pow(blockNum, 0.5);
+ if (!String.valueOf(blocks).endsWith(".0"))
+ throw new IOException("can't divide.");
+ int block_size = (int) blocks;
+ Matrix blockedMatrix = new DenseMatrix(config);
+ blockedMatrix.setDimension(block_size, block_size);
+ this.setBlockedMatrixPath(blockedMatrix.getPath());
+
JobConf jobConf = new JobConf(config);
jobConf.setJobName("Blocking MR job" + getPath());
jobConf.setNumMapTasks(config.getNumMapTasks());
jobConf.setNumReduceTasks(config.getNumReduceTasks());
- BlockingMapRed.initJob(getPath(), jobConf);
-
+ BlockingMapRed.initJob(this.getPath(), blockedMatrix.getPath(),
+ block_size, this.getRows(), this.getColumns(), jobConf);
JobManager.execute(jobConf);
}
- /**
- * Using a scanner to block a dense matrix. If the matrix is large, use the
- * blocking_mapred()
- *
- * @param blockNum
- * @throws IOException
- */
- public void blocking(int blockNum) throws IOException {
- this.checkBlockNum(blockNum);
-
- String[] columns = new String[] { Constants.BLOCK_POSITION };
- Scanner scan = table.getScanner(columns);
-
- for (RowResult row : scan) {
- BlockID bID = new BlockID(row.getRow());
- BlockPosition pos = new BlockPosition(row.get(Constants.BLOCK_POSITION)
- .getValue());
-
- setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos.getStartRow(), pos
- .getEndRow(), pos.getStartColumn(), pos.getEndColumn()));
- }
+ public String getBlockedMatrixPath() throws IOException {
+ return Bytes.toString(table.get(Constants.METADATA,
+ Constants.BLOCK_PATH).getValue());
}
- private void checkBlockNum(int blockNum) throws IOException {
- double blocks = Math.pow(blockNum, 0.5);
- // TODO: Check also it is validation with matrix.
- if (!String.valueOf(blocks).endsWith(".0"))
- throw new IOException("can't divide.");
-
- int block_size = (int) blocks;
- setBlockPosition(block_size);
- setBlockSize(block_size);
- LOG.info("Create " + block_size + " * " + block_size + " blocked matrix");
+ protected void setBlockedMatrixPath(String path) throws IOException {
+ BatchUpdate update = new BatchUpdate(Constants.METADATA);
+ update.put(Constants.BLOCK_PATH, Bytes.toBytes(path));
+ table.commit(update);
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java Tue Dec 30 03:52:47 2008
@@ -1,199 +1,213 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.hama.util.BytesUtil;
-import org.apache.log4j.Logger;
-
-/**
- * A sub matrix is a matrix formed by selecting certain rows and columns from a
- * bigger matrix. This is a in-memory operation only.
- */
-public class SubMatrix implements java.io.Serializable {
- private static final long serialVersionUID = 3897536498367921547L;
- static final Logger LOG = Logger.getLogger(SubMatrix.class);
- private double[][] matrix;
-
- /**
- * Constructor
- *
- * @param i the size of rows
- * @param j the size of columns
- */
- public SubMatrix(int i, int j) {
- this.matrix = new double[i][j];
- }
-
- /**
- * Constructor
- *
- * @param c a two dimensional double array
- */
- public SubMatrix(double[][] c) {
- double[][] matrix = c;
- this.matrix = matrix;
- }
-
- public SubMatrix(byte[] matrix) throws IOException {
- ByteArrayInputStream bos = new ByteArrayInputStream(matrix);
- ObjectInputStream oos = new ObjectInputStream(bos);
- Object obj = null;
- try {
- obj = oos.readObject();
- this.matrix = ((SubMatrix)obj).getDoubleArray();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- oos.close();
- bos.close();
- }
-
- /**
- * Sets the value
- *
- * @param row
- * @param column
- * @param value
- */
- public void set(int row, int column, double value) {
- matrix[row][column] = value;
- }
-
- /**
- * Sets the value
- *
- * @param row
- * @param column
- * @param value
- */
- public void set(int row, int column, byte[] value) {
- matrix[row][column] = BytesUtil.bytesToDouble(value);
- }
-
- /**
- * Gets the value
- *
- * @param i
- * @param j
- * @return the value of submatrix(i, j)
- */
- public double get(int i, int j) {
- return matrix[i][j];
- }
-
- public void add(int row, int column, double value) {
- matrix[row][column] = matrix[row][column] + value;
- }
-
- /**
- * c = a+b
- *
- * @param b
- * @return c
- */
- public SubMatrix add(SubMatrix b) {
- SubMatrix c = new SubMatrix(this.getRows(), this.getColumns());
-
- for (int i = 0; i < this.getRows(); i++) {
- for (int j = 0; j < this.getColumns(); j++) {
- c.set(i, j, (this.get(i, j) + b.get(i, j)));
- }
- }
-
- return c;
- }
-
- /**
- * c = a*b
- *
- * @param b
- * @return c
- */
- public SubMatrix mult(SubMatrix b) {
- SubMatrix c = new SubMatrix(this.getRows(), b.getColumns());
-
- for (int i = 0; i < this.getRows(); i++) {
- for (int j = 0; j < b.getColumns(); j++) {
- for (int k = 0; k < this.getColumns(); k++) {
- c.add(i, j, this.get(i, k) * b.get(k, j));
- }
- }
- }
-
- return c;
- }
-
- /**
- * Gets the number of rows
- *
- * @return the number of rows
- */
- public int getRows() {
- return this.matrix.length;
- }
-
- /**
- * Gets the number of columns
- *
- * @return the number of columns
- */
- public int getColumns() {
- return this.matrix[0].length;
- }
-
- /**
- * Close
- */
- public void close() {
- matrix = null;
- }
-
- /**
- * @return the 2d double array
- */
- public double[][] getDoubleArray() {
- double[][] result = matrix;
- return result;
- }
-
- /**
- * Gets the bytes of the sub matrix
- *
- * @return the bytes of the sub matrix
- * @throws IOException
- */
- public byte[] getBytes() throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeObject(this);
- oos.flush();
- oos.close();
- bos.close();
- byte[] data = bos.toByteArray();
- return data;
- }
-
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * A sub matrix is a matrix formed by selecting certain rows and columns from a
+ * bigger matrix. This is an in-memory operation only.
+ */
+public class SubMatrix implements java.io.Serializable {
+ private static final long serialVersionUID = 3897536498367921547L;
+ static final Logger LOG = Logger.getLogger(SubMatrix.class);
+ private double[][] matrix;
+
+ /**
+ * Constructor
+ *
+ * @param i the size of rows
+ * @param j the size of columns
+ */
+ public SubMatrix(int i, int j) {
+ this.matrix = new double[i][j];
+ }
+
+ /**
+ * Constructor
+ *
+ * @param c a two dimensional double array
+ */
+ public SubMatrix(double[][] c) {
+ double[][] matrix = c;
+ this.matrix = matrix;
+ }
+
+ public SubMatrix(byte[] matrix) throws IOException {
+ ByteArrayInputStream bos = new ByteArrayInputStream(matrix);
+ DataInputStream dis = new DataInputStream(bos);
+
+ int rows = dis.readInt();
+ int columns = dis.readInt();
+ this.matrix = new double[rows][columns];
+
+ for(int i = 0; i < rows; i++) {
+ for(int j = 0; j < columns; j++) {
+ this.matrix[i][j] = dis.readDouble();
+ }
+ }
+
+ dis.close();
+ bos.close();
+ }
+
+ /**
+ * Sets the value
+ *
+ * @param row
+ * @param column
+ * @param value
+ */
+ public void set(int row, int column, double value) {
+ matrix[row][column] = value;
+ }
+
+ /**
+ * Sets the value
+ *
+ * @param row
+ * @param column
+ * @param value
+ */
+ public void set(int row, int column, byte[] value) {
+ matrix[row][column] = BytesUtil.bytesToDouble(value);
+ }
+
+ /**
+ * Gets the value
+ *
+ * @param i
+ * @param j
+ * @return the value of submatrix(i, j)
+ */
+ public double get(int i, int j) {
+ return matrix[i][j];
+ }
+
+ public void add(int row, int column, double value) {
+ matrix[row][column] = matrix[row][column] + value;
+ }
+
+ /**
+ * c = a+b
+ *
+ * @param b
+ * @return c
+ */
+ public SubMatrix add(SubMatrix b) {
+ SubMatrix c = new SubMatrix(this.getRows(), this.getColumns());
+
+ for (int i = 0; i < this.getRows(); i++) {
+ for (int j = 0; j < this.getColumns(); j++) {
+ c.set(i, j, (this.get(i, j) + b.get(i, j)));
+ }
+ }
+
+ return c;
+ }
+
+ /**
+ * c = a*b
+ *
+ * @param b
+ * @return c
+ */
+ public SubMatrix mult(SubMatrix b) {
+ SubMatrix c = new SubMatrix(this.getRows(), b.getColumns());
+
+ for (int i = 0; i < this.getRows(); i++) {
+ for (int j = 0; j < b.getColumns(); j++) {
+ for (int k = 0; k < this.getColumns(); k++) {
+ c.add(i, j, this.get(i, k) * b.get(k, j));
+ }
+ }
+ }
+
+ return c;
+ }
+
+ /**
+ * Gets the number of rows
+ *
+ * @return the number of rows
+ */
+ public int getRows() {
+ return this.matrix.length;
+ }
+
+ /**
+ * Gets the number of columns
+ *
+ * @return the number of columns
+ */
+ public int getColumns() {
+ return this.matrix[0].length;
+ }
+
+ /**
+ * Close
+ */
+ public void close() {
+ matrix = null;
+ }
+
+ /**
+ * @return the 2d double array
+ */
+ public double[][] getDoubleArray() {
+ double[][] result = matrix;
+ return result;
+ }
+
+ /**
+ * Gets the bytes of the sub matrix
+ *
+ * @return the bytes of the sub matrix
+ * @throws IOException
+ */
+ public byte[] getBytes() throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+ dos.writeInt(this.getRows());
+ dos.writeInt(this.getColumns());
+
+ for(int i = 0; i < this.getRows(); i++) {
+ for(int j = 0; j < this.getColumns(); j++) {
+ dos.writeDouble(this.get(i, j));
+ }
+ }
+
+ byte[] data = bos.toByteArray();
+ dos.close();
+ bos.close();
+ return data;
+ }
+
+}
+
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Tue Dec 30 03:52:47 2008
@@ -21,6 +21,10 @@
import java.io.IOException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
@@ -69,13 +73,24 @@
public void map(BlockID key, BlockWritable value,
OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
throws IOException {
- int blockSize = matrix_b.getBlockSize();
+ int blockSize = matrix_b.getRows();
SubMatrix a = value.get();
-
- for (int j = 0; j < blockSize; j++) {
- SubMatrix b = matrix_b.getBlock(key.getColumn(), j);
+ HTable table = matrix_b.getHTable();
+
+ // startKey : new BlockID(key.getColumn(), 0).toString()
+ // endKey : new BlockID(key.getColumn(), blockSize+1).toString()
+ Scanner scan = table.getScanner(new byte[][] { Bytes
+ .toBytes(Constants.BLOCK) },
+ new BlockID(key.getColumn(), 0).getBytes(), new BlockID(
+ key.getColumn(), blockSize + 1).getBytes());
+
+ for (RowResult row : scan) {
+ BlockID bid = new BlockID(row.getRow());
+ SubMatrix b = new SubMatrix(row.get(Constants.BLOCK).getValue());
SubMatrix c = a.mult(b);
- output.collect(new BlockID(key.getRow(), j), new BlockWritable(c));
+ output.collect(new BlockID(key.getRow(), bid.getColumn()),
+ new BlockWritable(c));
}
+ scan.close();
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java Tue Dec 30 03:52:47 2008
@@ -19,20 +19,19 @@
*/
package org.apache.hama.io;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
/** A WritableComparable for BlockIDs. */
@SuppressWarnings("unchecked")
-public class BlockID implements WritableComparable, java.io.Serializable {
- private static final long serialVersionUID = 6434651179475226613L;
+public class BlockID implements WritableComparable {
+ static final Logger LOG = Logger.getLogger(BlockID.class);
+ public static final int PAD_SIZE = 15;
private int row;
private int column;
@@ -44,18 +43,28 @@
}
public BlockID(byte[] bytes) throws IOException {
- ByteArrayInputStream bos = new ByteArrayInputStream(bytes);
- ObjectInputStream oos = new ObjectInputStream(bos);
- Object obj = null;
+ String rKey = Bytes.toString(bytes);
+ String keys[] = null;
+ if (rKey.substring(0, 8).equals("00000000")) {
+ int i = 8;
+ while (rKey.charAt(i) == '0') {
+ i++;
+ }
+ keys = rKey.substring(i, rKey.length()).split("[,]");
+ } else {
+ int i = 0;
+ while (rKey.charAt(i) == '0') {
+ i++;
+ }
+ keys = rKey.substring(i, rKey.length()).split("[,]");
+ }
+
try {
- obj = oos.readObject();
- this.row = ((BlockID)obj).getRow();
- this.column = ((BlockID)obj).getColumn();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
+ this.row = Integer.parseInt(keys[1]);
+ this.column = Integer.parseInt(keys[2]);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new ArrayIndexOutOfBoundsException(rKey + "\n" + e);
}
- oos.close();
- bos.close();
}
public void set(int row, int column) {
@@ -72,20 +81,27 @@
}
public void readFields(DataInput in) throws IOException {
- column = in.readInt();
- row = in.readInt();
+ BlockID value = new BlockID(Bytes.readByteArray(in));
+ this.row = value.getRow();
+ this.column = value.getColumn();
}
public void write(DataOutput out) throws IOException {
- out.writeInt(column);
- out.writeInt(row);
+ Bytes.writeByteArray(out, Bytes.toBytes(this.toString()));
}
/**
* Make BlockID's string representation be same format.
*/
public String toString() {
- return row + "," + column;
+ int zeros = PAD_SIZE - String.valueOf(row).length()
+ - String.valueOf(column).length();
+ StringBuffer buf = new StringBuffer();
+ for (int i = 0; i < zeros; ++i) {
+ buf.append("0");
+ }
+
+ return buf.toString() + "," + row + "," + column;
}
@Override
@@ -110,19 +126,14 @@
@Override
public boolean equals(Object o) {
- if(o == null) return false;
- if(!(o instanceof BlockID)) return false;
+ if (o == null)
+ return false;
+ if (!(o instanceof BlockID))
+ return false;
return compareTo(o) == 0;
}
- public byte[] getBytes() throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeObject(this);
- oos.flush();
- oos.close();
- bos.close();
- byte[] data = bos.toByteArray();
- return data;
+ public byte[] getBytes() {
+ return Bytes.toBytes(this.toString());
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Tue Dec 30 03:52:47 2008
@@ -1,56 +1,77 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.SubMatrix;
-
-public class BlockWritable implements Writable {
- public SubMatrix matrix;
-
- public BlockWritable() {
- this.matrix = new SubMatrix(0, 0);
- }
-
- public BlockWritable(SubMatrix c) {
- this.matrix = c;
- }
-
- public BlockWritable(byte[] bytes) throws IOException {
- this.matrix = new SubMatrix(bytes);
- }
-
- public void readFields(DataInput in) throws IOException {
- this.matrix = new SubMatrix(Bytes.readByteArray(in));
- }
-
- public void write(DataOutput out) throws IOException {
- Bytes.writeByteArray(out, this.matrix.getBytes());
- }
-
- public SubMatrix get() {
- return this.matrix;
- }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.io;
+
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.SubMatrix;
+
+public class BlockWritable implements Writable {
+ private SubMatrix matrix;
+
+ public BlockWritable() {
+ this.matrix = new SubMatrix(0, 0);
+ }
+
+ public BlockWritable(SubMatrix c) {
+ this.matrix = c;
+ }
+
+ public BlockWritable(byte[] bytes) throws IOException {
+ this.matrix = new SubMatrix(bytes);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+
+ int rows = in.readInt();
+ int columns = in.readInt();
+ this.matrix = new SubMatrix(rows, columns);
+
+ for(int i = 0; i < rows; i++) {
+ for(int j = 0; j < columns; j++) {
+ this.matrix.set(i, j, in.readDouble());
+ }
+ }
+
+ //this.matrix = new SubMatrix(Bytes.readByteArray(in));
+ }
+
+ public void write(DataOutput out) throws IOException {
+ //Bytes.writeByteArray(out, this.matrix.getBytes());
+
+ out.writeInt(this.matrix.getRows());
+ out.writeInt(this.matrix.getColumns());
+
+ for(int i = 0; i < this.matrix.getRows(); i++) {
+ for(int j = 0; j < this.matrix.getColumns(); j++) {
+ out.writeDouble(this.matrix.get(i, j));
+ }
+ }
+ }
+
+ public SubMatrix get() {
+ return this.matrix;
+ }
+}
+
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Tue Dec 30 03:52:47 2008
@@ -1,180 +1,197 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hama.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hama.Constants;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.DenseVector;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.SubMatrix;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.io.VectorWritable;
-
-/**
- * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix
- */
-public class BlockingMapRed {
-
- static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
- /** Parameter of the path of the matrix to be blocked * */
- public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
-
- /**
- * Initialize a job to blocking a table
- *
- * @param matrixPath
- * @param job
- */
- public static void initJob(String matrixPath, JobConf job) {
- job.setMapperClass(BlockingMapper.class);
- job.setReducerClass(BlockingReducer.class);
- FileInputFormat.addInputPaths(job, matrixPath);
-
- job.setInputFormat(VectorInputFormat.class);
- job.setMapOutputKeyClass(BlockID.class);
- job.setMapOutputValueClass(VectorWritable.class);
- job.setOutputFormat(NullOutputFormat.class);
-
- job.set(BLOCKING_MATRIX, matrixPath);
- job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
- }
-
- /**
- * Abstract Blocking Map/Reduce Class to configure the job.
- */
- public static abstract class BlockingMapRedBase extends MapReduceBase {
-
- protected DenseMatrix matrix;
- protected int mBlockNum;
- protected int mBlockRowSize;
- protected int mBlockColSize;
-
- protected int mRows;
- protected int mColumns;
-
- @Override
- public void configure(JobConf job) {
- try {
- matrix = new DenseMatrix(new HamaConfiguration(), job.get(
- BLOCKING_MATRIX, ""));
- mBlockNum = matrix.getBlockSize();
- mBlockRowSize = matrix.getRows() / mBlockNum;
- mBlockColSize = matrix.getColumns() / mBlockNum;
-
- mRows = matrix.getRows();
- mColumns = matrix.getColumns();
- } catch (IOException e) {
- LOG.warn("Load matrix_blocking failed : " + e.getMessage());
- }
- }
-
- }
-
- /**
- * Mapper Class
- */
- public static class BlockingMapper extends BlockingMapRedBase implements
- Mapper<IntWritable, VectorWritable, BlockID, VectorWritable> {
-
- @Override
- public void map(IntWritable key, VectorWritable value,
- OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
- throws IOException {
- int startColumn;
- int endColumn;
- int blkRow = key.get() / mBlockRowSize;
- DenseVector dv = value.getDenseVector();
-
- int i = 0;
- do {
- startColumn = i * mBlockColSize;
- endColumn = startColumn + mBlockColSize - 1;
- if(endColumn >= mColumns) // the last sub vector
- endColumn = mColumns - 1;
- output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
- dv.subVector(startColumn, endColumn)));
-
- i++;
- } while(endColumn < (mColumns-1));
- }
-
- }
-
- /**
- * Reducer Class
- */
- public static class BlockingReducer extends BlockingMapRedBase implements
- Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
-
- @Override
- public void reduce(BlockID key, Iterator<VectorWritable> values,
- OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
- throws IOException {
- // Note: all the sub-vectors are grouped by {@link
- // org.apache.hama.io.BlockID}
-
- // the block's base offset in the original matrix
- int colBase = key.getColumn() * mBlockColSize;
- int rowBase = key.getRow() * mBlockRowSize;
-
- // the block's size : rows & columns
- int smRows = mBlockRowSize;
- if((rowBase + mBlockRowSize - 1) >= mRows)
- smRows = mRows - rowBase;
- int smCols = mBlockColSize;
- if((colBase + mBlockColSize - 1) >= mColumns)
- smCols = mColumns - colBase;
-
- // construct the matrix
- SubMatrix subMatrix = new SubMatrix(smRows, smCols);
-
- // i, j is the current offset in the sub-matrix
- int i = 0, j = 0;
- while (values.hasNext()) {
- VectorWritable vw = values.next();
- // check the size is suitable
- if (vw.size() != smCols)
- throw new IOException("Block Column Size dismatched.");
- i = vw.row - rowBase;
- if (i >= smRows || i < 0)
- throw new IOException("Block Row Size dismatched.");
-
- // put the subVector to the subMatrix
- for (j = 0; j < smCols; j++) {
- subMatrix.set(i, j, vw.get(colBase + j));
- }
- }
-
- matrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
- }
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hama.Constants;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.DenseVector;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.VectorWritable;
+
+/**
+ * A Map/Reduce helper class for blocking a DenseMatrix into a block-formatted matrix
+ */
+public class BlockingMapRed {
+
+ static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
+ /** Parameter of the path of the matrix to be blocked * */
+ public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
+ public static final String BLOCKED_MATRIX = "hama.blocked.matrix";
+ public static final String BLOCK_SIZE = "hama.blocking.size";
+ public static final String ROWS = "hama.blocking.rows";
+ public static final String COLUMNS = "hama.blocking.columns";
+
+ /**
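+ * Initializes a job that blocks a table.
+ *
+ * @param matrixPath input path of the matrix to be blocked (set as BLOCKING_MATRIX)
+ * @param string identifier of the blocked matrix (set as BLOCKED_MATRIX)
+ * @param j the value stored in the job as COLUMNS
+ * @param i the value stored in the job as ROWS
+ * @param block_size the value stored in the job as BLOCK_SIZE
+ * @param job the job configuration to initialize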
+ */
+ public static void initJob(String matrixPath, String string, int block_size, int i, int j, JobConf job) {
+ job.setMapperClass(BlockingMapper.class);
+ job.setReducerClass(BlockingReducer.class);
+ FileInputFormat.addInputPaths(job, matrixPath);
+
+ job.setInputFormat(VectorInputFormat.class);
+ job.setMapOutputKeyClass(BlockID.class);
+ job.setMapOutputValueClass(VectorWritable.class);
+ job.setOutputFormat(NullOutputFormat.class);
+
+ job.set(BLOCKING_MATRIX, matrixPath);
+ job.set(BLOCKED_MATRIX, string);
+ job.set(BLOCK_SIZE, String.valueOf(block_size));
+ job.set(ROWS, String.valueOf(i));
+ job.set(COLUMNS, String.valueOf(j));
+
+ job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+ }
+
+ /**
+ * Abstract Blocking Map/Reduce Class to configure the job.
+ */
+ public static abstract class BlockingMapRedBase extends MapReduceBase {
+
+ protected DenseMatrix matrix;
+ protected DenseMatrix blockedMatrix;
+ protected int mBlockNum;
+ protected int mBlockRowSize;
+ protected int mBlockColSize;
+
+ protected int mRows;
+ protected int mColumns;
+
+ @Override
+ public void configure(JobConf job) {
+ try {
+ matrix = new DenseMatrix(new HamaConfiguration(), job.get(
+ BLOCKING_MATRIX, ""));
+ blockedMatrix = new DenseMatrix(new HamaConfiguration(), job.get(
+ BLOCKED_MATRIX, ""));
+
+ mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
+ mRows = Integer.parseInt(job.get(ROWS, ""));
+ mColumns = Integer.parseInt(job.get(COLUMNS, ""));
+
+ mBlockRowSize = mRows / mBlockNum;
+ mBlockColSize = mColumns / mBlockNum;
+ } catch (IOException e) {
+ LOG.warn("Load matrix_blocking failed : " + e.getMessage());
+ }
+ }
+
+ }
+
+ /**
+ * Mapper Class
+ */
+ public static class BlockingMapper extends BlockingMapRedBase implements
+ Mapper<IntWritable, VectorWritable, BlockID, VectorWritable> {
+
+ @Override
+ public void map(IntWritable key, VectorWritable value,
+ OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
+ throws IOException {
+ int startColumn;
+ int endColumn;
+ int blkRow = key.get() / mBlockRowSize;
+ DenseVector dv = value.getDenseVector();
+
+ int i = 0;
+ do {
+ startColumn = i * mBlockColSize;
+ endColumn = startColumn + mBlockColSize - 1;
+ if(endColumn >= mColumns) // the last sub vector
+ endColumn = mColumns - 1;
+ output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
+ dv.subVector(startColumn, endColumn)));
+
+ i++;
+ } while(endColumn < (mColumns-1));
+ }
+
+ }
+
+ /**
+ * Reducer Class
+ */
+ public static class BlockingReducer extends BlockingMapRedBase implements
+ Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
+
+ @Override
+ public void reduce(BlockID key, Iterator<VectorWritable> values,
+ OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
+ throws IOException {
+ // Note: all the sub-vectors are grouped by {@link
+ // org.apache.hama.io.BlockID}
+
+ // the block's base offset in the original matrix
+ int colBase = key.getColumn() * mBlockColSize;
+ int rowBase = key.getRow() * mBlockRowSize;
+
+ // the block's size : rows & columns
+ int smRows = mBlockRowSize;
+ if((rowBase + mBlockRowSize - 1) >= mRows)
+ smRows = mRows - rowBase;
+ int smCols = mBlockColSize;
+ if((colBase + mBlockColSize - 1) >= mColumns)
+ smCols = mColumns - colBase;
+
+ // construct the matrix
+ SubMatrix subMatrix = new SubMatrix(smRows, smCols);
+
+ // i, j is the current offset in the sub-matrix
+ int i = 0, j = 0;
+ while (values.hasNext()) {
+ VectorWritable vw = values.next();
+ // check the size is suitable
+ if (vw.size() != smCols)
+ throw new IOException("Block Column Size dismatched.");
+ i = vw.row - rowBase;
+ if (i >= smRows || i < 0)
+ throw new IOException("Block Row Size dismatched.");
+
+ // put the subVector to the subMatrix
+ for (j = 0; j < smCols; j++) {
+ subMatrix.set(i, j, vw.get(colBase + j));
+ }
+ }
+
+ blockedMatrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
+ }
+ }
+
+}
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Tue Dec 30 03:52:47 2008
@@ -1,357 +1,324 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hama.io.BlockPosition;
-import org.apache.hama.io.DoubleEntry;
-import org.apache.log4j.Logger;
-
-/**
- * Matrix test
- */
-public class TestDenseMatrix extends TestCase {
- static final Logger LOG = Logger.getLogger(TestDenseMatrix.class);
- private static int SIZE = 10;
- private static Matrix m1;
- private static Matrix m2;
- private final static String aliase1 = "matrix_aliase_A";
- private final static String aliase2 = "matrix_aliase_B";
- private static HamaConfiguration conf;
- private static HBaseAdmin admin;
- private static HamaAdmin hamaAdmin;
-
- public static Test suite() {
- TestSetup setup = new TestSetup(new TestSuite(TestDenseMatrix.class)) {
- protected void setUp() throws Exception {
- HCluster hCluster = new HCluster();
- hCluster.setUp();
-
- conf = hCluster.getConf();
- admin = new HBaseAdmin(conf);
- hamaAdmin = new HamaAdminImpl(conf, admin);
-
- m1 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
- m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
- }
-
- protected void tearDown() {
- try {
- closeTest();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- };
- return setup;
- }
-
- public static void closeTest() throws IOException {
- m1.close();
- m2.close();
- }
-
- public void testEntryAdd() throws IOException {
- double origin = m1.get(1, 1);
- m1.add(1, 1, 0.5);
-
- assertEquals(m1.get(1, 1), origin + 0.5);
- }
-
- public void testBlocking() throws IOException, ClassNotFoundException {
- assertEquals(((DenseMatrix) m1).isBlocked(), false);
- ((DenseMatrix) m1).blocking(4);
- assertEquals(((DenseMatrix) m1).isBlocked(), true);
- BlockPosition pos = ((DenseMatrix) m1).getBlockPosition(1, 0);
- double[][] b = ((DenseMatrix) m1).subMatrix(pos.getStartRow(),
- pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn())
- .getDoubleArray();
- double[][] c = ((DenseMatrix) m1).getBlock(1, 0).getDoubleArray();
- assertEquals(((DenseMatrix) m1).getBlockSize(), 2);
- assertEquals(c.length, 5);
-
- for (int i = 0; i < b.length; i++) {
- for (int j = 0; j < b.length; j++) {
- assertEquals(b[i][j], c[i][j]);
- }
- }
- }
-
- /**
- * Map/Reduce Blocking Test
- *
- * @throws IOException
- * @throws ClassNotFoundException
- */
- public void testMRBlocking() throws IOException, ClassNotFoundException {
- assertEquals(((DenseMatrix) m2).isBlocked(), false);
- ((DenseMatrix) m2).blocking_mapred(4);
- assertEquals(((DenseMatrix) m2).isBlocked(), true);
- BlockPosition pos = ((DenseMatrix) m2).getBlockPosition(1, 0);
- double[][] b = ((DenseMatrix) m2).subMatrix(pos.getStartRow(),
- pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn())
- .getDoubleArray();
- double[][] c = ((DenseMatrix) m2).getBlock(1, 0).getDoubleArray();
- assertEquals(((DenseMatrix) m2).getBlockSize(), 2);
- assertEquals(c.length, 5);
-
- for (int i = 0; i < b.length; i++) {
- for (int j = 0; j < b.length; j++) {
- assertEquals(b[i][j], c[i][j]);
- }
- }
- }
-
- /**
- * Column vector test.
- *
- * @param rand
- * @throws IOException
- */
- public void testGetColumn() throws IOException {
- Vector v = m1.getColumn(0);
- Iterator<DoubleEntry> it = v.iterator();
- int x = 0;
- while (it.hasNext()) {
- assertEquals(m1.get(x, 0), it.next().getValue());
- x++;
- }
- }
-
- public void testGetSetAttribute() throws IOException {
- m1.setRowLabel(0, "row1");
- assertEquals(m1.getRowLabel(0), "row1");
- assertEquals(m1.getRowLabel(1), null);
-
- m1.setColumnLabel(0, "column1");
- assertEquals(m1.getColumnLabel(0), "column1");
- assertEquals(m1.getColumnLabel(1), null);
- }
-
- public void testSubMatrix() throws IOException {
- SubMatrix a = m1.subMatrix(2, 4, 2, 5); // A : 3 * 4
- for (int i = 0; i < a.getRows(); i++) {
- for (int j = 0; j < a.getColumns(); j++) {
- assertEquals(a.get(i, j), m1.get(i + 2, j + 2));
- }
- }
-
- SubMatrix b = m2.subMatrix(0, 3, 0, 2); // B : 4 * 3
- SubMatrix c = a.mult(b);
-
- double[][] C = new double[3][3]; // A * B
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 3; j++) {
- for (int k = 0; k < 4; k++) {
- C[i][j] += m1.get(i + 2, k + 2) * m2.get(k, j);
- }
- }
- }
-
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 3; j++) {
- assertEquals(C[i][j], c.get(i, j));
- }
- }
- }
-
- /**
- * Test matrices addition
- *
- * @throws IOException
- */
- public void testMatrixAdd() throws IOException {
- Matrix result = m1.add(m2);
-
- assertEquals(result.getRows(), SIZE);
- assertEquals(result.getColumns(), SIZE);
-
- for (int i = 0; i < SIZE; i++) {
- for (int j = 0; j < SIZE; j++) {
- assertEquals(result.get(i, j), m1.get(i, j) + m2.get(i, j));
- }
- }
- }
-
- /**
- * Test matrices multiplication
- *
- * @throws IOException
- */
- public void testMatrixMult() throws IOException {
- Matrix result = m1.mult(m2);
-
- assertEquals(result.getRows(), SIZE);
- assertEquals(result.getColumns(), SIZE);
-
- verifyMultResult(m1, m2, result);
- }
-
- public void testSetRow() throws IOException {
- Vector v = new DenseVector();
- double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
-
- for (int i = 0; i < SIZE; i++) {
- v.set(i, entries[i]);
- }
-
- m1.setRow(SIZE + 1, v);
- Iterator<DoubleEntry> it = m1.getRow(SIZE + 1).iterator();
-
- int i = 0;
- while (it.hasNext()) {
- assertEquals(entries[i], it.next().getValue());
- i++;
- }
- }
-
- public void testSetColumn() throws IOException {
- Vector v = new DenseVector();
- double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
-
- for (int i = 0; i < SIZE; i++) {
- v.set(i, entries[i]);
- }
-
- m1.setColumn(SIZE + 1, v);
- Iterator<DoubleEntry> it = m1.getColumn(SIZE + 1).iterator();
-
- int i = 0;
- while (it.hasNext()) {
- assertEquals(entries[i], it.next().getValue());
- i++;
- }
- }
-
- public void testLoadSave() throws IOException {
- String path1 = m1.getPath();
- // save m1 to aliase1
- m1.save(aliase1);
- // load matrix m1 using aliase1
- DenseMatrix loadTest = new DenseMatrix(conf, aliase1, false);
-
- for (int i = 0; i < loadTest.getRows(); i++) {
- for (int j = 0; j < loadTest.getColumns(); j++) {
- assertEquals(m1.get(i, j), loadTest.get(i, j));
- }
- }
-
- assertEquals(path1, loadTest.getPath());
- // close loadTest, it just disconnect to the table but didn't delete it.
- loadTest.close();
-
- // try to close m1 & load matrix m1 using aliase1 again.
- m1.close();
- DenseMatrix loadTest2 = new DenseMatrix(conf, aliase1, false);
- assertEquals(path1, loadTest2.getPath());
- // remove aliase1
- // because loadTest2 connect the aliase1, so we just remove aliase entry
- // but didn't delete the table.
- hamaAdmin.delete(aliase1);
- assertEquals(true, admin.tableExists(path1));
- // close loadTest2, because it is the last one who reference table 'path1'
- // it will do the gc!
- loadTest2.close();
- assertEquals(false, admin.tableExists(path1));
-
- // if we try to load non-existed matrix using aliase name, it should fail.
- DenseMatrix loadTest3 = null;
- try {
- loadTest3 = new DenseMatrix(conf, aliase1, false);
- fail("Try to load a non-existed matrix should fail!");
- } catch (IOException e) {
-
- } finally {
- if (loadTest3 != null)
- loadTest3.close();
- }
- }
-
- public void testForceCreate() throws IOException {
- String path2 = m2.getPath();
- // save m2 to aliase2
- m2.save(aliase2);
- // load matrix m2 using aliase2
- DenseMatrix loadTest = new DenseMatrix(conf, aliase2, false);
-
- for (int i = 0; i < loadTest.getRows(); i++) {
- for (int j = 0; j < loadTest.getColumns(); j++) {
- assertEquals(m2.get(i, j), loadTest.get(i, j));
- }
- }
-
- assertEquals(path2, loadTest.getPath());
-
- // force to create matrix loadTest2 using aliasename 'aliase2'
- DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true);
- String loadPath2 = loadTest2.getPath();
- assertFalse(path2.equals(loadPath2));
- assertEquals(loadPath2, hamaAdmin.getPath(aliase2));
- assertFalse(path2.equals(hamaAdmin.getPath(aliase2)));
-
- // try to close m2 & loadTest, it table will be deleted finally
- m2.close();
- assertEquals(true, admin.tableExists(path2));
- loadTest.close();
- assertEquals(false, admin.tableExists(path2));
-
- // remove 'aliase2' & close loadTest2
- loadTest2.close();
- assertEquals(true, admin.tableExists(loadPath2));
- hamaAdmin.delete(aliase2);
- assertEquals(false, admin.tableExists(loadPath2));
- }
-
- /**
- * Verifying multiplication result
- *
- * @param m1
- * @param m2
- * @param result
- * @throws IOException
- */
- private void verifyMultResult(Matrix m1, Matrix m2, Matrix result)
- throws IOException {
- double[][] C = new double[SIZE][SIZE];
-
- for (int i = 0; i < SIZE; i++) {
- for (int j = 0; j < SIZE; j++) {
- for (int k = 0; k < SIZE; k++) {
- C[i][k] += m1.get(i, j) * m2.get(j, k);
- }
- }
- }
-
- for (int i = 0; i < SIZE; i++) {
- for (int j = 0; j < SIZE; j++) {
- assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String
- .valueOf(C[i][j]).substring(0, 14));
- }
- }
- }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.log4j.Logger;
+
+/**
+ * Matrix test
+ */
+public class TestDenseMatrix extends TestCase {
+ static final Logger LOG = Logger.getLogger(TestDenseMatrix.class);
+ private static int SIZE = 10;
+ private static Matrix m1;
+ private static Matrix m2;
+ private final static String aliase1 = "matrix_aliase_A";
+ private final static String aliase2 = "matrix_aliase_B";
+ private static HamaConfiguration conf;
+ private static HBaseAdmin admin;
+ private static HamaAdmin hamaAdmin;
+
+ /**
+  * Builds the suite for this class, wrapped in a {@link TestSetup} whose
+  * setUp starts an {@link HCluster}, creates the HBase and Hama admin objects
+  * and generates the two shared SIZE x SIZE random matrices, and whose
+  * tearDown closes those matrices again.
+  *
+  * @return the decorated test suite
+  */
+ public static Test suite() {
+   TestSetup setup = new TestSetup(new TestSuite(TestDenseMatrix.class)) {
+     protected void setUp() throws Exception {
+       HCluster hCluster = new HCluster();
+       hCluster.setUp();
+
+       conf = hCluster.getConf();
+       admin = new HBaseAdmin(conf);
+       hamaAdmin = new HamaAdminImpl(conf, admin);
+
+       m1 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
+       m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
+     }
+
+     protected void tearDown() {
+       try {
+         closeTest();
+       } catch (IOException e) {
+         // Closing stays best effort (some tests close the shared matrices
+         // themselves), but the failure is reported through the class logger
+         // instead of being dumped on stderr.
+         LOG.error("Failed to close the shared test matrices", e);
+       }
+     }
+   };
+   return setup;
+ }
+
+ /**
+  * Closes the two shared matrices m1 and m2.
+  * <p>
+  * m2 is closed in a finally clause so that it is still closed when closing
+  * m1 throws.
+  *
+  * @throws IOException if closing either matrix fails
+  */
+ public static void closeTest() throws IOException {
+   try {
+     m1.close();
+   } finally {
+     m2.close();
+   }
+ }
+
+ /**
+  * Adds 0.5 to entry (1, 1) of m1 and checks that reading the entry back
+  * yields the previously stored value plus 0.5.
+  *
+  * @throws IOException
+  */
+ public void testEntryAdd() throws IOException {
+   final double increment = 0.5;
+   final double before = m1.get(1, 1);
+
+   m1.add(1, 1, increment);
+
+   final double after = m1.get(1, 1);
+   assertEquals(after, before + increment);
+ }
+
+ /**
+  * Map/Reduce blocking test: m2 must report that it is not blocked before
+  * blocking_mapred(4) is called and that it is blocked afterwards.
+  *
+  * @throws IOException
+  * @throws ClassNotFoundException
+  */
+ public void testMRBlocking() throws IOException, ClassNotFoundException {
+   final DenseMatrix dense = (DenseMatrix) m2;
+
+   assertEquals(dense.isBlocked(), false);
+   dense.blocking_mapred(4);
+   assertEquals(dense.isBlocked(), true);
+ }
+
+ /**
+  * Column vector test: walks the vector returned for column 0 of m1 and
+  * compares each value, in iteration order, with the matrix entry of the
+  * matching row.
+  *
+  * @throws IOException
+  */
+ public void testGetColumn() throws IOException {
+   Vector column = m1.getColumn(0);
+   int row = 0;
+   for (Iterator<DoubleEntry> entries = column.iterator(); entries.hasNext(); row++) {
+     assertEquals(m1.get(row, 0), entries.next().getValue());
+   }
+ }
+
+ /**
+  * Labels row 0 and column 0 of m1 and checks that each label can be read
+  * back, while index 1, which this test does not label, yields null.
+  *
+  * @throws IOException
+  */
+ public void testGetSetAttribute() throws IOException {
+   final String rowLabel = "row1";
+   final String columnLabel = "column1";
+
+   m1.setRowLabel(0, rowLabel);
+   assertEquals(m1.getRowLabel(0), rowLabel);
+   assertEquals(m1.getRowLabel(1), null);
+
+   m1.setColumnLabel(0, columnLabel);
+   assertEquals(m1.getColumnLabel(0), columnLabel);
+   assertEquals(m1.getColumnLabel(1), null);
+ }
+
+ /**
+  * Sub-matrix test. Takes a 3 x 4 sub-matrix of m1 (starting at row 2,
+  * column 2) and checks each of its entries against m1, then multiplies it by
+  * a 4 x 3 sub-matrix of m2 (starting at row 0, column 0) and compares the
+  * product with a plain in-memory multiplication of the same entries.
+  *
+  * @throws IOException
+  */
+ public void testSubMatrix() throws IOException {
+   SubMatrix left = m1.subMatrix(2, 4, 2, 5); // left : 3 * 4
+   for (int r = 0; r < left.getRows(); r++) {
+     for (int col = 0; col < left.getColumns(); col++) {
+       assertEquals(left.get(r, col), m1.get(r + 2, col + 2));
+     }
+   }
+
+   SubMatrix right = m2.subMatrix(0, 3, 0, 2); // right : 4 * 3
+   SubMatrix product = left.mult(right);
+
+   // Reference product, computed directly from the parent matrices.
+   double[][] expected = new double[3][3]; // left * right
+   for (int r = 0; r < 3; r++) {
+     for (int col = 0; col < 3; col++) {
+       for (int inner = 0; inner < 4; inner++) {
+         expected[r][col] += m1.get(r + 2, inner + 2) * m2.get(inner, col);
+       }
+     }
+   }
+
+   for (int r = 0; r < 3; r++) {
+     for (int col = 0; col < 3; col++) {
+       assertEquals(expected[r][col], product.get(r, col));
+     }
+   }
+ }
+
+ /**
+ * Test matrices addition
+ *
+ * @throws IOException
+ */
+ public void testMatrixAdd() throws IOException {
+ Matrix result = m1.add(m2);
+
+ assertEquals(result.getRows(), SIZE);
+ assertEquals(result.getColumns(), SIZE);
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ assertEquals(result.get(i, j), m1.get(i, j) + m2.get(i, j));
+ }
+ }
+ }
+
+ /**
+  * Matrix multiplication test: multiplies m1 by m2, checks that the result is
+  * SIZE x SIZE and hands it to verifyMultResult for an entry-wise check.
+  *
+  * @throws IOException
+  */
+ public void testMatrixMult() throws IOException {
+   Matrix product = m1.mult(m2);
+
+   assertEquals(product.getRows(), SIZE);
+   assertEquals(product.getColumns(), SIZE);
+
+   verifyMultResult(m1, m2, product);
+ }
+
+ /**
+  * Fills the first SIZE positions of a dense vector with fixed values, stores
+  * it as row SIZE + 1 of m1 and checks that iterating the row read back
+  * yields the same values in the same order.
+  *
+  * @throws IOException
+  */
+ public void testSetRow() throws IOException {
+   Vector row = new DenseVector();
+   final double[] expected = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+   final int target = SIZE + 1;
+
+   for (int idx = 0; idx < SIZE; idx++) {
+     row.set(idx, expected[idx]);
+   }
+
+   m1.setRow(target, row);
+
+   int pos = 0;
+   for (Iterator<DoubleEntry> stored = m1.getRow(target).iterator(); stored.hasNext(); pos++) {
+     assertEquals(expected[pos], stored.next().getValue());
+   }
+ }
+
+ /**
+  * Fills the first SIZE positions of a dense vector with fixed values, stores
+  * it as column SIZE + 1 of m1 and checks that iterating the column read back
+  * yields the same values in the same order.
+  *
+  * @throws IOException
+  */
+ public void testSetColumn() throws IOException {
+   Vector column = new DenseVector();
+   final double[] values = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+   final int target = SIZE + 1;
+
+   for (int n = 0; n < SIZE; n++) {
+     column.set(n, values[n]);
+   }
+
+   m1.setColumn(target, column);
+   Iterator<DoubleEntry> stored = m1.getColumn(target).iterator();
+
+   for (int n = 0; stored.hasNext(); n++) {
+     assertEquals(values[n], stored.next().getValue());
+   }
+ }
+
+ /**
+ * Saves m1 under the alias aliase1, loads it back through that alias and
+ * then walks through a close/delete sequence, asserting after each step
+ * whether the backing table still exists.
+ *
+ * NOTE(review): this test closes the shared matrix m1; tests that run after
+ * it may find m1 already closed - confirm the intended test order.
+ *
+ * @throws IOException
+ */
+ public void testLoadSave() throws IOException {
+ String path1 = m1.getPath();
+ // Save m1 under the alias aliase1.
+ m1.save(aliase1);
+ // Load the matrix back through the alias.
+ DenseMatrix loadTest = new DenseMatrix(conf, aliase1, false);
+
+ // The loaded matrix must hold the same entries as m1 ...
+ for (int i = 0; i < loadTest.getRows(); i++) {
+ for (int j = 0; j < loadTest.getColumns(); j++) {
+ assertEquals(m1.get(i, j), loadTest.get(i, j));
+ }
+ }
+
+ // ... and must report the same path.
+ assertEquals(path1, loadTest.getPath());
+ // Closing loadTest is expected to only disconnect from the table, not to
+ // delete it.
+ loadTest.close();
+
+ // Close m1 as well, then load the matrix through aliase1 once more.
+ m1.close();
+ DenseMatrix loadTest2 = new DenseMatrix(conf, aliase1, false);
+ assertEquals(path1, loadTest2.getPath());
+ // Remove the alias. loadTest2 is still connected, so only the alias entry
+ // is expected to be removed; the assertion below checks that the table
+ // still exists.
+ hamaAdmin.delete(aliase1);
+ assertEquals(true, admin.tableExists(path1));
+ // loadTest2 is the last object referencing table 'path1'; closing it is
+ // expected to delete the table.
+ loadTest2.close();
+ assertEquals(false, admin.tableExists(path1));
+
+ // Loading through an alias that no longer exists must fail with an
+ // IOException.
+ DenseMatrix loadTest3 = null;
+ try {
+ loadTest3 = new DenseMatrix(conf, aliase1, false);
+ fail("Try to load a non-existed matrix should fail!");
+ } catch (IOException e) {
+ // Expected: the alias was deleted above, so there is nothing to load.
+ } finally {
+ if (loadTest3 != null)
+ loadTest3.close();
+ }
+ }
+
+ /**
+ * Saves m2 under the alias aliase2, loads it back, then force-creates a
+ * second matrix under the same alias and asserts, step by step, which of the
+ * two backing tables still exists as the matrices are closed and the alias
+ * is deleted.
+ *
+ * NOTE(review): this test closes the shared matrix m2; tests that run after
+ * it may find m2 already closed - confirm the intended test order.
+ *
+ * @throws IOException
+ */
+ public void testForceCreate() throws IOException {
+ String path2 = m2.getPath();
+ // Save m2 under the alias aliase2.
+ m2.save(aliase2);
+ // Load the matrix back through the alias.
+ DenseMatrix loadTest = new DenseMatrix(conf, aliase2, false);
+
+ // The loaded matrix must hold the same entries as m2 ...
+ for (int i = 0; i < loadTest.getRows(); i++) {
+ for (int j = 0; j < loadTest.getColumns(); j++) {
+ assertEquals(m2.get(i, j), loadTest.get(i, j));
+ }
+ }
+
+ // ... and must report the same path.
+ assertEquals(path2, loadTest.getPath());
+
+ // Force creation (third argument true) of a matrix under the alias name
+ // 'aliase2'. The assertions check that it gets a path different from path2
+ // and that the alias now resolves to that new path.
+ DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true);
+ String loadPath2 = loadTest2.getPath();
+ assertFalse(path2.equals(loadPath2));
+ assertEquals(loadPath2, hamaAdmin.getPath(aliase2));
+ assertFalse(path2.equals(hamaAdmin.getPath(aliase2)));
+
+ // Close m2 and then loadTest: the table at path2 must still exist after
+ // the first close and must be gone after the second.
+ m2.close();
+ assertEquals(true, admin.tableExists(path2));
+ loadTest.close();
+ assertEquals(false, admin.tableExists(path2));
+
+ // Close loadTest2 and then remove 'aliase2': the table at loadPath2 must
+ // still exist after the close and must be gone once the alias is deleted.
+ loadTest2.close();
+ assertEquals(true, admin.tableExists(loadPath2));
+ hamaAdmin.delete(aliase2);
+ assertEquals(false, admin.tableExists(loadPath2));
+ }
+
+ /**
+ * Verifying multiplication result
+ *
+ * @param m1
+ * @param m2
+ * @param result
+ * @throws IOException
+ */
+ private void verifyMultResult(Matrix m1, Matrix m2, Matrix result)
+ throws IOException {
+ double[][] C = new double[SIZE][SIZE];
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ for (int k = 0; k < SIZE; k++) {
+ C[i][k] += m1.get(i, j) * m2.get(j, k);
+ }
+ }
+ }
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String
+ .valueOf(C[i][j]).substring(0, 14));
+ }
+ }
+ }
+}
Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Tue Dec 30 03:52:47 2008
@@ -21,20 +21,13 @@
import java.io.IOException;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hama.DenseMatrix;
import org.apache.hama.HCluster;
import org.apache.hama.Matrix;
-import org.apache.hama.algebra.BlockCyclicMultiplyMap;
-import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockWritable;
import org.apache.log4j.Logger;
public class TestBlockMatrixMapReduce extends HCluster {
static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
- static Matrix c;
static final int SIZE = 32;
/** constructor */
@@ -50,41 +43,22 @@
((DenseMatrix) m1).blocking_mapred(16);
((DenseMatrix) m2).blocking_mapred(16);
- miniMRJob(m1.getPath(), m2.getPath());
+ Matrix c = m1.mult(m2);
- double[][] C = new double[SIZE][SIZE];
+ double[][] mem = new double[SIZE][SIZE];
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
for (int k = 0; k < SIZE; k++) {
- C[i][k] += m1.get(i, j) * m2.get(j, k);
+ mem[i][k] += m1.get(i, j) * m2.get(j, k);
}
}
}
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
- assertEquals(String.valueOf(C[i][j]).substring(0, 5), String.valueOf(
+ assertEquals(String.valueOf(mem[i][j]).substring(0, 5), String.valueOf(
c.get(i, j)).substring(0, 5));
}
}
}
-
- private void miniMRJob(String string, String string2) throws IOException {
- c = new DenseMatrix(conf);
- String output = c.getPath();
-
- JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class);
- jobConf.setJobName("test MR job");
-
- BlockCyclicMultiplyMap.initJob(string, string2,
- BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
- jobConf);
- BlockCyclicMultiplyReduce.initJob(output, BlockCyclicMultiplyReduce.class,
- jobConf);
-
- jobConf.setNumMapTasks(2);
- jobConf.setNumReduceTasks(2);
-
- JobClient.runJob(jobConf);
- }
}