Posted to commits@hama.apache.org by ed...@apache.org on 2009/01/15 04:11:27 UTC

svn commit: r734602 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/ src/java/org/apache/hama/algebra/ src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/ src/java/org/apache/hama/util/ src/test/...

Author: edwardyoon
Date: Wed Jan 14 19:11:26 2009
New Revision: 734602

URL: http://svn.apache.org/viewvc?rev=734602&view=rev
Log:
Improving speed of matrix multiplication

Added:
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/AbstractExample.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
    incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
    incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Jan 14 19:11:26 2009
@@ -33,6 +33,7 @@
     
   IMPROVEMENTS
     
+    HAMA-129: Improving speed of matrix multiplication (edwardyoon)
     HAMA-142: Trunk doesn't work for large matrices (edwardyoon)
     HAMA-143: Improve of random_mapred() (edwardyoon)
     HAMA-138: To order, left pad with zeroes to integer key (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/AbstractExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/AbstractExample.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/AbstractExample.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/AbstractExample.java Wed Jan 14 19:11:26 2009
@@ -26,7 +26,7 @@
 
 public abstract class AbstractExample {
   public static final HamaConfiguration conf = new HamaConfiguration();
-  protected static List<String> ARGS;
+  public static List<String> ARGS;
   
   public static void parseArgs(String[] args) {
     List<String> other_args = new ArrayList<String>();

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java Wed Jan 14 19:11:26 2009
@@ -39,17 +39,16 @@
 
     DenseMatrix a = new DenseMatrix(conf, matrixA, false);
     DenseMatrix b = new DenseMatrix(conf, matrixB, false);
-
+    Matrix c;
+    
     if (ARGS.size() > 2) {
-      a.blocking_mapred(Integer.parseInt(ARGS.get(2)));
-      b.blocking_mapred(Integer.parseInt(ARGS.get(2)));
+      c = a.mult(b, Integer.parseInt(ARGS.get(2)));
+    } else {
+      c = a.mult(b);
     }
 
-    Matrix c = a.mult(b);
     for (int i = 0; i < 2; i++) {
-      for (int j = 0; j < 2; j++) {
-        System.out.println("c(" + i + ", " + j + ") : " + c.get(i, j));
-      }
+      System.out.println("c(" + 0 + ", " + i + ") : " + c.get(0, i));
     }
     System.out.println("...");
   }

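For reference, a minimal driver sketch of the two call paths the example now exercises, assuming a configured Hama/HBase cluster; the class name MultExample and the 8x8 sizes are illustrative and not part of this commit:

    import org.apache.hama.DenseMatrix;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.Matrix;

    public class MultExample {
      public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration();

        // Two random 8x8 matrices backed by HBase tables.
        DenseMatrix a = DenseMatrix.random(conf, 8, 8);
        DenseMatrix b = DenseMatrix.random(conf, 8, 8);

        // Blocked multiplication: 16 blocks, i.e. a 4x4 grid of 2x2 blocks.
        Matrix c = a.mult(b, 16);

        // Row-based (SIMD) multiplication, taken when no block count is given.
        Matrix d = a.mult(b);

        System.out.println("c(0, 0) = " + c.get(0, 0));
        System.out.println("d(0, 0) = " + d.get(0, 0));
      }
    }
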
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Wed Jan 14 19:11:26 2009
@@ -73,13 +73,16 @@
   protected void create() throws IOException {
     // It should run only when table doesn't exist.
     if (!admin.tableExists(matrixPath)) {
-      this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN));
+      this.tableDesc.addFamily(new HColumnDescriptor(
+          Bytes.toBytes(Constants.COLUMN), 3, CompressionType.NONE, false, false,
+          Integer.MAX_VALUE, HConstants.FOREVER, false));
       this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
       this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
-      this.tableDesc.addFamily(new HColumnDescriptor(Bytes
-          .toBytes(Constants.BLOCK), 1, CompressionType.NONE, false, false,
-          Integer.MAX_VALUE, HConstants.FOREVER, false));
-
+      // The block column family holds only temporary data.
+      this.tableDesc.addFamily(new HColumnDescriptor(
+      Bytes.toBytes(Constants.BLOCK), 1, CompressionType.NONE, false, false,
+      Integer.MAX_VALUE, HConstants.FOREVER, false));
+      
       LOG.info("Initializing the matrix storage.");
       this.admin.createTable(this.tableDesc);
       LOG.info("Create Matrix " + matrixPath);

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Wed Jan 14 19:11:26 2009
@@ -28,10 +28,8 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scanner;
-import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -39,8 +37,8 @@
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hama.algebra.BlockCyclicMultiplyMap;
-import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
+import org.apache.hama.algebra.BlockMultiplyMap;
+import org.apache.hama.algebra.BlockMultiplyReduce;
 import org.apache.hama.algebra.RowCyclicAdditionMap;
 import org.apache.hama.algebra.RowCyclicAdditionReduce;
 import org.apache.hama.algebra.SIMDMultiplyMap;
@@ -383,30 +381,53 @@
   }
 
   public Matrix mult(Matrix B) throws IOException {
-    Matrix result = new DenseMatrix(config);
-
+    Matrix result = new DenseMatrix(config);
+    
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("multiplication MR job : " + result.getPath());
 
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
+    
+    SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
+        SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
+        jobConf);
+    SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
+        jobConf);
+    JobManager.execute(jobConf, result);
+    return result;
+  }
 
-    if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
-      BlockCyclicMultiplyMap.initJob(this.getBlockedMatrixPath(), 
-          ((DenseMatrix) B).getBlockedMatrixPath(), this.getBlockedMatrixSize(),
-          BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
-          jobConf);
-      BlockCyclicMultiplyReduce.initJob(result.getPath(),
-          BlockCyclicMultiplyReduce.class, jobConf);
-    } else {
-      SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
-          SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
-          jobConf);
-      SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
-          jobConf);
-    }
+  /**
+   * Computes C = A * B using blocked multiplication.
+   * 
+   * @param B the right-hand matrix
+   * @param blocks the number of blocks; must be a perfect square
+   * @return C, the product matrix
+   * @throws IOException
+   */
+  public Matrix mult(Matrix B, int blocks) throws IOException {
+    Matrix collectionTable = new DenseMatrix(config);
+    LOG.info("Collect Blocks");
+    collectBlocks(this, collectionTable, blocks, true);
+    collectBlocks(B, collectionTable, blocks, false);
+
+    Matrix result = new DenseMatrix(config);
+    
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("multiplication MR job : " + result.getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+    
+    BlockMultiplyMap.initJob(collectionTable.getPath(), 
+        BlockMultiplyMap.class, BlockID.class, BlockWritable.class,
+        jobConf);
+    BlockMultiplyReduce.initJob(result.getPath(),
+        BlockMultiplyReduce.class, jobConf);
 
     JobManager.execute(jobConf, result);
+    // Should the collectionTable be removed here?
     return result;
   }
 
@@ -448,6 +469,9 @@
     return this.getClass().getSimpleName();
   }
 
+  /**
+   * Gets the sub-matrix spanning rows i0 through i1 and columns j0 through j1, inclusive.
+   */
   public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
     int columnSize = (j1 - j0) + 1;
     SubMatrix result = new SubMatrix((i1 - i0) + 1, columnSize);
@@ -474,37 +498,23 @@
     return result;
   }
 
-  public boolean isBlocked() throws IOException {
-    return (table.get(Constants.METADATA, Constants.BLOCK_PATH) == null) ? false
-        : true;
-  }
-
-  public SubMatrix getBlock(int i, int j) throws IOException {
-    return new SubMatrix(table.get(new BlockID(i, j).getBytes(),
-        Bytes.toBytes(Constants.BLOCK)).getValue());
-  }
-
-  public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
-    BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
-    update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
-    table.commit(update);
-  }
-  
   /**
-   * Using a map/reduce job to block a dense matrix.
+   * Collects the blocks of the source matrix into the shared collection table.
    * 
+   * @param resource the matrix whose blocks are collected
+   * @param collectionTable the table gathering the paired blocks of A and B
    * @param blockNum
+   * @param bool true if the resource is the left operand (A), false for the right (B)
    * @throws IOException
    */
-  public void blocking_mapred(int blockNum) throws IOException {
+  public void collectBlocks(Matrix resource, Matrix collectionTable, 
+      int blockNum, boolean bool) throws IOException {
     double blocks = Math.pow(blockNum, 0.5);
     if (!String.valueOf(blocks).endsWith(".0"))
       throw new IOException("can't divide.");
 
     int block_size = (int) blocks;
-    Matrix blockedMatrix = new DenseMatrix(config);
-    blockedMatrix.setDimension(block_size, block_size);
-    this.setBlockedMatrixPath(blockedMatrix.getPath(), block_size);
+    collectionTable.setDimension(block_size, block_size);
     
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("Blocking MR job" + getPath());
@@ -512,25 +522,8 @@
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
 
-    BlockingMapRed.initJob(this.getPath(), blockedMatrix.getPath(), 
-        block_size, this.getRows(), this.getColumns(), jobConf);
+    BlockingMapRed.initJob(resource.getPath(), collectionTable.getPath(), 
+        bool, block_size, this.getRows(), this.getColumns(), jobConf);
     JobManager.execute(jobConf);
   }
-
-  public String getBlockedMatrixPath() throws IOException {
-    return Bytes.toString(table.get(Constants.METADATA,
-        Constants.BLOCK_PATH).getValue());
-  }
-
-  protected void setBlockedMatrixPath(String path, int size) throws IOException {
-    BatchUpdate update = new BatchUpdate(Constants.METADATA);
-    update.put(Constants.BLOCK_PATH, Bytes.toBytes(path));
-    update.put(Constants.BLOCK_SIZE, Bytes.toBytes(size));
-    table.commit(update);
-  }
-  
-  public int getBlockedMatrixSize() throws IOException {
-    return Bytes.toInt(table.get(Constants.METADATA,
-        Constants.BLOCK_SIZE).getValue());
-  }
 }

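One constraint worth noting: collectBlocks() only accepts a block count whose square root is an integer (anything else throws "can't divide."), and that square root becomes the block grid size used by the blocking job. A standalone sketch of the arithmetic, with illustrative class and variable names:

    public class BlockArithmetic {
      public static void main(String[] args) throws Exception {
        int rows = 8, columns = 8, blockNum = 16;

        // Same check as collectBlocks(): blockNum must be a perfect square.
        double grid = Math.pow(blockNum, 0.5);
        if (!String.valueOf(grid).endsWith(".0"))
          throw new Exception("can't divide.");

        int blockGrid = (int) grid;              // a 4x4 grid of blocks
        int blockRowSize = rows / blockGrid;     // 2 rows per block
        int blockColSize = columns / blockGrid;  // 2 columns per block

        System.out.println(blockGrid + " x " + blockGrid + " blocks of "
            + blockRowSize + " x " + blockColSize);
      }
    }
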
Modified: incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java Wed Jan 14 19:11:26 2009
@@ -209,5 +209,15 @@
     return data;
   }
 
+  public String toString() {
+    String result = "";
+    for (int i = 0; i < this.getRows(); i++) {
+      for (int j = 0; j < this.getColumns(); j++) {
+        result += this.get(i, j) + "\t";
+      }
+      result += "\n";
+    }
+    return result;
+  }
 }
 

Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java?rev=734602&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyMap.java Wed Jan 14 19:11:26 2009
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.mapred.BlockInputFormat;
+import org.apache.log4j.Logger;
+
+public class BlockMultiplyMap extends MapReduceBase implements
+    Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
+  static final Logger LOG = Logger.getLogger(BlockMultiplyMap.class);
+
+  public static void initJob(String matrix_a,
+      Class<BlockMultiplyMap> map, Class<BlockID> outputKeyClass,
+      Class<BlockWritable> outputValueClass, JobConf jobConf) {
+
+    jobConf.setMapOutputValueClass(outputValueClass);
+    jobConf.setMapOutputKeyClass(outputKeyClass);
+    jobConf.setMapperClass(map);
+
+    jobConf.setInputFormat(BlockInputFormat.class);
+    FileInputFormat.addInputPaths(jobConf, matrix_a);
+
+    jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
+  }
+
+  @Override
+  public void map(BlockID key, BlockWritable value,
+      OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
+      throws IOException {
+    
+    SubMatrix c = value.get(0).mult(value.get(1));
+    output.collect(key, new BlockWritable(c));
+  }
+}

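The map expects each collection-table row to carry exactly two sub-matrices, A's block at index 0 and B's block at index 1, and simply multiplies them. SubMatrix.mult() itself is not part of this diff, so the array-based sketch below only illustrates the assumed behaviour (an ordinary dense block multiply); the class name is made up:

    public class BlockMultiply {
      static double[][] multiply(double[][] a, double[][] b) {
        double[][] c = new double[a.length][b[0].length];
        for (int i = 0; i < a.length; i++)
          for (int j = 0; j < b[0].length; j++)
            for (int k = 0; k < b.length; k++)
              c[i][j] += a[i][k] * b[k][j];
        return c;
      }

      public static void main(String[] args) {
        double[][] a = { { 1, 2 }, { 3, 4 } };
        double[][] b = { { 5, 6 }, { 7, 8 } };
        System.out.println(multiply(a, b)[0][0]);  // 19.0
      }
    }
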
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java?rev=734602&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java Wed Jan 14 19:11:26 2009
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.log4j.Logger;
+
+public class BlockMultiplyReduce extends MapReduceBase implements
+    Reducer<BlockID, BlockWritable, IntWritable, VectorUpdate> {
+  static final Logger LOG = Logger.getLogger(BlockMultiplyReduce.class);
+
+  /**
+   * Use this before submitting a BlockMultiplyReduce job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table
+   * @param reducer
+   * @param job
+   */
+  public static void initJob(String table,
+      Class<BlockMultiplyReduce> reducer, JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+
+  @Override
+  public void reduce(BlockID key, Iterator<BlockWritable> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException {
+
+    SubMatrix s = null;
+    while (values.hasNext()) {
+      SubMatrix b = values.next().getMatrices().next();
+      if (s == null) {
+        s = b;
+      } else {
+        s = s.add(b);
+      }
+    }
+
+    int startRow = key.getRow() * s.getRows();
+    int startColumn = key.getColumn() * s.getColumns();
+
+    for (int i = 0; i < s.getRows(); i++) {
+      VectorUpdate update = new VectorUpdate(i + startRow);
+      for (int j = 0; j < s.getColumns(); j++) {
+        update.put(j + startColumn, s.get(i, j));
+      }
+      output.collect(new IntWritable(key.getRow()), update);
+    }
+  }
+}

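The reducer sums the partial products for one result block and then writes every element at an absolute offset of key.getRow() * blockRows and key.getColumn() * blockColumns. A conceptual sketch of that placement step using plain arrays instead of SubMatrix and VectorUpdate; the names are illustrative:

    public class BlockPlacement {
      static void placeBlock(double[][] result, double[][] block,
                             int blockRow, int blockColumn) {
        int startRow = blockRow * block.length;
        int startColumn = blockColumn * block[0].length;
        for (int i = 0; i < block.length; i++)
          for (int j = 0; j < block[0].length; j++)
            result[startRow + i][startColumn + j] = block[i][j];
      }

      public static void main(String[] args) {
        double[][] result = new double[4][4];
        double[][] block = { { 1, 2 }, { 3, 4 } };
        placeBlock(result, block, 1, 0);   // lands in rows 2-3, columns 0-1
        System.out.println(result[2][0]);  // 1.0
      }
    }
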
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockID.java Wed Jan 14 19:11:26 2009
@@ -34,7 +34,8 @@
   public static final int PAD_SIZE = 15;
   private int row;
   private int column;
-
+  private int seq = -1;
+  
   public BlockID() {
   }
 
@@ -61,12 +62,18 @@
 
     try {
       this.row = Integer.parseInt(keys[1]);
-      this.column = Integer.parseInt(keys[2]);
+      String[] columns = keys[2].split("[-]");
+      this.column = Integer.parseInt(columns[0]);
     } catch (ArrayIndexOutOfBoundsException e) {
       throw new ArrayIndexOutOfBoundsException(rKey + "\n" + e);
     }
   }
 
+  public BlockID(int row, int column, int seq) {
+    set(row, column);
+    this.seq = seq;
+  }
+
   public void set(int row, int column) {
     this.row = row;
     this.column = column;
@@ -101,7 +108,11 @@
       buf.append("0");
     }
 
-    return buf.toString() + "," + row + "," + column;
+    if(seq > -1) {
+      return buf.toString() + "," + row + "," + column + "-" + seq;
+    } else {
+      return buf.toString() + "," + row + "," + column;
+    }
   }
 
   @Override

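The new seq field only changes the string form of the key: with a sequence number the key ends in "row,column-seq", and parsing a key drops the suffix again, which is how BlockInputFormat later recovers the plain (row, column) of the result block from the collection table. A small round-trip sketch using only the constructors and accessors visible in this diff; the class name BlockIDCheck is made up:

    import org.apache.hama.io.BlockID;

    public class BlockIDCheck {
      public static void main(String[] args) {
        BlockID withSeq = new BlockID(1, 2, 7);
        String key = withSeq.toString();   // "<zero-padded prefix>,1,2-7"
        System.out.println(key);

        // Parsing keeps the row and column but discards the "-7" suffix.
        BlockID parsed = new BlockID(key.getBytes());
        System.out.println(parsed.getRow() + "," + parsed.getColumn());  // 1,2
      }
    }
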
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Wed Jan 14 19:11:26 2009
@@ -19,59 +19,90 @@
  */
 package org.apache.hama.io;
 
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
 import org.apache.hama.SubMatrix;
 
 public class BlockWritable implements Writable {
-  private SubMatrix matrix;
+  static final Log LOG = LogFactory.getLog(BlockWritable.class);
+  private List<SubMatrix> matrices;
 
   public BlockWritable() {
-    this.matrix = new SubMatrix(0, 0);
-  }
-
-  public BlockWritable(SubMatrix c) {
-    this.matrix = c;
+    this.matrices = new ArrayList<SubMatrix>();
   }
 
-  public BlockWritable(byte[] bytes) throws IOException {
-    this.matrix = new SubMatrix(bytes);
+  public BlockWritable(SubMatrix subMatrix) {
+    this.matrices = new ArrayList<SubMatrix>();
+    this.matrices.add(subMatrix);
   }
 
   public void readFields(DataInput in) throws IOException {
-    
-    int rows = in.readInt();
-    int columns = in.readInt();
-    this.matrix = new SubMatrix(rows, columns);
-    
-    for(int i = 0; i < rows; i++) {
-      for(int j = 0; j < columns; j++) {
-        this.matrix.set(i, j, in.readDouble());
+    this.matrices.clear();
+    int size = in.readInt();
+
+    for (int x = 0; x < size; x++) {
+      int rows = in.readInt();
+      int columns = in.readInt();
+
+      SubMatrix matrix = new SubMatrix(rows, columns);
+
+      for (int i = 0; i < rows; i++) {
+        for (int j = 0; j < columns; j++) {
+          matrix.set(i, j, in.readDouble());
+        }
       }
+
+      this.matrices.add(matrix);
     }
-    
-    //this.matrix = new SubMatrix(Bytes.readByteArray(in));
   }
 
   public void write(DataOutput out) throws IOException {
-    //Bytes.writeByteArray(out, this.matrix.getBytes());
-    
-    out.writeInt(this.matrix.getRows());
-    out.writeInt(this.matrix.getColumns());
-    
-    for(int i = 0; i < this.matrix.getRows(); i++) {
-      for(int j = 0; j < this.matrix.getColumns(); j++) {
-        out.writeDouble(this.matrix.get(i, j));
+    Iterator<SubMatrix> it = this.matrices.iterator();
+
+    int size = this.matrices.size();
+    out.writeInt(size);
+
+    while (it.hasNext()) {
+      SubMatrix matrix = it.next();
+
+      out.writeInt(matrix.getRows());
+      out.writeInt(matrix.getColumns());
+
+      for (int i = 0; i < matrix.getRows(); i++) {
+        for (int j = 0; j < matrix.getColumns(); j++) {
+          out.writeDouble(matrix.get(i, j));
+        }
       }
     }
   }
 
-  public SubMatrix get() {
-    return this.matrix;
+  public void set(byte[] key, byte[] value) throws IOException {
+    int index = 0;
+    if (new String(key).equals(Constants.BLOCK + "b")) {
+      index = 1;
+    }
+
+    this.matrices.add(index, new SubMatrix(value));
   }
-}
 
+  public Iterator<SubMatrix> getMatrices() {
+    return this.matrices.iterator();
+  }
+  
+  public SubMatrix get(int index) {
+    return this.matrices.get(index);
+  }
+  
+  public int size() {
+    return this.matrices.size();
+  }
+}

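BlockWritable now serializes a list of sub-matrices (a count, then each matrix prefixed by its dimensions) instead of a single one, so a single writable can carry both the A block and the B block read out of one collection-table row. A quick round-trip sketch using only the methods visible in this diff; the class name BlockWritableCheck is made up:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hama.SubMatrix;
    import org.apache.hama.io.BlockWritable;

    public class BlockWritableCheck {
      public static void main(String[] args) throws Exception {
        SubMatrix block = new SubMatrix(2, 2);
        block.set(0, 0, 1.0);

        // Serialize a writable holding one sub-matrix ...
        BlockWritable out = new BlockWritable(block);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        out.write(new DataOutputStream(bytes));

        // ... and read it back.
        BlockWritable in = new BlockWritable();
        in.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(in.size());            // 1
        System.out.println(in.get(0).get(0, 0));  // 1.0
      }
    }
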
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Wed Jan 14 19:11:26 2009
@@ -20,10 +20,12 @@
 package org.apache.hama.mapred;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.util.Writables;
@@ -34,7 +36,6 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.Constants;
 import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
 
@@ -43,20 +44,11 @@
   static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
   private TableRecordReader tableRecordReader;
   
-  public static int getRepeatCount() {
-    return TableRecordReader.repeatCount;
-  }
-  
-  public static int getRepeat() {
-    return repeat;
-  }
-  
   /**
    * Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
    */
   protected static class TableRecordReader extends TableRecordReaderBase
       implements RecordReader<BlockID, BlockWritable> {
-    private static int repeatCount = 0;
     
     /**
      * @return IntWritable
@@ -99,21 +91,17 @@
       
       boolean hasMore = result != null && result.size() > 0;
       
-      // Scanner will be restarted.
-      if(!hasMore && repeatCount < repeat - 1) {
-        this.init();
-        repeatCount++;
-        result = this.scanner.next();
-        hasMore = result != null && result.size() > 0;
-      }
-      
       if (hasMore) {
         byte[] row = result.getRow();
         BlockID bID = new BlockID(row);
         lastRow = row;
         key.set(bID.getRow(), bID.getColumn());
-        byte[] rs = result.get(Constants.BLOCK).getValue();
-        Writables.copyWritable(new BlockWritable(rs), value);
+
+        BlockWritable block = new BlockWritable();
+        for(Map.Entry<byte[], Cell> e : result.entrySet()) {
+          block.set(e.getKey(), e.getValue().getValue());
+        }
+        Writables.copyWritable(block, value);
       }
       return hasMore;
     }

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java?rev=734602&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java Wed Jan 14 19:11:26 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hama.Constants;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+
+public class BlockOutputFormat extends
+    FileOutputFormat<BlockID, BlockWritable> {
+  
+  /** JobConf parameter that specifies the output table */
+  public static final String OUTPUT_TABLE = "hama.mapred.output";
+  public static final String COLUMN = "hama.mapred.output.column";
+  private final static Log LOG = LogFactory.getLog(BlockOutputFormat.class);
+
+  /**
+   * Convert Reduce output (BlockID, BlockWritable) to a BatchUpdate
+   * and write it to an HBase table
+   */
+  protected static class TableRecordWriter implements
+      RecordWriter<BlockID, BlockWritable> {
+    private HTable m_table;
+    private BatchUpdate update;
+    private String column;
+    
+    /**
+     * Instantiate a TableRecordWriter with the HBase HClient for writing.
+     * 
+     * @param table
+     * @param col 
+     */
+    public TableRecordWriter(HTable table, String col) {
+      m_table = table;
+      column = col;
+    }
+
+    public void close(@SuppressWarnings("unused")
+    Reporter reporter) throws IOException {
+      m_table.flushCommits();
+    }
+
+    /** {@inheritDoc} */
+    public void write(BlockID key, BlockWritable value) throws IOException {
+      Iterator<SubMatrix> it = value.getMatrices();
+      update = new BatchUpdate(key.getBytes());
+      update.put(Bytes.toBytes(Constants.BLOCK + column), it.next().getBytes());
+      m_table.commit(new BatchUpdate(update));
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+      String name, Progressable progress) throws IOException {
+
+    // expecting exactly one path
+    String column = job.get(COLUMN);
+    String tableName = job.get(OUTPUT_TABLE);
+    HTable table = null;
+    try {
+      table = new HTable(new HBaseConfiguration(job), tableName);
+    } catch (IOException e) {
+      LOG.error(e);
+      throw e;
+    }
+    return new TableRecordWriter(table, column);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+      throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+    String tableName = job.get(OUTPUT_TABLE);
+    if (tableName == null) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}

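Both blocking jobs write into the same collection table: the run over matrix A is configured with column "a" and the run over B with column "b", so the two sub-matrices that must be multiplied together end up as two cells of one row whose key is a BlockID carrying a sequence number. A sketch of the two BatchUpdates that meet in one row; nothing is committed to a table here, and the 2x2 sizes are illustrative:

    import org.apache.hadoop.hbase.io.BatchUpdate;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hama.Constants;
    import org.apache.hama.SubMatrix;
    import org.apache.hama.io.BlockID;

    public class CollectionRowSketch {
      public static void main(String[] args) throws Exception {
        BlockID key = new BlockID(1, 2, 7);
        SubMatrix aBlock = new SubMatrix(2, 2);
        SubMatrix bBlock = new SubMatrix(2, 2);

        // Written by the blocking job over matrix A (column "a").
        BatchUpdate fromA = new BatchUpdate(key.getBytes());
        fromA.put(Bytes.toBytes(Constants.BLOCK + "a"), aBlock.getBytes());

        // Written by the blocking job over matrix B (column "b"),
        // targeting the very same row key.
        BatchUpdate fromB = new BatchUpdate(key.getBytes());
        fromB.put(Bytes.toBytes(Constants.BLOCK + "b"), bBlock.getBytes());

        System.out.println("shared row key: " + key.toString());
      }
    }
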
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Wed Jan 14 19:11:26 2009
@@ -29,13 +29,11 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hama.Constants;
-import org.apache.hama.DenseMatrix;
 import org.apache.hama.DenseVector;
-import org.apache.hama.HamaConfiguration;
 import org.apache.hama.SubMatrix;
 import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.VectorWritable;
 
 /**
@@ -45,74 +43,71 @@
 
   static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
   /** Parameter of the path of the matrix to be blocked * */
-  public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
-  public static final String BLOCKED_MATRIX = "hama.blocked.matrix";
   public static final String BLOCK_SIZE = "hama.blocking.size";
   public static final String ROWS = "hama.blocking.rows";
   public static final String COLUMNS = "hama.blocking.columns";
+  public static final String MATRIX_POS = "a.ore.b";
 
   /**
    * Initialize a job to blocking a table
    * 
    * @param matrixPath
-   * @param string 
+   * @param collectionTable
+   * @param bool true for the left operand (A), false for the right (B)
+   * @param block_size
    * @param j 
    * @param i 
-   * @param block_size 
    * @param job
    */
-  public static void initJob(String matrixPath, String string, int block_size, int i, int j, JobConf job) {
+  public static void initJob(String matrixPath, String collectionTable, boolean bool,
+      int block_size, int i, int j, JobConf job) {
     job.setMapperClass(BlockingMapper.class);
     job.setReducerClass(BlockingReducer.class);
     FileInputFormat.addInputPaths(job, matrixPath);
 
     job.setInputFormat(VectorInputFormat.class);
+    
     job.setMapOutputKeyClass(BlockID.class);
     job.setMapOutputValueClass(VectorWritable.class);
-    job.setOutputFormat(NullOutputFormat.class);
-
-    job.set(BLOCKING_MATRIX, matrixPath);
-    job.set(BLOCKED_MATRIX, string);
+    
+    job.setOutputFormat(BlockOutputFormat.class);
+    job.setOutputKeyClass(BlockID.class);
+    job.setOutputValueClass(BlockWritable.class);
     job.set(BLOCK_SIZE, String.valueOf(block_size));
     job.set(ROWS, String.valueOf(i));
     job.set(COLUMNS, String.valueOf(j));
-    
+    job.setBoolean(MATRIX_POS, bool);
     job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+    if(bool)
+      job.set(BlockOutputFormat.COLUMN, "a");
+    else
+      job.set(BlockOutputFormat.COLUMN, "b");
+    job.set(BlockOutputFormat.OUTPUT_TABLE, collectionTable);
   }
 
   /**
    * Abstract Blocking Map/Reduce Class to configure the job.
    */
   public static abstract class BlockingMapRedBase extends MapReduceBase {
-
-    protected DenseMatrix matrix;
-    protected DenseMatrix blockedMatrix;
     protected int mBlockNum;
     protected int mBlockRowSize;
     protected int mBlockColSize;
     
     protected int mRows;
     protected int mColumns;
+    
+    protected boolean matrixPos;
 
     @Override
     public void configure(JobConf job) {
-      try {
-        matrix = new DenseMatrix(new HamaConfiguration(), job.get(
-            BLOCKING_MATRIX, ""));
-        blockedMatrix = new DenseMatrix(new HamaConfiguration(), job.get(
-            BLOCKED_MATRIX, ""));
-        
         mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
         mRows = Integer.parseInt(job.get(ROWS, ""));
         mColumns = Integer.parseInt(job.get(COLUMNS, ""));
         
         mBlockRowSize = mRows / mBlockNum;
         mBlockColSize = mColumns / mBlockNum;
-      } catch (IOException e) {
-        LOG.warn("Load matrix_blocking failed : " + e.getMessage());
-      }
+        
+        matrixPos = job.getBoolean(MATRIX_POS, true);
     }
-
   }
 
   /**
@@ -142,21 +137,19 @@
         i++;
       } while(endColumn < (mColumns-1));
     }
-
   }
 
   /**
    * Reducer Class
    */
   public static class BlockingReducer extends BlockingMapRedBase implements
-      Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
+      Reducer<BlockID, VectorWritable, BlockID, BlockWritable> {
 
     @Override
     public void reduce(BlockID key, Iterator<VectorWritable> values,
-        OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
+        OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
         throws IOException {
-      // Note: all the sub-vectors are grouped by {@link
-      // org.apache.hama.io.BlockID}
+      // Note: all the sub-vectors are grouped by {@link org.apache.hama.io.BlockID}
       
       // the block's base offset in the original matrix
       int colBase = key.getColumn() * mBlockColSize;
@@ -189,9 +182,22 @@
           subMatrix.set(i, j, vw.get(colBase + j));
         }
       }
+      BlockWritable outValue = new BlockWritable(subMatrix);
 
-      blockedMatrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
+      // This fan-out is used only for matrix multiplication.
+      if(matrixPos) {
+        for (int x = 0; x < mBlockNum; x++) {
+          int r = (key.getRow() * mBlockNum) * mBlockNum;
+          int seq = (x * mBlockNum) + key.getColumn() + r;
+          output.collect(new BlockID(key.getRow(), x, seq), outValue);
+        }
+      } else {
+        for (int x = 0; x < mBlockNum; x++) {
+          int seq = (x * mBlockNum * mBlockNum) + 
+                          (key.getColumn() * mBlockNum) + key.getRow();
+          output.collect(new BlockID(x, key.getColumn(), seq), outValue);
+        }
+      }
     }
   }
-
-}
+}
\ No newline at end of file

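The sequence-number arithmetic in BlockingReducer is what lines the blocks up: for a result block (i, j), A's block (i, k) and B's block (k, j) are both routed to BlockID(i, j) with the same seq = i*n*n + j*n + k (n being the block grid size), so every required product pair gets its own collection-table row. A standalone check of that arithmetic for n = 2; the class name SeqCheck is made up:

    public class SeqCheck {
      public static void main(String[] args) {
        int n = 2;
        // Fan-out for A blocks (i, k), as in the matrixPos == true branch.
        for (int i = 0; i < n; i++)
          for (int k = 0; k < n; k++)
            for (int x = 0; x < n; x++) {
              int r = (i * n) * n;
              int seqA = (x * n) + k + r;
              System.out.println("A(" + i + "," + k + ") -> ("
                  + i + "," + x + ") seq " + seqA);
            }
        // Fan-out for B blocks (k, j), as in the matrixPos == false branch.
        for (int k = 0; k < n; k++)
          for (int j = 0; j < n; j++)
            for (int x = 0; x < n; x++) {
              int seqB = (x * n * n) + (j * n) + k;
              System.out.println("B(" + k + "," + j + ") -> ("
                  + x + "," + j + ") seq " + seqB);
            }
      }
    }

Running it shows, for example, that A(0,1) and B(1,0) are the only two writes aimed at result block (0,0) with seq 1, which is exactly the product pair needed for that block.
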
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java Wed Jan 14 19:11:26 2009
@@ -40,19 +40,14 @@
   protected byte[][] inputColumns;
   protected HTable table;
   protected RowFilterInterface rowFilter;
-  protected static int repeat;
   
   /**
    * space delimited list of columns
    */
   public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
-  public static final String REPEAT_NUM = "hama.mapred.repeat";
   
   public void configure(JobConf job) {
     Path[] tableNames = FileInputFormat.getInputPaths(job);
-    if(job.get(REPEAT_NUM) != null) {
-      setRepeat(Integer.parseInt(job.get(REPEAT_NUM)));
-    }
     String colArg = job.get(COLUMN_LIST);
     String[] colNames = colArg.split(" ");
     byte[][] m_cols = new byte[colNames.length][];
@@ -67,10 +62,6 @@
     }
   }
 
-  private void setRepeat(int parseInt) {
-    repeat =  parseInt;
-  }
-
   public void validateInput(JobConf job) throws IOException {
     // expecting exactly one path
     Path[] tableNames = FileInputFormat.getInputPaths(job);
@@ -136,7 +127,6 @@
     return splits;
   }
 
-
   /**
    * @param inputColumns to be passed to the map task.
    */

Modified: incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java Wed Jan 14 19:11:26 2009
@@ -30,14 +30,16 @@
  */
 public class JobManager {
   public static void execute(JobConf jobConf, Matrix result) throws IOException {
-    JobClient.runJob(jobConf);
-    //long rows = rJob.getCounters().findCounter(
-      //  "org.apache.hadoop.mapred.Task$Counter", 8, "REDUCE_OUTPUT_RECORDS")
-      // .getCounter();
-    // TODO : Thinking about more efficient method.
-    int rows = result.getColumn(0).size();
-    int columns = result.getRow(0).size();
-    result.setDimension(rows, columns);
+    try {
+      JobClient.runJob(jobConf);
+      // TODO : Thinking about more efficient method.
+      int rows = result.getColumn(0).size();
+      int columns = result.getRow(0).size();
+      result.setDimension(rows, columns);
+    } catch (IOException e) {
+      result.close();
+      throw new IOException(e);
+    }
   }
   
   /** 

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Wed Jan 14 19:11:26 2009
@@ -83,18 +83,6 @@
   }
 
   /**
-   * Map/Reduce Blocking Test
-   * 
-   * @throws IOException
-   * @throws ClassNotFoundException
-   */
-  public void testMRBlocking() throws IOException, ClassNotFoundException {
-    assertEquals(((DenseMatrix) m2).isBlocked(), false);
-    ((DenseMatrix) m2).blocking_mapred(4);
-    assertEquals(((DenseMatrix) m2).isBlocked(), true);
-  }
-
-  /**
    * Column vector test.
    * 
    * @param rand

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=734602&r1=734601&r2=734602&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Wed Jan 14 19:11:26 2009
@@ -23,7 +23,6 @@
 
 import org.apache.hama.DenseMatrix;
 import org.apache.hama.HCluster;
-import org.apache.hama.Matrix;
 import org.apache.log4j.Logger;
 
 public class TestBlockMatrixMapReduce extends HCluster {
@@ -37,13 +36,10 @@
 
   public void testBlockMatrixMapReduce() throws IOException,
       ClassNotFoundException {
-    Matrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
-    Matrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
-    // Partitioning 8 * 8 submatrix. It also the test submatrix() and scanner.
-    ((DenseMatrix) m1).blocking_mapred(16);
-    ((DenseMatrix) m2).blocking_mapred(16);
+    DenseMatrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
+    DenseMatrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
 
-    Matrix c = m1.mult(m2);
+    DenseMatrix c = (DenseMatrix) m1.mult(m2, 16);
 
     double[][] mem = new double[SIZE][SIZE];
     for (int i = 0; i < SIZE; i++) {