You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/12/30 12:52:48 UTC

svn commit: r730103 - in /incubator/hama/trunk/src: java/org/apache/hama/ java/org/apache/hama/algebra/ java/org/apache/hama/io/ java/org/apache/hama/mapred/ test/org/apache/hama/ test/org/apache/hama/mapred/

Author: edwardyoon
Date: Tue Dec 30 03:52:47 2008
New Revision: 730103

URL: http://svn.apache.org/viewvc?rev=730103&view=rev
Log:
Fix OOME

Modified:
    incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Tue Dec 30 03:52:47 2008
@@ -1,228 +1,233 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.util.BytesUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Methods of the matrix classes
- */
-public abstract class AbstractMatrix implements Matrix {
-  static final Logger LOG = Logger.getLogger(AbstractMatrix.class);
-
-  protected HamaConfiguration config;
-  protected HBaseAdmin admin;
-  // a matrix just need a table path to point to the table which stores matrix.
-  // let HamaAdmin manage Matrix Name space.
-  protected String matrixPath;
-  protected HTable table;
-  protected HTableDescriptor tableDesc;
-  protected HamaAdmin hamaAdmin;
-
-  protected boolean closed = true;
-
-  /**
-   * Sets the job configuration
-   * 
-   * @param conf configuration object
-   * @throws MasterNotRunningException
-   */
-  public void setConfiguration(HamaConfiguration conf)
-      throws MasterNotRunningException {
-    this.config = conf;
-    this.admin = new HBaseAdmin(config);
-
-    hamaAdmin = new HamaAdminImpl(conf, admin);
-  }
-
-  /**
-   * Create matrix space
-   */
-  protected void create() throws IOException {
-    // It should run only when table doesn't exist.
-    if (!admin.tableExists(matrixPath)) {
-      this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN));
-      this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
-      this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
-      this.tableDesc.addFamily(new HColumnDescriptor(Constants.BLOCK));
-
-      LOG.info("Initializing the matrix storage.");
-      this.admin.createTable(this.tableDesc);
-      LOG.info("Create Matrix " + matrixPath);
-
-      // connect to the table.
-      table = new HTable(config, matrixPath);
-      // Record the matrix type in METADATA_TYPE
-      BatchUpdate update = new BatchUpdate(Constants.METADATA);
-      update.put(Constants.METADATA_TYPE, Bytes.toBytes(this.getClass()
-          .getSimpleName()));
-
-      table.commit(update);
-
-      // the new matrix's reference is 1.
-      setReference(1);
-    }
-  }
-
-  public HTable getHTable() {
-    return this.table;
-  }
-  
-  /** {@inheritDoc} */
-  public int getRows() throws IOException {
-    Cell rows = null;
-    rows = table.get(Constants.METADATA, Constants.METADATA_ROWS);
-    return (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
-  }
-
-  /** {@inheritDoc} */
-  public int getColumns() throws IOException {
-    Cell columns = table.get(Constants.METADATA, Constants.METADATA_COLUMNS);
-    return BytesUtil.bytesToInt(columns.getValue());
-  }
-
-  /** {@inheritDoc} */
-  public void set(int i, int j, double value) throws IOException {
-    VectorUpdate update = new VectorUpdate(i);
-    update.put(j, value);
-    table.commit(update.getBatchUpdate());
-  }
-
-  /** {@inheritDoc} */
-  public void add(int i, int j, double value) throws IOException {
-    VectorUpdate update = new VectorUpdate(i);
-    update.put(j, value + this.get(i, j));
-    table.commit(update.getBatchUpdate());
-  }
-
-  /** {@inheritDoc} */
-  public void setDimension(int rows, int columns) throws IOException {
-    VectorUpdate update = new VectorUpdate(Constants.METADATA);
-    update.put(Constants.METADATA_ROWS, rows);
-    update.put(Constants.METADATA_COLUMNS, columns);
-
-    table.commit(update.getBatchUpdate());
-  }
-
-  public String getRowLabel(int row) throws IOException {
-    Cell rows = null;
-    rows = table.get(BytesUtil.getRowIndex(row), Bytes
-        .toBytes(Constants.ATTRIBUTE + "string"));
-
-    return (rows != null) ? Bytes.toString(rows.getValue()) : null;
-  }
-
-  public void setRowLabel(int row, String name) throws IOException {
-    VectorUpdate update = new VectorUpdate(row);
-    update.put(Constants.ATTRIBUTE + "string", name);
-    table.commit(update.getBatchUpdate());
-  }
-
-  public String getColumnLabel(int column) throws IOException {
-    Cell rows = null;
-    rows = table.get(Constants.CINDEX, (Constants.ATTRIBUTE + column));
-    return (rows != null) ? Bytes.toString(rows.getValue()) : null;
-  }
-
-  public void setColumnLabel(int column, String name) throws IOException {
-    VectorUpdate update = new VectorUpdate(Constants.CINDEX);
-    update.put(column, name);
-    table.commit(update.getBatchUpdate());
-  }
-
-  /** {@inheritDoc} */
-  public String getPath() {
-    return matrixPath;
-  }
-
-  protected void setReference(int reference) throws IOException {
-    BatchUpdate update = new BatchUpdate(Constants.METADATA);
-    update.put(Constants.METADATA_REFERENCE, Bytes.toBytes(reference));
-    table.commit(update);
-  }
-
-  protected int incrementAndGetRef() throws IOException {
-    int reference = 1;
-    Cell rows = null;
-    rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
-    if (rows != null) {
-      reference = Bytes.toInt(rows.getValue());
-      reference++;
-    }
-    setReference(reference);
-    return reference;
-  }
-
-  protected int decrementAndGetRef() throws IOException {
-    int reference = 0;
-    Cell rows = null;
-    rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
-    if (rows != null) {
-      reference = Bytes.toInt(rows.getValue());
-      if (reference > 0) // reference==0, we need not to decrement it.
-        reference--;
-    }
-    setReference(reference);
-    return reference;
-  }
-
-  protected boolean hasAliaseName() throws IOException {
-    Cell rows = null;
-    rows = table.get(Constants.METADATA, Constants.ALIASENAME);
-    return (rows != null) ? true : false;
-  }
-
-  public void close() throws IOException {
-    if (closed) // have been closed
-      return;
-    int reference = decrementAndGetRef();
-    if (reference <= 0) { // no reference again.
-      if (!hasAliaseName()) { // the table has not been aliased, we delete the
-        // table.
-        if (admin.isTableEnabled(matrixPath)) {
-          admin.disableTable(matrixPath);
-          admin.deleteTable(matrixPath);
-        }
-      }
-    }
-    closed = true;
-  }
-
-  public boolean save(String aliasename) throws IOException {
-    // mark & update the aliase name in "alise:name" meta column.
-    // ! one matrix has only one aliasename now.
-    BatchUpdate update = new BatchUpdate(Constants.METADATA);
-    update.put(Constants.ALIASENAME, Bytes.toBytes(aliasename));
-    table.commit(update);
-    return hamaAdmin.save(this, aliasename);
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
+import org.apache.hadoop.hbase.HConstants;
+
+/**
+ * Methods of the matrix classes
+ */
+public abstract class AbstractMatrix implements Matrix {
+  static final Logger LOG = Logger.getLogger(AbstractMatrix.class);
+
+  protected HamaConfiguration config;
+  protected HBaseAdmin admin;
+  // a matrix just need a table path to point to the table which stores matrix.
+  // let HamaAdmin manage Matrix Name space.
+  protected String matrixPath;
+  protected HTable table;
+  protected HTableDescriptor tableDesc;
+  protected HamaAdmin hamaAdmin;
+
+  protected boolean closed = true;
+
+  /**
+   * Sets the job configuration
+   * 
+   * @param conf configuration object
+   * @throws MasterNotRunningException
+   */
+  public void setConfiguration(HamaConfiguration conf)
+      throws MasterNotRunningException {
+    this.config = conf;
+    this.admin = new HBaseAdmin(config);
+
+    hamaAdmin = new HamaAdminImpl(conf, admin);
+  }
+
+  /**
+   * Create matrix space
+   */
+  protected void create() throws IOException {
+    // It should run only when table doesn't exist.
+    if (!admin.tableExists(matrixPath)) {
+      this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN));
+      this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
+      this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
+      this.tableDesc.addFamily(new HColumnDescriptor(
+          Bytes.toBytes(Constants.BLOCK), 1, CompressionType.NONE, 
+          false, false, Integer.MAX_VALUE, HConstants.FOREVER, false
+      ));
+
+      LOG.info("Initializing the matrix storage.");
+      this.admin.createTable(this.tableDesc);
+      LOG.info("Create Matrix " + matrixPath);
+
+      // connect to the table.
+      table = new HTable(config, matrixPath);
+      // Record the matrix type in METADATA_TYPE
+      BatchUpdate update = new BatchUpdate(Constants.METADATA);
+      update.put(Constants.METADATA_TYPE, Bytes.toBytes(this.getClass()
+          .getSimpleName()));
+
+      table.commit(update);
+
+      // the new matrix's reference is 1.
+      setReference(1);
+    }
+  }
+
+  public HTable getHTable() {
+    return this.table;
+  }
+  
+  /** {@inheritDoc} */
+  public int getRows() throws IOException {
+    Cell rows = null;
+    rows = table.get(Constants.METADATA, Constants.METADATA_ROWS);
+    return (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+  }
+
+  /** {@inheritDoc} */
+  public int getColumns() throws IOException {
+    Cell columns = table.get(Constants.METADATA, Constants.METADATA_COLUMNS);
+    return BytesUtil.bytesToInt(columns.getValue());
+  }
+
+  /** {@inheritDoc} */
+  public void set(int i, int j, double value) throws IOException {
+    VectorUpdate update = new VectorUpdate(i);
+    update.put(j, value);
+    table.commit(update.getBatchUpdate());
+  }
+
+  /** {@inheritDoc} */
+  public void add(int i, int j, double value) throws IOException {
+    VectorUpdate update = new VectorUpdate(i);
+    update.put(j, value + this.get(i, j));
+    table.commit(update.getBatchUpdate());
+  }
+
+  /** {@inheritDoc} */
+  public void setDimension(int rows, int columns) throws IOException {
+    VectorUpdate update = new VectorUpdate(Constants.METADATA);
+    update.put(Constants.METADATA_ROWS, rows);
+    update.put(Constants.METADATA_COLUMNS, columns);
+
+    table.commit(update.getBatchUpdate());
+  }
+
+  public String getRowLabel(int row) throws IOException {
+    Cell rows = null;
+    rows = table.get(BytesUtil.getRowIndex(row), Bytes
+        .toBytes(Constants.ATTRIBUTE + "string"));
+
+    return (rows != null) ? Bytes.toString(rows.getValue()) : null;
+  }
+
+  public void setRowLabel(int row, String name) throws IOException {
+    VectorUpdate update = new VectorUpdate(row);
+    update.put(Constants.ATTRIBUTE + "string", name);
+    table.commit(update.getBatchUpdate());
+  }
+
+  public String getColumnLabel(int column) throws IOException {
+    Cell rows = null;
+    rows = table.get(Constants.CINDEX, (Constants.ATTRIBUTE + column));
+    return (rows != null) ? Bytes.toString(rows.getValue()) : null;
+  }
+
+  public void setColumnLabel(int column, String name) throws IOException {
+    VectorUpdate update = new VectorUpdate(Constants.CINDEX);
+    update.put(column, name);
+    table.commit(update.getBatchUpdate());
+  }
+
+  /** {@inheritDoc} */
+  public String getPath() {
+    return matrixPath;
+  }
+
+  protected void setReference(int reference) throws IOException {
+    BatchUpdate update = new BatchUpdate(Constants.METADATA);
+    update.put(Constants.METADATA_REFERENCE, Bytes.toBytes(reference));
+    table.commit(update);
+  }
+
+  protected int incrementAndGetRef() throws IOException {
+    int reference = 1;
+    Cell rows = null;
+    rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
+    if (rows != null) {
+      reference = Bytes.toInt(rows.getValue());
+      reference++;
+    }
+    setReference(reference);
+    return reference;
+  }
+
+  protected int decrementAndGetRef() throws IOException {
+    int reference = 0;
+    Cell rows = null;
+    rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
+    if (rows != null) {
+      reference = Bytes.toInt(rows.getValue());
+      if (reference > 0) // reference==0, we need not to decrement it.
+        reference--;
+    }
+    setReference(reference);
+    return reference;
+  }
+
+  protected boolean hasAliaseName() throws IOException {
+    Cell rows = null;
+    rows = table.get(Constants.METADATA, Constants.ALIASENAME);
+    return (rows != null) ? true : false;
+  }
+
+  public void close() throws IOException {
+    if (closed) // have been closed
+      return;
+    int reference = decrementAndGetRef();
+    if (reference <= 0) { // no reference again.
+      if (!hasAliaseName()) { // the table has not been aliased, we delete the
+        // table.
+        if (admin.isTableEnabled(matrixPath)) {
+          admin.disableTable(matrixPath);
+          admin.deleteTable(matrixPath);
+        }
+      }
+    }
+    closed = true;
+  }
+
+  public boolean save(String aliasename) throws IOException {
+    // mark & update the aliase name in "alise:name" meta column.
+    // ! one matrix has only one aliasename now.
+    BatchUpdate update = new BatchUpdate(Constants.METADATA);
+    update.put(Constants.ALIASENAME, Bytes.toBytes(aliasename));
+    table.commit(update);
+    return hamaAdmin.save(this, aliasename);
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Tue Dec 30 03:52:47 2008
@@ -87,15 +87,8 @@
   /** default try times to generate a suitable tablename */
   public static final int DEFAULT_TRY_TIMES = 10000000;
   
-  /**
-   * block position column to store 
-   * {@link org.apache.hama.io.BlockPosition} object
-   */
-  public static final String BLOCK_POSITION = "attribute:blockPosition";
-  
   /** block data column */
   public static final String BLOCK = "block:";
 
-  /** block size */
-  public static final String BLOCK_SIZE = "attribute:blockSize";
+  public static final String BLOCK_PATH = "attribute:blockPath";
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Tue Dec 30 03:52:47 2008
@@ -49,7 +49,6 @@
 import org.apache.hama.algebra.SIMDMultiplyMap;
 import org.apache.hama.algebra.SIMDMultiplyReduce;
 import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockPosition;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.DoubleEntry;
 import org.apache.hama.io.MapWritable;
@@ -395,7 +394,8 @@
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
 
     if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
-      BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(),
+      BlockCyclicMultiplyMap.initJob(this.getBlockedMatrixPath(), 
+          ((DenseMatrix) B).getBlockedMatrixPath(),
           BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
           jobConf);
       BlockCyclicMultiplyReduce.initJob(result.getPath(),
@@ -477,7 +477,7 @@
   }
 
   public boolean isBlocked() throws IOException {
-    return (table.get(Constants.METADATA, Constants.BLOCK_SIZE) == null) ? false
+    return (table.get(Constants.METADATA, Constants.BLOCK_PATH) == null) ? false
         : true;
   }
 
@@ -486,64 +486,12 @@
         Bytes.toBytes(Constants.BLOCK)).getValue());
   }
 
-  /**
-   * @return the size of block
-   * @throws IOException
-   */
-  public int getBlockSize() throws IOException {
-    return (isBlocked()) ? BytesUtil.bytesToInt(table.get(Constants.METADATA,
-        Constants.BLOCK_SIZE).getValue()) : -1;
-  }
-
-  protected void setBlockSize(int blockNum) throws IOException {
-    BatchUpdate update = new BatchUpdate(Constants.METADATA);
-    update.put(Constants.BLOCK_SIZE, BytesUtil.intToBytes(blockNum));
-    table.commit(update);
-  }
-
   public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
     BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
     update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
     table.commit(update);
   }
-
-  protected void setBlockPosition(int blockNum) throws IOException {
-    int block_row_size = this.getRows() / blockNum;
-    int block_column_size = this.getColumns() / blockNum;
-
-    int startRow, endRow, startColumn, endColumn;
-    int i = 0, j = 0;
-    do {
-      startRow = i * block_row_size;
-      endRow = (startRow + block_row_size) - 1;
-      if (endRow >= this.getRows())
-        endRow = this.getRows() - 1;
-
-      j = 0;
-      do {
-        startColumn = j * block_column_size;
-        endColumn = (startColumn + block_column_size) - 1;
-        if (endColumn >= this.getColumns())
-          endColumn = this.getColumns() - 1;
-
-        BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
-        update.put(Constants.BLOCK_POSITION, new BlockPosition(startRow,
-            endRow, startColumn, endColumn).getBytes());
-        table.commit(update);
-
-        j++;
-      } while (endColumn < (this.getColumns() - 1));
-
-      i++;
-    } while (endRow < (this.getRows() - 1));
-  }
-
-  protected BlockPosition getBlockPosition(int i, int j) throws IOException {
-    byte[] rs = table.get(new BlockID(i, j).getBytes(),
-        Bytes.toBytes(Constants.BLOCK_POSITION)).getValue();
-    return new BlockPosition(rs);
-  }
-
+  
   /**
    * Using a map/reduce job to block a dense matrix.
    * 
@@ -551,51 +499,34 @@
    * @throws IOException
    */
   public void blocking_mapred(int blockNum) throws IOException {
-    this.checkBlockNum(blockNum);
+    double blocks = Math.pow(blockNum, 0.5);
+    if (!String.valueOf(blocks).endsWith(".0"))
+      throw new IOException("can't divide.");
 
+    int block_size = (int) blocks;
+    Matrix blockedMatrix = new DenseMatrix(config);
+    blockedMatrix.setDimension(block_size, block_size);
+    this.setBlockedMatrixPath(blockedMatrix.getPath());
+    
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("Blocking MR job" + getPath());
 
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
 
-    BlockingMapRed.initJob(getPath(), jobConf);
-
+    BlockingMapRed.initJob(this.getPath(), blockedMatrix.getPath(), 
+        block_size, this.getRows(), this.getColumns(), jobConf);
     JobManager.execute(jobConf);
   }
 
-  /**
-   * Using a scanner to block a dense matrix. If the matrix is large, use the
-   * blocking_mapred()
-   * 
-   * @param blockNum
-   * @throws IOException
-   */
-  public void blocking(int blockNum) throws IOException {
-    this.checkBlockNum(blockNum);
-
-    String[] columns = new String[] { Constants.BLOCK_POSITION };
-    Scanner scan = table.getScanner(columns);
-
-    for (RowResult row : scan) {
-      BlockID bID = new BlockID(row.getRow());
-      BlockPosition pos = new BlockPosition(row.get(Constants.BLOCK_POSITION)
-          .getValue());
-
-      setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos.getStartRow(), pos
-          .getEndRow(), pos.getStartColumn(), pos.getEndColumn()));
-    }
+  public String getBlockedMatrixPath() throws IOException {
+    return Bytes.toString(table.get(Constants.METADATA,
+        Constants.BLOCK_PATH).getValue());
   }
 
-  private void checkBlockNum(int blockNum) throws IOException {
-    double blocks = Math.pow(blockNum, 0.5);
-    // TODO: Check also it is validation with matrix.
-    if (!String.valueOf(blocks).endsWith(".0"))
-      throw new IOException("can't divide.");
-
-    int block_size = (int) blocks;
-    setBlockPosition(block_size);
-    setBlockSize(block_size);
-    LOG.info("Create " + block_size + " * " + block_size + " blocked matrix");
+  protected void setBlockedMatrixPath(String path) throws IOException {
+    BatchUpdate update = new BatchUpdate(Constants.METADATA);
+    update.put(Constants.BLOCK_PATH, Bytes.toBytes(path));
+    table.commit(update);
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java Tue Dec 30 03:52:47 2008
@@ -1,199 +1,213 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.hama.util.BytesUtil;
-import org.apache.log4j.Logger;
-
-/**
- * A sub matrix is a matrix formed by selecting certain rows and columns from a
- * bigger matrix. This is a in-memory operation only.
- */
-public class SubMatrix implements java.io.Serializable {
-  private static final long serialVersionUID = 3897536498367921547L;
-  static final Logger LOG = Logger.getLogger(SubMatrix.class);
-  private double[][] matrix;
-
-  /**
-   * Constructor
-   * 
-   * @param i the size of rows
-   * @param j the size of columns
-   */
-  public SubMatrix(int i, int j) {
-    this.matrix = new double[i][j];
-  }
-
-  /**
-   * Constructor
-   * 
-   * @param c a two dimensional double array
-   */
-  public SubMatrix(double[][] c) {
-    double[][] matrix = c;
-    this.matrix = matrix;
-  }
-
-  public SubMatrix(byte[] matrix) throws IOException {
-    ByteArrayInputStream bos = new ByteArrayInputStream(matrix);
-    ObjectInputStream oos = new ObjectInputStream(bos);
-    Object obj = null;
-    try {
-      obj = oos.readObject();
-      this.matrix = ((SubMatrix)obj).getDoubleArray();
-    } catch (ClassNotFoundException e) {
-      e.printStackTrace();
-    }
-    oos.close();
-    bos.close();
-  }
-  
-  /**
-   * Sets the value
-   * 
-   * @param row
-   * @param column
-   * @param value
-   */
-  public void set(int row, int column, double value) {
-    matrix[row][column] = value;
-  }
-
-  /**
-   * Sets the value
-   * 
-   * @param row
-   * @param column
-   * @param value
-   */
-  public void set(int row, int column, byte[] value) {
-    matrix[row][column] = BytesUtil.bytesToDouble(value);    
-  }
-  
-  /**
-   * Gets the value
-   * 
-   * @param i
-   * @param j
-   * @return the value of submatrix(i, j)
-   */
-  public double get(int i, int j) {
-    return matrix[i][j];
-  }
-
-  public void add(int row, int column, double value) {
-    matrix[row][column] = matrix[row][column] + value;
-  }
-
-  /**
-   * c = a+b
-   * 
-   * @param b
-   * @return c
-   */
-  public SubMatrix add(SubMatrix b) {
-    SubMatrix c = new SubMatrix(this.getRows(), this.getColumns());
-    
-    for (int i = 0; i < this.getRows(); i++) {
-      for (int j = 0; j < this.getColumns(); j++) {
-        c.set(i, j, (this.get(i, j) + b.get(i, j)));
-      }
-    }
-
-    return c;
-  }
-
-  /**
-   * c = a*b
-   * 
-   * @param b
-   * @return c
-   */
-  public SubMatrix mult(SubMatrix b) {
-    SubMatrix c = new SubMatrix(this.getRows(), b.getColumns());
-    
-    for (int i = 0; i < this.getRows(); i++) {
-      for (int j = 0; j < b.getColumns(); j++) {
-        for (int k = 0; k < this.getColumns(); k++) {
-          c.add(i, j, this.get(i, k) * b.get(k, j));
-        }
-      }
-    }
-
-    return c;
-  }
-
-  /**
-   * Gets the number of rows
-   * 
-   * @return the number of rows
-   */
-  public int getRows() {
-    return this.matrix.length;
-  }
-
-  /**
-   * Gets the number of columns
-   * 
-   * @return the number of columns
-   */
-  public int getColumns() {
-    return this.matrix[0].length;
-  }
-
-  /**
-   * Close
-   */
-  public void close() {
-    matrix = null;
-  }
-
-  /**
-   * @return the 2d double array
-   */
-  public double[][] getDoubleArray() {
-    double[][] result = matrix;
-    return result;
-  }
-
-  /**
-   * Gets the bytes of the sub matrix
-   * 
-   * @return the bytes of the sub matrix
-   * @throws IOException
-   */
-  public byte[] getBytes() throws IOException {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    ObjectOutputStream oos = new ObjectOutputStream(bos);
-    oos.writeObject(this);
-    oos.flush();
-    oos.close();
-    bos.close();
-    byte[] data = bos.toByteArray();
-    return data;
-  }
-
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * A sub matrix is a matrix formed by selecting certain rows and columns from a
+ * bigger matrix. This is a in-memory operation only.
+ */
+public class SubMatrix implements java.io.Serializable {
+  private static final long serialVersionUID = 3897536498367921547L;
+  static final Logger LOG = Logger.getLogger(SubMatrix.class);
+  private double[][] matrix;
+
+  /**
+   * Constructor
+   * 
+   * @param i the size of rows
+   * @param j the size of columns
+   */
+  public SubMatrix(int i, int j) {
+    this.matrix = new double[i][j];
+  }
+
+  /**
+   * Constructor
+   * 
+   * @param c a two dimensional double array
+   */
+  public SubMatrix(double[][] c) {
+    double[][] matrix = c;
+    this.matrix = matrix;
+  }
+
+  public SubMatrix(byte[] matrix) throws IOException {
+    ByteArrayInputStream bos = new ByteArrayInputStream(matrix);
+    DataInputStream dis = new DataInputStream(bos);
+    
+    int rows = dis.readInt();
+    int columns = dis.readInt();
+    this.matrix = new double[rows][columns];
+    
+    for(int i = 0; i < rows; i++) {
+      for(int j = 0; j < columns; j++) {
+        this.matrix[i][j] = dis.readDouble();        
+      }
+    }
+    
+    dis.close();
+    bos.close();
+  }
+  
+  /**
+   * Sets the value
+   * 
+   * @param row
+   * @param column
+   * @param value
+   */
+  public void set(int row, int column, double value) {
+    matrix[row][column] = value;
+  }
+
+  /**
+   * Sets the value
+   * 
+   * @param row
+   * @param column
+   * @param value
+   */
+  public void set(int row, int column, byte[] value) {
+    matrix[row][column] = BytesUtil.bytesToDouble(value);    
+  }
+  
+  /**
+   * Gets the value
+   * 
+   * @param i
+   * @param j
+   * @return the value of submatrix(i, j)
+   */
+  public double get(int i, int j) {
+    return matrix[i][j];
+  }
+
+  public void add(int row, int column, double value) {
+    matrix[row][column] = matrix[row][column] + value;
+  }
+
+  /**
+   * c = a+b
+   * 
+   * @param b
+   * @return c
+   */
+  public SubMatrix add(SubMatrix b) {
+    SubMatrix c = new SubMatrix(this.getRows(), this.getColumns());
+    
+    for (int i = 0; i < this.getRows(); i++) {
+      for (int j = 0; j < this.getColumns(); j++) {
+        c.set(i, j, (this.get(i, j) + b.get(i, j)));
+      }
+    }
+
+    return c;
+  }
+
+  /**
+   * c = a*b
+   * 
+   * @param b
+   * @return c
+   */
+  public SubMatrix mult(SubMatrix b) {
+    SubMatrix c = new SubMatrix(this.getRows(), b.getColumns());
+    
+    for (int i = 0; i < this.getRows(); i++) {
+      for (int j = 0; j < b.getColumns(); j++) {
+        for (int k = 0; k < this.getColumns(); k++) {
+          c.add(i, j, this.get(i, k) * b.get(k, j));
+        }
+      }
+    }
+
+    return c;
+  }
+
+  /**
+   * Gets the number of rows
+   * 
+   * @return the number of rows
+   */
+  public int getRows() {
+    return this.matrix.length;
+  }
+
+  /**
+   * Gets the number of columns
+   * 
+   * @return the number of columns
+   */
+  public int getColumns() {
+    return this.matrix[0].length;
+  }
+
+  /**
+   * Close
+   */
+  public void close() {
+    matrix = null;
+  }
+
+  /**
+   * @return the 2d double array
+   */
+  public double[][] getDoubleArray() {
+    double[][] result = matrix;
+    return result;
+  }
+
+  /**
+   * Gets the bytes of the sub matrix
+   * 
+   * @return the bytes of the sub matrix
+   * @throws IOException
+   */
+  public byte[] getBytes() throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    
+    dos.writeInt(this.getRows());
+    dos.writeInt(this.getColumns());
+    
+    for(int i = 0; i < this.getRows(); i++) {
+      for(int j = 0; j < this.getColumns(); j++) {
+        dos.writeDouble(this.get(i, j));
+      }
+    }
+
+    byte[] data = bos.toByteArray();
+    dos.close();
+    bos.close();
+    return data;
+  }
+
+}
+

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Tue Dec 30 03:52:47 2008
@@ -21,6 +21,10 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -69,13 +73,24 @@
   public void map(BlockID key, BlockWritable value,
       OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
       throws IOException {
-    int blockSize = matrix_b.getBlockSize();
+    int blockSize = matrix_b.getRows();
     SubMatrix a = value.get();
-    
-    for (int j = 0; j < blockSize; j++) {
-      SubMatrix b = matrix_b.getBlock(key.getColumn(), j);
+    HTable table = matrix_b.getHTable();
+
+    // startKey : new BlockID(key.getColumn(), 0).toString()
+    // endKey : new BlockID(key.getColumn(), blockSize+1).toString()
+    Scanner scan = table.getScanner(new byte[][] { Bytes
+        .toBytes(Constants.BLOCK) },
+        new BlockID(key.getColumn(), 0).getBytes(), new BlockID(
+            key.getColumn(), blockSize + 1).getBytes());
+
+    for (RowResult row : scan) {
+      BlockID bid = new BlockID(row.getRow());
+      SubMatrix b = new SubMatrix(row.get(Constants.BLOCK).getValue());
       SubMatrix c = a.mult(b);
-      output.collect(new BlockID(key.getRow(), j), new BlockWritable(c));
+      output.collect(new BlockID(key.getRow(), bid.getColumn()),
+          new BlockWritable(c));
     }
+    scan.close();
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java Tue Dec 30 03:52:47 2008
@@ -19,20 +19,19 @@
  */
 package org.apache.hama.io;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
 
 /** A WritableComparable for BlockIDs. */
 @SuppressWarnings("unchecked")
-public class BlockID implements WritableComparable, java.io.Serializable {
-  private static final long serialVersionUID = 6434651179475226613L;
+public class BlockID implements WritableComparable {
+  static final Logger LOG = Logger.getLogger(BlockID.class);
+  public static final int PAD_SIZE = 15;
   private int row;
   private int column;
 
@@ -44,18 +43,28 @@
   }
 
   public BlockID(byte[] bytes) throws IOException {
-    ByteArrayInputStream bos = new ByteArrayInputStream(bytes);
-    ObjectInputStream oos = new ObjectInputStream(bos);
-    Object obj = null;
+    String rKey = Bytes.toString(bytes);
+    String keys[] = null;
+    if (rKey.substring(0, 8).equals("00000000")) {
+      int i = 8;
+      while (rKey.charAt(i) == '0') {
+        i++;
+      }
+      keys = rKey.substring(i, rKey.length()).split("[,]");
+    } else {
+      int i = 0;
+      while (rKey.charAt(i) == '0') {
+        i++;
+      }
+      keys = rKey.substring(i, rKey.length()).split("[,]");
+    }
+
     try {
-      obj = oos.readObject();
-      this.row = ((BlockID)obj).getRow();
-      this.column = ((BlockID)obj).getColumn();
-    } catch (ClassNotFoundException e) {
-      e.printStackTrace();
+      this.row = Integer.parseInt(keys[1]);
+      this.column = Integer.parseInt(keys[2]);
+    } catch (ArrayIndexOutOfBoundsException e) {
+      throw new ArrayIndexOutOfBoundsException(rKey + "\n" + e);
     }
-    oos.close();
-    bos.close();
   }
 
   public void set(int row, int column) {
@@ -72,20 +81,27 @@
   }
 
   public void readFields(DataInput in) throws IOException {
-    column = in.readInt();
-    row = in.readInt();
+    BlockID value = new BlockID(Bytes.readByteArray(in));
+    this.row = value.getRow();
+    this.column = value.getColumn();
   }
 
   public void write(DataOutput out) throws IOException {
-    out.writeInt(column);
-    out.writeInt(row);
+    Bytes.writeByteArray(out, Bytes.toBytes(this.toString()));
   }
 
   /**
    * Make BlockID's string representation be same format.
    */
   public String toString() {
-    return row + "," + column;
+    int zeros = PAD_SIZE - String.valueOf(row).length()
+        - String.valueOf(column).length();
+    StringBuffer buf = new StringBuffer();
+    for (int i = 0; i < zeros; ++i) {
+      buf.append("0");
+    }
+
+    return buf.toString() + "," + row + "," + column;
   }
 
   @Override
@@ -110,19 +126,14 @@
 
   @Override
   public boolean equals(Object o) {
-    if(o == null) return false;
-    if(!(o instanceof BlockID)) return false;
+    if (o == null)
+      return false;
+    if (!(o instanceof BlockID))
+      return false;
     return compareTo(o) == 0;
   }
 
-  public byte[] getBytes() throws IOException {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    ObjectOutputStream oos = new ObjectOutputStream(bos);
-    oos.writeObject(this);
-    oos.flush();
-    oos.close();
-    bos.close();
-    byte[] data = bos.toByteArray();
-    return data;
+  public byte[] getBytes() {
+    return Bytes.toBytes(this.toString());
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Tue Dec 30 03:52:47 2008
@@ -1,56 +1,77 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.SubMatrix;
-
-public class BlockWritable implements Writable {
-  public SubMatrix matrix;
-
-  public BlockWritable() {
-    this.matrix = new SubMatrix(0, 0);
-  }
-
-  public BlockWritable(SubMatrix c) {
-    this.matrix = c;
-  }
-
-  public BlockWritable(byte[] bytes) throws IOException {
-    this.matrix = new SubMatrix(bytes);
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    this.matrix = new SubMatrix(Bytes.readByteArray(in));
-  }
-
-  public void write(DataOutput out) throws IOException {
-    Bytes.writeByteArray(out, this.matrix.getBytes());
-  }
-
-  public SubMatrix get() {
-    return this.matrix;
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.io;
+
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.SubMatrix;
+
+public class BlockWritable implements Writable {
+  private SubMatrix matrix;
+
+  public BlockWritable() {
+    this.matrix = new SubMatrix(0, 0);
+  }
+
+  public BlockWritable(SubMatrix c) {
+    this.matrix = c;
+  }
+
+  public BlockWritable(byte[] bytes) throws IOException {
+    this.matrix = new SubMatrix(bytes);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    
+    int rows = in.readInt();
+    int columns = in.readInt();
+    this.matrix = new SubMatrix(rows, columns);
+    
+    for(int i = 0; i < rows; i++) {
+      for(int j = 0; j < columns; j++) {
+        this.matrix.set(i, j, in.readDouble());
+      }
+    }
+    
+    //this.matrix = new SubMatrix(Bytes.readByteArray(in));
+  }
+
+  public void write(DataOutput out) throws IOException {
+    //Bytes.writeByteArray(out, this.matrix.getBytes());
+    
+    out.writeInt(this.matrix.getRows());
+    out.writeInt(this.matrix.getColumns());
+    
+    for(int i = 0; i < this.matrix.getRows(); i++) {
+      for(int j = 0; j < this.matrix.getColumns(); j++) {
+        out.writeDouble(this.matrix.get(i, j));
+      }
+    }
+  }
+
+  public SubMatrix get() {
+    return this.matrix;
+  }
+}
+

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Tue Dec 30 03:52:47 2008
@@ -1,180 +1,197 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hama.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hama.Constants;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.DenseVector;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.SubMatrix;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.io.VectorWritable;
-
-/**
- * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix
- */
-public class BlockingMapRed {
-
-  static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
-  /** Parameter of the path of the matrix to be blocked * */
-  public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
-
-  /**
-   * Initialize a job to blocking a table
-   * 
-   * @param matrixPath
-   * @param job
-   */
-  public static void initJob(String matrixPath, JobConf job) {
-    job.setMapperClass(BlockingMapper.class);
-    job.setReducerClass(BlockingReducer.class);
-    FileInputFormat.addInputPaths(job, matrixPath);
-
-    job.setInputFormat(VectorInputFormat.class);
-    job.setMapOutputKeyClass(BlockID.class);
-    job.setMapOutputValueClass(VectorWritable.class);
-    job.setOutputFormat(NullOutputFormat.class);
-
-    job.set(BLOCKING_MATRIX, matrixPath);
-    job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
-  }
-
-  /**
-   * Abstract Blocking Map/Reduce Class to configure the job.
-   */
-  public static abstract class BlockingMapRedBase extends MapReduceBase {
-
-    protected DenseMatrix matrix;
-    protected int mBlockNum;
-    protected int mBlockRowSize;
-    protected int mBlockColSize;
-    
-    protected int mRows;
-    protected int mColumns;
-
-    @Override
-    public void configure(JobConf job) {
-      try {
-        matrix = new DenseMatrix(new HamaConfiguration(), job.get(
-            BLOCKING_MATRIX, ""));
-        mBlockNum = matrix.getBlockSize();
-        mBlockRowSize = matrix.getRows() / mBlockNum;
-        mBlockColSize = matrix.getColumns() / mBlockNum;
-        
-        mRows = matrix.getRows();
-        mColumns = matrix.getColumns();
-      } catch (IOException e) {
-        LOG.warn("Load matrix_blocking failed : " + e.getMessage());
-      }
-    }
-
-  }
-
-  /**
-   * Mapper Class
-   */
-  public static class BlockingMapper extends BlockingMapRedBase implements
-      Mapper<IntWritable, VectorWritable, BlockID, VectorWritable> {
-
-    @Override
-    public void map(IntWritable key, VectorWritable value,
-        OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
-        throws IOException {
-      int startColumn;
-      int endColumn;
-      int blkRow = key.get() / mBlockRowSize;
-      DenseVector dv = value.getDenseVector();
-      
-      int i = 0;
-      do {
-        startColumn = i * mBlockColSize;
-        endColumn = startColumn + mBlockColSize - 1;
-        if(endColumn >= mColumns) // the last sub vector
-          endColumn = mColumns - 1;
-        output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
-            dv.subVector(startColumn, endColumn)));
-        
-        i++;
-      } while(endColumn < (mColumns-1));
-    }
-
-  }
-
-  /**
-   * Reducer Class
-   */
-  public static class BlockingReducer extends BlockingMapRedBase implements
-      Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
-
-    @Override
-    public void reduce(BlockID key, Iterator<VectorWritable> values,
-        OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
-        throws IOException {
-      // Note: all the sub-vectors are grouped by {@link
-      // org.apache.hama.io.BlockID}
-      
-      // the block's base offset in the original matrix
-      int colBase = key.getColumn() * mBlockColSize;
-      int rowBase = key.getRow() * mBlockRowSize;
-      
-      // the block's size : rows & columns
-      int smRows = mBlockRowSize;
-      if((rowBase + mBlockRowSize - 1) >= mRows)
-        smRows = mRows - rowBase;
-      int smCols = mBlockColSize;
-      if((colBase + mBlockColSize - 1) >= mColumns)  
-        smCols = mColumns - colBase;
-      
-      // construct the matrix
-      SubMatrix subMatrix = new SubMatrix(smRows, smCols);
-      
-      // i, j is the current offset in the sub-matrix
-      int i = 0, j = 0;
-      while (values.hasNext()) {
-        VectorWritable vw = values.next();
-        // check the size is suitable
-        if (vw.size() != smCols)
-          throw new IOException("Block Column Size dismatched.");
-        i = vw.row - rowBase;
-        if (i >= smRows || i < 0)
-          throw new IOException("Block Row Size dismatched.");
-
-        // put the subVector to the subMatrix
-        for (j = 0; j < smCols; j++) {
-          subMatrix.set(i, j, vw.get(colBase + j));
-        }
-      }
-
-      matrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
-    }
-  }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hama.Constants;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.DenseVector;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.VectorWritable;
+
+/**
+ * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix
+ */
+public class BlockingMapRed {
+
+  static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
+  /** Parameter of the path of the matrix to be blocked * */
+  public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
+  public static final String BLOCKED_MATRIX = "hama.blocked.matrix";
+  public static final String BLOCK_SIZE = "hama.blocking.size";
+  public static final String ROWS = "hama.blocking.rows";
+  public static final String COLUMNS = "hama.blocking.columns";
+
+  /**
+   * Initialize a job to blocking a table
+   * 
+   * @param matrixPath
+   * @param string 
+   * @param j 
+   * @param i 
+   * @param block_size 
+   * @param job
+   */
+  public static void initJob(String matrixPath, String string, int block_size, int i, int j, JobConf job) {
+    job.setMapperClass(BlockingMapper.class);
+    job.setReducerClass(BlockingReducer.class);
+    FileInputFormat.addInputPaths(job, matrixPath);
+
+    job.setInputFormat(VectorInputFormat.class);
+    job.setMapOutputKeyClass(BlockID.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+    job.setOutputFormat(NullOutputFormat.class);
+
+    job.set(BLOCKING_MATRIX, matrixPath);
+    job.set(BLOCKED_MATRIX, string);
+    job.set(BLOCK_SIZE, String.valueOf(block_size));
+    job.set(ROWS, String.valueOf(i));
+    job.set(COLUMNS, String.valueOf(j));
+    
+    job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+  }
+
+  /**
+   * Abstract Blocking Map/Reduce Class to configure the job.
+   */
+  public static abstract class BlockingMapRedBase extends MapReduceBase {
+
+    protected DenseMatrix matrix;
+    protected DenseMatrix blockedMatrix;
+    protected int mBlockNum;
+    protected int mBlockRowSize;
+    protected int mBlockColSize;
+    
+    protected int mRows;
+    protected int mColumns;
+
+    @Override
+    public void configure(JobConf job) {
+      try {
+        matrix = new DenseMatrix(new HamaConfiguration(), job.get(
+            BLOCKING_MATRIX, ""));
+        blockedMatrix = new DenseMatrix(new HamaConfiguration(), job.get(
+            BLOCKED_MATRIX, ""));
+        
+        mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
+        mRows = Integer.parseInt(job.get(ROWS, ""));
+        mColumns = Integer.parseInt(job.get(COLUMNS, ""));
+        
+        mBlockRowSize = mRows / mBlockNum;
+        mBlockColSize = mColumns / mBlockNum;
+      } catch (IOException e) {
+        LOG.warn("Load matrix_blocking failed : " + e.getMessage());
+      }
+    }
+
+  }
+
+  /**
+   * Mapper Class
+   */
+  public static class BlockingMapper extends BlockingMapRedBase implements
+      Mapper<IntWritable, VectorWritable, BlockID, VectorWritable> {
+
+    @Override
+    public void map(IntWritable key, VectorWritable value,
+        OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
+        throws IOException {
+      int startColumn;
+      int endColumn;
+      int blkRow = key.get() / mBlockRowSize;
+      DenseVector dv = value.getDenseVector();
+      
+      int i = 0;
+      do {
+        startColumn = i * mBlockColSize;
+        endColumn = startColumn + mBlockColSize - 1;
+        if(endColumn >= mColumns) // the last sub vector
+          endColumn = mColumns - 1;
+        output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
+            dv.subVector(startColumn, endColumn)));
+        
+        i++;
+      } while(endColumn < (mColumns-1));
+    }
+
+  }
+
+  /**
+   * Reducer Class
+   */
+  public static class BlockingReducer extends BlockingMapRedBase implements
+      Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
+
+    @Override
+    public void reduce(BlockID key, Iterator<VectorWritable> values,
+        OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
+        throws IOException {
+      // Note: all the sub-vectors are grouped by {@link
+      // org.apache.hama.io.BlockID}
+      
+      // the block's base offset in the original matrix
+      int colBase = key.getColumn() * mBlockColSize;
+      int rowBase = key.getRow() * mBlockRowSize;
+      
+      // the block's size : rows & columns
+      int smRows = mBlockRowSize;
+      if((rowBase + mBlockRowSize - 1) >= mRows)
+        smRows = mRows - rowBase;
+      int smCols = mBlockColSize;
+      if((colBase + mBlockColSize - 1) >= mColumns)  
+        smCols = mColumns - colBase;
+      
+      // construct the matrix
+      SubMatrix subMatrix = new SubMatrix(smRows, smCols);
+      
+      // i, j is the current offset in the sub-matrix
+      int i = 0, j = 0;
+      while (values.hasNext()) {
+        VectorWritable vw = values.next();
+        // check the size is suitable
+        if (vw.size() != smCols)
+          throw new IOException("Block Column Size dismatched.");
+        i = vw.row - rowBase;
+        if (i >= smRows || i < 0)
+          throw new IOException("Block Row Size dismatched.");
+
+        // put the subVector to the subMatrix
+        for (j = 0; j < smCols; j++) {
+          subMatrix.set(i, j, vw.get(colBase + j));
+        }
+      }
+
+      blockedMatrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
+    }
+  }
+
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Tue Dec 30 03:52:47 2008
@@ -1,357 +1,324 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hama.io.BlockPosition;
-import org.apache.hama.io.DoubleEntry;
-import org.apache.log4j.Logger;
-
-/**
- * Matrix test
- */
-public class TestDenseMatrix extends TestCase {
-  static final Logger LOG = Logger.getLogger(TestDenseMatrix.class);
-  private static int SIZE = 10;
-  private static Matrix m1;
-  private static Matrix m2;
-  private final static String aliase1 = "matrix_aliase_A";
-  private final static String aliase2 = "matrix_aliase_B";
-  private static HamaConfiguration conf;
-  private static HBaseAdmin admin;
-  private static HamaAdmin hamaAdmin;
-
-  public static Test suite() {
-    TestSetup setup = new TestSetup(new TestSuite(TestDenseMatrix.class)) {
-      protected void setUp() throws Exception {
-        HCluster hCluster = new HCluster();
-        hCluster.setUp();
-
-        conf = hCluster.getConf();
-        admin = new HBaseAdmin(conf);
-        hamaAdmin = new HamaAdminImpl(conf, admin);
-
-        m1 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
-        m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
-      }
-
-      protected void tearDown() {
-        try {
-          closeTest();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    };
-    return setup;
-  }
-
-  public static void closeTest() throws IOException {
-    m1.close();
-    m2.close();
-  }
-
-  public void testEntryAdd() throws IOException {
-    double origin = m1.get(1, 1);
-    m1.add(1, 1, 0.5);
-
-    assertEquals(m1.get(1, 1), origin + 0.5);
-  }
-
-  public void testBlocking() throws IOException, ClassNotFoundException {
-    assertEquals(((DenseMatrix) m1).isBlocked(), false);
-    ((DenseMatrix) m1).blocking(4);
-    assertEquals(((DenseMatrix) m1).isBlocked(), true);
-    BlockPosition pos = ((DenseMatrix) m1).getBlockPosition(1, 0);
-    double[][] b = ((DenseMatrix) m1).subMatrix(pos.getStartRow(),
-        pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn())
-        .getDoubleArray();
-    double[][] c = ((DenseMatrix) m1).getBlock(1, 0).getDoubleArray();
-    assertEquals(((DenseMatrix) m1).getBlockSize(), 2);
-    assertEquals(c.length, 5);
-
-    for (int i = 0; i < b.length; i++) {
-      for (int j = 0; j < b.length; j++) {
-        assertEquals(b[i][j], c[i][j]);
-      }
-    }
-  }
-
-  /**
-   * Map/Reduce Blocking Test
-   * 
-   * @throws IOException
-   * @throws ClassNotFoundException
-   */
-  public void testMRBlocking() throws IOException, ClassNotFoundException {
-    assertEquals(((DenseMatrix) m2).isBlocked(), false);
-    ((DenseMatrix) m2).blocking_mapred(4);
-    assertEquals(((DenseMatrix) m2).isBlocked(), true);
-    BlockPosition pos = ((DenseMatrix) m2).getBlockPosition(1, 0);
-    double[][] b = ((DenseMatrix) m2).subMatrix(pos.getStartRow(),
-        pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn())
-        .getDoubleArray();
-    double[][] c = ((DenseMatrix) m2).getBlock(1, 0).getDoubleArray();
-    assertEquals(((DenseMatrix) m2).getBlockSize(), 2);
-    assertEquals(c.length, 5);
-
-    for (int i = 0; i < b.length; i++) {
-      for (int j = 0; j < b.length; j++) {
-        assertEquals(b[i][j], c[i][j]);
-      }
-    }
-  }
-
-  /**
-   * Column vector test.
-   * 
-   * @param rand
-   * @throws IOException
-   */
-  public void testGetColumn() throws IOException {
-    Vector v = m1.getColumn(0);
-    Iterator<DoubleEntry> it = v.iterator();
-    int x = 0;
-    while (it.hasNext()) {
-      assertEquals(m1.get(x, 0), it.next().getValue());
-      x++;
-    }
-  }
-
-  public void testGetSetAttribute() throws IOException {
-    m1.setRowLabel(0, "row1");
-    assertEquals(m1.getRowLabel(0), "row1");
-    assertEquals(m1.getRowLabel(1), null);
-
-    m1.setColumnLabel(0, "column1");
-    assertEquals(m1.getColumnLabel(0), "column1");
-    assertEquals(m1.getColumnLabel(1), null);
-  }
-
-  public void testSubMatrix() throws IOException {
-    SubMatrix a = m1.subMatrix(2, 4, 2, 5); // A : 3 * 4
-    for (int i = 0; i < a.getRows(); i++) {
-      for (int j = 0; j < a.getColumns(); j++) {
-        assertEquals(a.get(i, j), m1.get(i + 2, j + 2));
-      }
-    }
-
-    SubMatrix b = m2.subMatrix(0, 3, 0, 2); // B : 4 * 3
-    SubMatrix c = a.mult(b);
-
-    double[][] C = new double[3][3]; // A * B
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 3; j++) {
-        for (int k = 0; k < 4; k++) {
-          C[i][j] += m1.get(i + 2, k + 2) * m2.get(k, j);
-        }
-      }
-    }
-
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 3; j++) {
-        assertEquals(C[i][j], c.get(i, j));
-      }
-    }
-  }
-
-  /**
-   * Test matrices addition
-   * 
-   * @throws IOException
-   */
-  public void testMatrixAdd() throws IOException {
-    Matrix result = m1.add(m2);
-
-    assertEquals(result.getRows(), SIZE);
-    assertEquals(result.getColumns(), SIZE);
-
-    for (int i = 0; i < SIZE; i++) {
-      for (int j = 0; j < SIZE; j++) {
-        assertEquals(result.get(i, j), m1.get(i, j) + m2.get(i, j));
-      }
-    }
-  }
-
-  /**
-   * Test matrices multiplication
-   * 
-   * @throws IOException
-   */
-  public void testMatrixMult() throws IOException {
-    Matrix result = m1.mult(m2);
-
-    assertEquals(result.getRows(), SIZE);
-    assertEquals(result.getColumns(), SIZE);
-
-    verifyMultResult(m1, m2, result);
-  }
-
-  public void testSetRow() throws IOException {
-    Vector v = new DenseVector();
-    double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
-
-    for (int i = 0; i < SIZE; i++) {
-      v.set(i, entries[i]);
-    }
-
-    m1.setRow(SIZE + 1, v);
-    Iterator<DoubleEntry> it = m1.getRow(SIZE + 1).iterator();
-
-    int i = 0;
-    while (it.hasNext()) {
-      assertEquals(entries[i], it.next().getValue());
-      i++;
-    }
-  }
-
-  public void testSetColumn() throws IOException {
-    Vector v = new DenseVector();
-    double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
-
-    for (int i = 0; i < SIZE; i++) {
-      v.set(i, entries[i]);
-    }
-
-    m1.setColumn(SIZE + 1, v);
-    Iterator<DoubleEntry> it = m1.getColumn(SIZE + 1).iterator();
-
-    int i = 0;
-    while (it.hasNext()) {
-      assertEquals(entries[i], it.next().getValue());
-      i++;
-    }
-  }
-
-  public void testLoadSave() throws IOException {
-    String path1 = m1.getPath();
-    // save m1 to aliase1
-    m1.save(aliase1);
-    // load matrix m1 using aliase1
-    DenseMatrix loadTest = new DenseMatrix(conf, aliase1, false);
-
-    for (int i = 0; i < loadTest.getRows(); i++) {
-      for (int j = 0; j < loadTest.getColumns(); j++) {
-        assertEquals(m1.get(i, j), loadTest.get(i, j));
-      }
-    }
-
-    assertEquals(path1, loadTest.getPath());
-    // close loadTest, it just disconnect to the table but didn't delete it.
-    loadTest.close();
-
-    // try to close m1 & load matrix m1 using aliase1 again.
-    m1.close();
-    DenseMatrix loadTest2 = new DenseMatrix(conf, aliase1, false);
-    assertEquals(path1, loadTest2.getPath());
-    // remove aliase1
-    // because loadTest2 connect the aliase1, so we just remove aliase entry
-    // but didn't delete the table.
-    hamaAdmin.delete(aliase1);
-    assertEquals(true, admin.tableExists(path1));
-    // close loadTest2, because it is the last one who reference table 'path1'
-    // it will do the gc!
-    loadTest2.close();
-    assertEquals(false, admin.tableExists(path1));
-
-    // if we try to load non-existed matrix using aliase name, it should fail.
-    DenseMatrix loadTest3 = null;
-    try {
-      loadTest3 = new DenseMatrix(conf, aliase1, false);
-      fail("Try to load a non-existed matrix should fail!");
-    } catch (IOException e) {
-
-    } finally {
-      if (loadTest3 != null)
-        loadTest3.close();
-    }
-  }
-
-  public void testForceCreate() throws IOException {
-    String path2 = m2.getPath();
-    // save m2 to aliase2
-    m2.save(aliase2);
-    // load matrix m2 using aliase2
-    DenseMatrix loadTest = new DenseMatrix(conf, aliase2, false);
-
-    for (int i = 0; i < loadTest.getRows(); i++) {
-      for (int j = 0; j < loadTest.getColumns(); j++) {
-        assertEquals(m2.get(i, j), loadTest.get(i, j));
-      }
-    }
-
-    assertEquals(path2, loadTest.getPath());
-
-    // force to create matrix loadTest2 using aliasename 'aliase2'
-    DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true);
-    String loadPath2 = loadTest2.getPath();
-    assertFalse(path2.equals(loadPath2));
-    assertEquals(loadPath2, hamaAdmin.getPath(aliase2));
-    assertFalse(path2.equals(hamaAdmin.getPath(aliase2)));
-
-    // try to close m2 & loadTest, it table will be deleted finally
-    m2.close();
-    assertEquals(true, admin.tableExists(path2));
-    loadTest.close();
-    assertEquals(false, admin.tableExists(path2));
-
-    // remove 'aliase2' & close loadTest2
-    loadTest2.close();
-    assertEquals(true, admin.tableExists(loadPath2));
-    hamaAdmin.delete(aliase2);
-    assertEquals(false, admin.tableExists(loadPath2));
-  }
-
-  /**
-   * Verifying multiplication result
-   * 
-   * @param m1
-   * @param m2
-   * @param result
-   * @throws IOException
-   */
-  private void verifyMultResult(Matrix m1, Matrix m2, Matrix result)
-      throws IOException {
-    double[][] C = new double[SIZE][SIZE];
-
-    for (int i = 0; i < SIZE; i++) {
-      for (int j = 0; j < SIZE; j++) {
-        for (int k = 0; k < SIZE; k++) {
-          C[i][k] += m1.get(i, j) * m2.get(j, k);
-        }
-      }
-    }
-
-    for (int i = 0; i < SIZE; i++) {
-      for (int j = 0; j < SIZE; j++) {
-        assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String
-            .valueOf(C[i][j]).substring(0, 14));
-      }
-    }
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.log4j.Logger;
+
+/**
+ * Matrix test
+ */
+public class TestDenseMatrix extends TestCase {
+  static final Logger LOG = Logger.getLogger(TestDenseMatrix.class);
+  private static int SIZE = 10;
+  private static Matrix m1;
+  private static Matrix m2;
+  private final static String aliase1 = "matrix_aliase_A";
+  private final static String aliase2 = "matrix_aliase_B";
+  private static HamaConfiguration conf;
+  private static HBaseAdmin admin;
+  private static HamaAdmin hamaAdmin;
+
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestDenseMatrix.class)) {
+      protected void setUp() throws Exception {
+        HCluster hCluster = new HCluster();
+        hCluster.setUp();
+
+        conf = hCluster.getConf();
+        admin = new HBaseAdmin(conf);
+        hamaAdmin = new HamaAdminImpl(conf, admin);
+
+        m1 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
+        m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
+      }
+
+      protected void tearDown() {
+        try {
+          closeTest();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    return setup;
+  }
+
+  public static void closeTest() throws IOException {
+    m1.close();
+    m2.close();
+  }
+
+  public void testEntryAdd() throws IOException {
+    double origin = m1.get(1, 1);
+    m1.add(1, 1, 0.5);
+
+    assertEquals(m1.get(1, 1), origin + 0.5);
+  }
+
+  /**
+   * Map/Reduce Blocking Test
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  public void testMRBlocking() throws IOException, ClassNotFoundException {
+    assertEquals(((DenseMatrix) m2).isBlocked(), false);
+    ((DenseMatrix) m2).blocking_mapred(4);
+    assertEquals(((DenseMatrix) m2).isBlocked(), true);
+  }
+
+  /**
+   * Column vector test.
+   * 
+   * @param rand
+   * @throws IOException
+   */
+  public void testGetColumn() throws IOException {
+    Vector v = m1.getColumn(0);
+    Iterator<DoubleEntry> it = v.iterator();
+    int x = 0;
+    while (it.hasNext()) {
+      assertEquals(m1.get(x, 0), it.next().getValue());
+      x++;
+    }
+  }
+
+  public void testGetSetAttribute() throws IOException {
+    m1.setRowLabel(0, "row1");
+    assertEquals(m1.getRowLabel(0), "row1");
+    assertEquals(m1.getRowLabel(1), null);
+
+    m1.setColumnLabel(0, "column1");
+    assertEquals(m1.getColumnLabel(0), "column1");
+    assertEquals(m1.getColumnLabel(1), null);
+  }
+
+  public void testSubMatrix() throws IOException {
+    SubMatrix a = m1.subMatrix(2, 4, 2, 5); // A : 3 * 4
+    for (int i = 0; i < a.getRows(); i++) {
+      for (int j = 0; j < a.getColumns(); j++) {
+        assertEquals(a.get(i, j), m1.get(i + 2, j + 2));
+      }
+    }
+
+    SubMatrix b = m2.subMatrix(0, 3, 0, 2); // B : 4 * 3
+    SubMatrix c = a.mult(b);
+
+    double[][] C = new double[3][3]; // A * B
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 3; j++) {
+        for (int k = 0; k < 4; k++) {
+          C[i][j] += m1.get(i + 2, k + 2) * m2.get(k, j);
+        }
+      }
+    }
+
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 3; j++) {
+        assertEquals(C[i][j], c.get(i, j));
+      }
+    }
+  }
+
+  /**
+   * Test matrices addition
+   * 
+   * @throws IOException
+   */
+  public void testMatrixAdd() throws IOException {
+    Matrix result = m1.add(m2);
+
+    assertEquals(result.getRows(), SIZE);
+    assertEquals(result.getColumns(), SIZE);
+
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        assertEquals(result.get(i, j), m1.get(i, j) + m2.get(i, j));
+      }
+    }
+  }
+
+  /**
+   * Test matrices multiplication
+   * 
+   * @throws IOException
+   */
+  public void testMatrixMult() throws IOException {
+    Matrix result = m1.mult(m2);
+
+    assertEquals(result.getRows(), SIZE);
+    assertEquals(result.getColumns(), SIZE);
+
+    verifyMultResult(m1, m2, result);
+  }
+
+  public void testSetRow() throws IOException {
+    Vector v = new DenseVector();
+    double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+
+    for (int i = 0; i < SIZE; i++) {
+      v.set(i, entries[i]);
+    }
+
+    m1.setRow(SIZE + 1, v);
+    Iterator<DoubleEntry> it = m1.getRow(SIZE + 1).iterator();
+
+    int i = 0;
+    while (it.hasNext()) {
+      assertEquals(entries[i], it.next().getValue());
+      i++;
+    }
+  }
+
+  public void testSetColumn() throws IOException {
+    Vector v = new DenseVector();
+    double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+
+    for (int i = 0; i < SIZE; i++) {
+      v.set(i, entries[i]);
+    }
+
+    m1.setColumn(SIZE + 1, v);
+    Iterator<DoubleEntry> it = m1.getColumn(SIZE + 1).iterator();
+
+    int i = 0;
+    while (it.hasNext()) {
+      assertEquals(entries[i], it.next().getValue());
+      i++;
+    }
+  }
+
+  public void testLoadSave() throws IOException {
+    String path1 = m1.getPath();
+    // save m1 to aliase1
+    m1.save(aliase1);
+    // load matrix m1 using aliase1
+    DenseMatrix loadTest = new DenseMatrix(conf, aliase1, false);
+
+    for (int i = 0; i < loadTest.getRows(); i++) {
+      for (int j = 0; j < loadTest.getColumns(); j++) {
+        assertEquals(m1.get(i, j), loadTest.get(i, j));
+      }
+    }
+
+    assertEquals(path1, loadTest.getPath());
+    // close loadTest, it just disconnect to the table but didn't delete it.
+    loadTest.close();
+
+    // try to close m1 & load matrix m1 using aliase1 again.
+    m1.close();
+    DenseMatrix loadTest2 = new DenseMatrix(conf, aliase1, false);
+    assertEquals(path1, loadTest2.getPath());
+    // remove aliase1
+    // because loadTest2 connect the aliase1, so we just remove aliase entry
+    // but didn't delete the table.
+    hamaAdmin.delete(aliase1);
+    assertEquals(true, admin.tableExists(path1));
+    // close loadTest2, because it is the last one who reference table 'path1'
+    // it will do the gc!
+    loadTest2.close();
+    assertEquals(false, admin.tableExists(path1));
+
+    // if we try to load non-existed matrix using aliase name, it should fail.
+    DenseMatrix loadTest3 = null;
+    try {
+      loadTest3 = new DenseMatrix(conf, aliase1, false);
+      fail("Try to load a non-existed matrix should fail!");
+    } catch (IOException e) {
+
+    } finally {
+      if (loadTest3 != null)
+        loadTest3.close();
+    }
+  }
+
+  public void testForceCreate() throws IOException {
+    String path2 = m2.getPath();
+    // save m2 to aliase2
+    m2.save(aliase2);
+    // load matrix m2 using aliase2
+    DenseMatrix loadTest = new DenseMatrix(conf, aliase2, false);
+
+    for (int i = 0; i < loadTest.getRows(); i++) {
+      for (int j = 0; j < loadTest.getColumns(); j++) {
+        assertEquals(m2.get(i, j), loadTest.get(i, j));
+      }
+    }
+
+    assertEquals(path2, loadTest.getPath());
+
+    // force to create matrix loadTest2 using aliasename 'aliase2'
+    DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true);
+    String loadPath2 = loadTest2.getPath();
+    assertFalse(path2.equals(loadPath2));
+    assertEquals(loadPath2, hamaAdmin.getPath(aliase2));
+    assertFalse(path2.equals(hamaAdmin.getPath(aliase2)));
+
+    // try to close m2 & loadTest, it table will be deleted finally
+    m2.close();
+    assertEquals(true, admin.tableExists(path2));
+    loadTest.close();
+    assertEquals(false, admin.tableExists(path2));
+
+    // remove 'aliase2' & close loadTest2
+    loadTest2.close();
+    assertEquals(true, admin.tableExists(loadPath2));
+    hamaAdmin.delete(aliase2);
+    assertEquals(false, admin.tableExists(loadPath2));
+  }
+
+  /**
+   * Verifying multiplication result
+   * 
+   * @param m1
+   * @param m2
+   * @param result
+   * @throws IOException
+   */
+  private void verifyMultResult(Matrix m1, Matrix m2, Matrix result)
+      throws IOException {
+    double[][] C = new double[SIZE][SIZE];
+
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        for (int k = 0; k < SIZE; k++) {
+          C[i][k] += m1.get(i, j) * m2.get(j, k);
+        }
+      }
+    }
+
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        assertEquals(String.valueOf(result.get(i, j)).substring(0, 14), String
+            .valueOf(C[i][j]).substring(0, 14));
+      }
+    }
+  }
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=730103&r1=730102&r2=730103&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Tue Dec 30 03:52:47 2008
@@ -21,20 +21,13 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hama.DenseMatrix;
 import org.apache.hama.HCluster;
 import org.apache.hama.Matrix;
-import org.apache.hama.algebra.BlockCyclicMultiplyMap;
-import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockWritable;
 import org.apache.log4j.Logger;
 
 public class TestBlockMatrixMapReduce extends HCluster {
   static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
-  static Matrix c;
   static final int SIZE = 32;
 
   /** constructor */
@@ -50,41 +43,22 @@
     ((DenseMatrix) m1).blocking_mapred(16);
     ((DenseMatrix) m2).blocking_mapred(16);
 
-    miniMRJob(m1.getPath(), m2.getPath());
+    Matrix c = m1.mult(m2);
 
-    double[][] C = new double[SIZE][SIZE];
+    double[][] mem = new double[SIZE][SIZE];
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
         for (int k = 0; k < SIZE; k++) {
-          C[i][k] += m1.get(i, j) * m2.get(j, k);
+          mem[i][k] += m1.get(i, j) * m2.get(j, k);
         }
       }
     }
 
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
-        assertEquals(String.valueOf(C[i][j]).substring(0, 5), String.valueOf(
+        assertEquals(String.valueOf(mem[i][j]).substring(0, 5), String.valueOf(
             c.get(i, j)).substring(0, 5));
       }
     }
   }
-
-  private void miniMRJob(String string, String string2) throws IOException {
-    c = new DenseMatrix(conf);
-    String output = c.getPath();
-
-    JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class);
-    jobConf.setJobName("test MR job");
-
-    BlockCyclicMultiplyMap.initJob(string, string2,
-        BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
-        jobConf);
-    BlockCyclicMultiplyReduce.initJob(output, BlockCyclicMultiplyReduce.class,
-        jobConf);
-
-    jobConf.setNumMapTasks(2);
-    jobConf.setNumReduceTasks(2);
-
-    JobClient.runJob(jobConf);
-  }
 }