You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/07/29 11:33:20 UTC

svn commit: r680652 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/ src/test/org/apache/hama/ src/test/org/apache/hama/mapred/

Author: edwardyoon
Date: Tue Jul 29 02:33:18 2008
New Revision: 680652

URL: http://svn.apache.org/viewvc?rev=680652&view=rev
Log:
https://issues.apache.org/jira/browse/HAMA-10

Added:
    incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/Vector.java
    incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/io/
    incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java
    incubator/hama/trunk/src/test/org/apache/hama/TestVector.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/HamaConstants.java
    incubator/hama/trunk/src/test/org/apache/hama/TestFeatureVector.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/Matrix.java
    incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=680652&r1=680651&r2=680652&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Jul 29 02:33:18 2008
@@ -2,6 +2,8 @@
 
 Trunk (unreleased changes)
 
+    HAMA-10: Refactor the mapred package for the latest version of dependencies (edwardyoon)  
+    HAMA-9: Upgrade dependencies (edwardyoon)
     HAMA-6: Add a 'who we are' page (edwardyoon)
     HAMA-7: Add some information for a new comer (edwardyoon)
     HAMA-1: Create the Hama web site (edwardyoon via Ian Holsman)

Added: incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,15 @@
+package org.apache.hama;
+
+public abstract class AbstractBase {
+  /**
+   * Return the integer column index that follows the first ':' of the key
+   * (e.g. 3 for "column:3"); a key without ':' is parsed as a whole.
+   * @param b column key bytes, decoded as UTF-8 regardless of platform charset
+   * @return integer
+   * @throws NumberFormatException if the text after ':' is not an integer
+   */
+  public int getColumnIndex(byte[] b) {
+    String cKey = new String(b, java.nio.charset.Charset.forName("UTF-8"));
+    return Integer.parseInt(cKey.substring(cKey.indexOf(':') + 1));
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=680652&r1=680651&r2=680652&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Tue Jul 29 02:33:18 2008
@@ -38,7 +38,7 @@
 /**
  * Methods of the matrix classes
  */
-public abstract class AbstractMatrix implements MatrixInterface {
+public abstract class AbstractMatrix extends AbstractBase implements MatrixInterface {
   static final Logger LOG = Logger.getLogger(AbstractMatrix.class);
 
   /** Hbase Configuration */
@@ -77,7 +77,7 @@
    */
   protected void create() {
     try {
-      tableDesc.addFamily(new HColumnDescriptor(HamaConstants.METADATA
+      tableDesc.addFamily(new HColumnDescriptor(Constants.METADATA
           .toString()));
       LOG.info("Initializaing.");
       admin.createTable(tableDesc);
@@ -90,7 +90,7 @@
   public int getRowDimension() {
     Cell rows = null;
     try {
-      rows = table.get(HamaConstants.METADATA, HamaConstants.METADATA_ROWS);
+      rows = table.get(Constants.METADATA, Constants.METADATA_ROWS);
     } catch (IOException e) {
       LOG.error(e, e);
     }
@@ -102,8 +102,8 @@
   public int getColumnDimension() {
     Cell columns = null;
     try {
-      columns = table.get(HamaConstants.METADATA,
-          HamaConstants.METADATA_COLUMNS);
+      columns = table.get(Constants.METADATA,
+          Constants.METADATA_COLUMNS);
     } catch (IOException e) {
       LOG.error(e, e);
     }
@@ -113,7 +113,7 @@
   /** {@inheritDoc} */
   public double get(int i, int j) {
     Text row = new Text(String.valueOf(i));
-    Text column = new Text(HamaConstants.COLUMN + String.valueOf(j));
+    Text column = new Text(Constants.COLUMN + String.valueOf(j));
     Cell c;
     double result = -1;
     try {
@@ -157,7 +157,7 @@
   /** {@inheritDoc} */
   public void set(int i, int j, double d) {
     BatchUpdate b = new BatchUpdate(new Text(String.valueOf(i)));
-    b.put(new Text(HamaConstants.COLUMN + String.valueOf(j)), doubleToBytes(d));
+    b.put(new Text(Constants.COLUMN + String.valueOf(j)), doubleToBytes(d));
     try {
       table.commit(b);
     } catch (IOException e) {
@@ -187,9 +187,9 @@
 
   /** {@inheritDoc} */
   public void setDimension(int rows, int columns) {
-    BatchUpdate b = new BatchUpdate(HamaConstants.METADATA);
-    b.put(HamaConstants.METADATA_ROWS, Bytes.toBytes(rows));
-    b.put(HamaConstants.METADATA_COLUMNS, Bytes.toBytes(columns));
+    BatchUpdate b = new BatchUpdate(Constants.METADATA);
+    b.put(Constants.METADATA_ROWS, Bytes.toBytes(rows));
+    b.put(Constants.METADATA_COLUMNS, Bytes.toBytes(columns));
 
     try {
       table.commit(b);
@@ -211,8 +211,8 @@
   public double getDeterminant() {
     try {
       return bytesToDouble(table.get(
-          new Text(String.valueOf(HamaConstants.DETERMINANT)),
-          new Text(HamaConstants.COLUMN)).getValue());
+          new Text(String.valueOf(Constants.DETERMINANT)),
+          new Text(Constants.COLUMN)).getValue());
     } catch (IOException e) {
       LOG.error(e, e);
       return -1;
@@ -245,16 +245,4 @@
       LOG.error(e, e);
     }
   }
-
-  /**
-   * Return the integer column index
-   * 
-   * @param b key
-   * @return integer
-   */
-  public int getColumnIndex(byte[] b) {
-    String cKey = new String(b);
-    return Integer.parseInt(cKey
-        .substring(cKey.indexOf(":") + 1, cKey.length()));
-  }
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Constants shared by the Hama classes
+ */
+public class Constants {
+  /** Meta column family that stores the matrix info */
+  public static final Text METADATA = new Text("metadata:");
+  /** Column holding the number of matrix rows */
+  public static final Text METADATA_ROWS = new Text("metadata:rows");
+  /** Column holding the number of matrix columns */
+  public static final Text METADATA_COLUMNS = new Text("metadata:columns");
+  /** Column holding the type of the matrix */
+  public static final Text METADATA_TYPE = new Text("metadata:type");
+
+  /** Plus operator */
+  public static final String PLUS = "+";
+  /** Minus operator */
+  public static final String MINUS = "-";
+
+  /** Default column family name */
+  public static final Text COLUMN = new Text("column:");
+  /** The numerator version of the fraction matrix */
+  public static final Text NUMERATOR = new Text("numerator:");
+  /** The denominator version of the fraction matrix */
+  public static final Text DENOMINATOR = new Text("denominator:");
+  /** The original version of the fraction matrix */
+  public static final Text ORIGINAL = new Text("original:");
+  /** The lower matrix version of the triangular matrix */
+  public static final Text LOWER = new Text("lower:");
+  /** The upper matrix version of the triangular matrix */
+  public static final Text UPPER = new Text("upper:");
+
+  /** A determinant value record */
+  public static final Text DETERMINANT = new Text("determinant");
+
+  /** Name prefix of temporary random matrices */
+  public static final String RANDOM = "rand";
+  /** Name prefix of temporary result matrices */
+  public static final String RESULT = "result";
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/Matrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Matrix.java?rev=680652&r1=680651&r2=680652&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Matrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Matrix.java Tue Jul 29 02:33:18 2008
@@ -55,7 +55,7 @@
 
       if (!admin.tableExists(matrixName)) {
         tableDesc = new HTableDescriptor(matrixName.toString());
-        tableDesc.addFamily(new HColumnDescriptor(HamaConstants.COLUMN
+        tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN
             .toString()));
         create();
       }
@@ -82,7 +82,7 @@
 
       if (!admin.tableExists(matrixName)) {
         tableDesc = new HTableDescriptor(matrixName.toString());
-        tableDesc.addFamily(new HColumnDescriptor(HamaConstants.COLUMN
+        tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN
             .toString()));
         create();
       }
@@ -154,12 +154,12 @@
 
   /** {@inheritDoc} */
   public Matrix addition(Matrix b) {
-    return additionSubtraction(b, HamaConstants.PLUS);
+    return additionSubtraction(b, Constants.PLUS);
   }
 
   /** {@inheritDoc} */
   public Matrix subtraction(Matrix b) {
-    return additionSubtraction(b, HamaConstants.PLUS);
+    return additionSubtraction(b, Constants.MINUS);
   }
 
   /**

Modified: incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java?rev=680652&r1=680651&r2=680652&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java Tue Jul 29 02:33:18 2008
@@ -56,7 +56,7 @@
    * @return random name
    */
   protected static Text randMatrixName() {
-    String rName = HamaConstants.RANDOM;
+    String rName = Constants.RANDOM;
     for (int i = 1; i <= 5; i++) {
       char ch = (char) ((Math.random() * 26) + 97);
       rName += ch;

Added: incubator/hama/trunk/src/java/org/apache/hama/Vector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Vector.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Vector.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/Vector.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,127 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.io.VectorDatum;
+import org.apache.log4j.Logger;
+
+public class Vector extends AbstractBase implements VectorInterface {
+  static final Logger LOG = Logger.getLogger(Vector.class);
+  protected int[] m_dims; // dimension of each entry; parse() fills it ascending
+  protected double[] m_vals; // value of the entry at the same position
+
+  public Vector(RowResult r) {
+    parse(r.entrySet());
+  }
+
+  public Vector(VectorDatum r) {
+    parse(r.entrySet());
+  }
+
+  /** Loads "family:index" keyed cells (text-encoded doubles), ordered by index. */
+  private void parse(Set<Entry<byte[], Cell>> entrySet) {
+    SortedMap<Integer, Double> m = new TreeMap<Integer, Double>();
+    for (Map.Entry<byte[], Cell> f : entrySet) {
+      m.put(getColumnIndex(f.getKey()), Double.parseDouble(Bytes.toString(f
+          .getValue().getValue())));
+    }
+    this.m_dims = new int[m.size()];
+    this.m_vals = new double[m.size()];
+    int i = 0;
+    for (Map.Entry<Integer, Double> f : m.entrySet()) {
+      this.m_dims[i] = f.getKey();
+      this.m_vals[i] = f.getValue();
+      i++;
+    }
+  }
+
+  /**
+   * Returns the cosine similarity between two feature vectors, pairing
+   * entries by dimension (NaN if either vector has a zero L2 norm).
+   */
+  public double getCosine(Vector v) {
+    double cosine = 0.0;
+    for (int i = 0; i < v.size(); i++) {
+      // getValueAt() takes a position, so locate v's dimension in m_dims first
+      int at = java.util.Arrays.binarySearch(this.m_dims, v.getDimAt(i));
+      if (at >= 0) {
+        cosine += v.getValueAt(i) * this.getValueAt(at);
+      }
+    }
+    return cosine / (this.getL2Norm() * v.getL2Norm());
+  }
+
+  /** Returns the sum of this vector's values (the L1 norm if none is negative). */
+  public double getL1Norm() {
+    double sum = 0.0;
+    for (int i = 0; i < m_vals.length; i++) {
+      sum += m_vals[i];
+    }
+    return sum;
+  }
+
+  /** Returns the L2 norm factor of this vector's values. */
+  public double getL2Norm() {
+    double square_sum = 0.0;
+    for (int i = 0; i < m_vals.length; i++) {
+      square_sum += (m_vals[i] * m_vals[i]);
+    }
+    return Math.sqrt(square_sum);
+  }
+
+  public int getDimAt(int index) {
+    return m_dims[index];
+  }
+
+  public double getValueAt(int index) {
+    return m_vals[index];
+  }
+
+  public int size() {
+    return m_dims.length;
+  }
+
+  /** Adds v2 by dimension (a missing dimension counts as 0); result is row bs. */
+  public VectorDatum addition(byte[] bs, Vector v2) {
+    SortedMap<Integer, Double> sum = new TreeMap<Integer, Double>();
+    for (int i = 0; i < this.size(); i++) {
+      sum.put(this.getDimAt(i), this.getValueAt(i));
+    }
+    for (int i = 0; i < v2.size(); i++) {
+      Double mine = sum.get(v2.getDimAt(i));
+      sum.put(v2.getDimAt(i), (mine == null ? 0.0 : mine) + v2.getValueAt(i));
+    }
+    HbaseMapWritable<byte[], Cell> trunk = new HbaseMapWritable<byte[], Cell>();
+    for (Map.Entry<Integer, Double> e : sum.entrySet()) {
+      trunk.put(Bytes.toBytes("column:" + e.getKey()), new Cell(String.valueOf(e.getValue()), 0));
+    }
+    return new VectorDatum(bs, trunk);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,30 @@
+package org.apache.hama;
+
+import org.apache.hama.io.VectorDatum;
+
+/**
+ * A vector. Features are dimension-value pairs. Implementations provide a
+ * simple dictionary data structure to map dimensions onto their values. Note
+ * that for convenience, features do not have to be sorted according to their
+ * dimensions at this point. The SVMLightTrainer class has an option for sorting
+ * input vectors prior to training.
+ */
+public interface VectorInterface {
+  /** Returns the value stored at the given position (not dimension). */
+  double getValueAt(int index);
+  /** Returns the dimension of the entry stored at the given position. */
+  int getDimAt(int index);
+  /** Returns the number of stored entries. */
+  int size();
+  /** Returns the sum of the stored values. */
+  double getL1Norm();
+  /** Returns the square root of the sum of the squared values. */
+  double getL2Norm();
+  /** Returns the cosine similarity between this vector and v. */
+  double getCosine(Vector v);
+  /** Returns the sum of this vector and v2 as the cells of row bs. */
+  VectorDatum addition(byte[] bs, Vector v2);
+
+  // TODO: save, copy,...,etc
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,193 @@
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Vector;
+
+public class VectorDatum implements Writable, Map<byte[], Cell> {
+  private byte[] row = null;
+  private final HbaseMapWritable<byte[], Cell> cells;
+
+  public VectorDatum() {
+    this(null, new HbaseMapWritable<byte[], Cell>());
+  }
+
+  /**
+   * Create a VectorDatum for the given row key, backed by (not copying) map m
+   */
+  public VectorDatum(final byte[] row, final HbaseMapWritable<byte[], Cell> m) {
+    this.row = row;
+    this.cells = m;
+  }
+
+  /**
+   * Get the row key of this VectorDatum; null until one is supplied or read
+   */
+  public byte[] getRow() {
+    return row;
+  }
+
+  public Cell put(@SuppressWarnings("unused")
+  byte[] key, @SuppressWarnings("unused")
+  Cell value) {
+    throw new UnsupportedOperationException("VectorDatum is read-only!");
+  }
+
+  @SuppressWarnings("unchecked")
+  public void putAll(@SuppressWarnings("unused")
+  Map map) {
+    throw new UnsupportedOperationException("VectorDatum is read-only!");
+  }
+
+  public Cell get(Object key) {
+    return (Cell) this.cells.get(key);
+  }
+
+  public Cell remove(@SuppressWarnings("unused")
+  Object key) {
+    throw new UnsupportedOperationException("VectorDatum is read-only!");
+  }
+
+  public boolean containsKey(Object key) {
+    return cells.containsKey(key);
+  }
+
+  public boolean containsValue(@SuppressWarnings("unused")
+  Object value) {
+    throw new UnsupportedOperationException("Don't support containsValue!");
+  }
+
+  public boolean isEmpty() {
+    return cells.isEmpty();
+  }
+
+  public int size() {
+    return cells.size();
+  }
+
+  public void clear() {
+    throw new UnsupportedOperationException("VectorDatum is read-only!");
+  }
+
+  public Set<byte[]> keySet() {
+    Set<byte[]> result = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    for (byte[] w : cells.keySet()) {
+      result.add(w);
+    }
+    return result;
+  }
+
+  public Set<Map.Entry<byte[], Cell>> entrySet() {
+    return Collections.unmodifiableSet(this.cells.entrySet());
+  }
+
+  public Collection<Cell> values() {
+    ArrayList<Cell> result = new ArrayList<Cell>();
+    for (Writable w : cells.values()) {
+      result.add((Cell) w);
+    }
+    return result;
+  }
+
+  /**
+   * Get the Cell that corresponds to column
+   */
+  public Cell get(byte[] column) {
+    return this.cells.get(column);
+  }
+
+  /**
+   * Get the Cell that corresponds to column, using a String key
+   */
+  public Cell get(String key) {
+    return get(Bytes.toBytes(key));
+  }
+
+  /**
+   * Row entry.
+   */
+  public class Entry implements Map.Entry<byte[], Cell> {
+    private final byte[] column;
+    private final Cell cell;
+
+    Entry(byte[] row, Cell cell) {
+      this.column = row;
+      this.cell = cell;
+    }
+
+    public Cell setValue(@SuppressWarnings("unused")
+    Cell c) {
+      throw new UnsupportedOperationException("VectorDatum is read-only!");
+    }
+
+    public byte[] getKey() {
+      return column;
+    }
+
+    public Cell getValue() {
+      return cell;
+    }
+  }
+
+  /** Renders the row key and each cell (column, timestamp, value) as text. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("row=");
+    sb.append(Bytes.toString(this.row));
+    sb.append(", cells={");
+    String separator = "";
+    for (Map.Entry<byte[], Cell> e : this.cells.entrySet()) {
+      sb.append(separator);
+      separator = ", ";
+      sb.append("(column=");
+      sb.append(Bytes.toString(e.getKey()));
+      sb.append(", timestamp=");
+      sb.append(Long.toString(e.getValue().getTimestamp()));
+      sb.append(", value=");
+      byte[] v = e.getValue().getValue();
+      if (Bytes.equals(e.getKey(), HConstants.COL_REGIONINFO)) {
+        try {
+          sb.append(Writables.getHRegionInfo(v).toString());
+        } catch (IOException ioe) {
+          sb.append(ioe.toString());
+        }
+      } else {
+        // append(byte[]) would resolve to append(Object) and print the array
+        // identity ("[B@..."), so decode the bytes as Vector.parse() does
+        sb.append(Bytes.toString(v));
+      }
+      sb.append(")");
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
+  public void readFields(final DataInput in) throws IOException {
+    this.row = Bytes.readByteArray(in);
+    this.cells.readFields(in);
+  }
+
+  public void write(final DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, this.row);
+    this.cells.write(out);
+  }
+
+  public Vector getVector() {
+    return new Vector(this);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,58 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+
+public class MatrixInputFormat extends MatrixInputFormatBase implements
+    JobConfigurable {
+  private static final Log LOG = LogFactory.getLog(MatrixInputFormat.class);
+
+  /**
+   * space delimited list of columns
+   *
+   * @see org.apache.hadoop.hbase.regionserver.HAbstractScanner for column name
+   *      wildcards
+   */
+  public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+
+  /** Reads the column list and the table name (first input path) from the job. */
+  public void configure(JobConf job) {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    String colArg = job.get(COLUMN_LIST);
+    // A missing list yields no columns (getSplits reports it) instead of an NPE
+    String[] colNames = (colArg == null || colArg.length() == 0)
+        ? new String[0] : colArg.split(" ");
+    byte[][] m_cols = new byte[colNames.length][];
+    for (int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = Bytes.toBytes(colNames[i]);
+    }
+    setInputColums(m_cols);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+    } catch (Exception e) {
+      LOG.error("Unable to open the input table", e); // keep the stack trace
+    }
+  }
+
+  /** Requires exactly one table name and a non-empty column list. */
+  public void validateInput(JobConf job) throws IOException {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length != 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,275 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorDatum;
+
+public abstract class MatrixInputFormatBase
+implements InputFormat<ImmutableBytesWritable, VectorDatum> {
+  private final Log LOG = LogFactory.getLog(MatrixInputFormatBase.class);
+  private byte [][] inputColumns;
+  private HTable table;
+  private TableRecordReader tableRecordReader;
+  private RowFilterInterface rowFilter;
+
+  /**
+   * Iterate over an HBase table data, return (Text, VectorResult) pairs
+   */
+  protected class TableRecordReader
+  implements RecordReader<ImmutableBytesWritable, VectorDatum> {
+    private byte [] startRow;
+    private byte [] endRow;
+    private RowFilterInterface trrRowFilter;
+    private Scanner scanner;
+    private HTable htable;
+    private byte [][] trrInputColumns;
+
+    /**
+     * Build the scanner. Not done in constructor to allow for extension.
+     *
+     * @throws IOException
+     */
+    public void init() throws IOException {
+      if ((endRow != null) && (endRow.length > 0)) {
+        if (trrRowFilter != null) {
+          final Set<RowFilterInterface> rowFiltersSet =
+            new HashSet<RowFilterInterface>();
+          rowFiltersSet.add(new StopRowFilter(endRow));
+          rowFiltersSet.add(trrRowFilter);
+          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+            new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+              rowFiltersSet));
+        } else {
+          this.scanner =
+            this.htable.getScanner(trrInputColumns, startRow, endRow);
+        }
+      } else {
+        this.scanner =
+          this.htable.getScanner(trrInputColumns, startRow, trrRowFilter);
+      }
+    }
+
+    /**
+     * @param htable the {@link HTable} to scan.
+     */
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    /**
+     * @param inputColumns the columns to be placed in {@link VectorDatum}.
+     */
+    public void setInputColumns(final byte [][] inputColumns) {
+      this.trrInputColumns = inputColumns;
+    }
+
+    /**
+     * @param startRow the first row in the split
+     */
+    public void setStartRow(final byte [] startRow) {
+      this.startRow = startRow;
+    }
+
+    /**
+     *
+     * @param endRow the last row in the split
+     */
+    public void setEndRow(final byte [] endRow) {
+      this.endRow = endRow;
+    }
+
+    /**
+     * @param rowFilter the {@link RowFilterInterface} to be used.
+     */
+    public void setRowFilter(RowFilterInterface rowFilter) {
+      this.trrRowFilter = rowFilter;
+    }
+
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      this.scanner.close();
+    }
+
+    /**
+     * @return ImmutableBytesWritable
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public ImmutableBytesWritable createKey() {
+      return new ImmutableBytesWritable();
+    }
+
+    /**
+     * @return VectorResult
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public VectorDatum createValue() {
+      return new VectorDatum();
+    }
+
+    /** {@inheritDoc} */
+    public long getPos() {
+      // This should be the ordinal tuple in the range;
+      // not clear how to calculate...
+      return 0;
+    }
+
+    /** {@inheritDoc} */
+    public float getProgress() {
+      // Depends on the total number of tuples and getPos
+      return 0;
+    }
+
+    /**
+     * @param key HStoreKey as input key.
+     * @param value MapWritable as input value
+     *
+     * Converts Scanner.next() to Text, VectorResult
+     *
+     * @return true if there was more data
+     * @throws IOException
+     */
+    @SuppressWarnings("unchecked")
+    public boolean next(ImmutableBytesWritable key, VectorDatum value)
+    throws IOException {
+      RowResult result = this.scanner.next();
+      boolean hasMore = result != null && result.size() > 0;
+      if (hasMore) {
+        key.set(result.getRow());
+        Writables.copyWritable(result, value);
+      }
+      return hasMore;
+    }
+  }
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
+   * the default.
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader<ImmutableBytesWritable, VectorDatum> getRecordReader(InputSplit split,
+      @SuppressWarnings("unused")
+      JobConf job, @SuppressWarnings("unused")
+      Reporter reporter)
+  throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return trr;
+  }
+
+  /**
+   * Calculates the splits that will serve as input for the map tasks.
+   * <p>
+   * Splits are created in number equal to the smallest between numSplits and
+   * the number of {@link HRegion}s in the table (but never fewer than one). If
+   * the number of splits is smaller than the number of {@link HRegion}s then
+   * splits are spanned across multiple {@link HRegion}s and are grouped the
+   * most evenly possible. In the case splits are uneven the bigger splits are
+   * placed first in the {@link InputSplit} array.
+   *
+   * @param job the map task {@link JobConf}
+   * @param numSplits a hint to calculate the number of splits
+   * @return the input splits
+   * @throws IOException if no table or columns were set, or the table has no regions
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    // Validate the table first: it is dereferenced to fetch the region keys
+    if (this.table == null) {
+      throw new IOException("No table was provided");
+    }
+    if (this.inputColumns == null || this.inputColumns.length == 0) {
+      throw new IOException("Expecting at least one column");
+    }
+    byte [][] startKeys = this.table.getStartKeys();
+    if (startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region");
+    }
+    // Clamp to [1, #regions]; a hint <= 0 would otherwise divide by zero below
+    int realNumSplits = Math.max(1, Math.min(numSplits, startKeys.length));
+    InputSplit[] splits = new InputSplit[realNumSplits];
+    int middle = startKeys.length / realNumSplits;
+    int startPos = 0;
+    for (int i = 0; i < realNumSplits; i++) {
+      int lastPos = startPos + middle;
+      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      splits[i] = new TableSplit(this.table.getTableName(),
+          startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]
+              : HConstants.EMPTY_START_ROW);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("split: " + i + "->" + splits[i]);
+      }
+      startPos = lastPos;
+    }
+    return splits;
+  }
+
+  /**
+   * @param inputColumns to be passed in {@link VectorDatum} to the map task.
+   */
+  protected void setInputColums(byte [][] inputColumns) {
+    this.inputColumns = inputColumns;
+  }
+
+  /**
+   * Allows subclasses to set the {@link HTable}.
+   *
+   * @param table to get the data from
+   */
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader
+   *                to provide other {@link TableRecordReader} implementations.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+
+  /**
+   * Allows subclasses to set the {@link RowFilterInterface} to be used.
+   *
+   * @param rowFilter
+   */
+  protected void setRowFilter(RowFilterInterface rowFilter) {
+    this.rowFilter = rowFilter;
+  }
+}
\ No newline at end of file

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,54 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorDatum;
+
+/** Base mapper fed with {@link ImmutableBytesWritable} keys and {@link VectorDatum} values. */
+@SuppressWarnings("unchecked")
+public abstract class MatrixMap<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Mapper<ImmutableBytesWritable, VectorDatum, K, V> {
+  /**
+   * Use this before submitting a MatrixMap job. It fills in the JobConf.
+   * @param table table name, added to the job as an input path
+   * @param columns columns to scan, stored under TableInputFormat.COLUMN_LIST
+   * @param mapper mapper class
+   * @param outputKeyClass map output key class
+   * @param outputValueClass map output value class
+   * @param job job configuration
+   */
+  public static void initJob(String table, String columns,
+      Class<? extends MatrixMap> mapper,
+      Class<? extends WritableComparable> outputKeyClass,
+      Class<? extends Writable> outputValueClass, JobConf job) {
+    // Input side: format, source table and the columns to read.
+    job.setInputFormat(MatrixInputFormat.class);
+    FileInputFormat.addInputPaths(job, table);
+    job.set(TableInputFormat.COLUMN_LIST, columns);
+    // Map side: the mapper and the key/value types it emits.
+    job.setMapperClass(mapper);
+    job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapOutputValueClass(outputValueClass);
+  }
+
+  /**
+   * Implemented by subclasses: handles one HBase record, given as key and value.
+   * @param key key of the record
+   * @param value record value associated with the key
+   * @param output collector receiving the emitted K/V pairs
+   * @param reporter
+   * @throws IOException
+   */
+  public abstract void map(ImmutableBytesWritable key, VectorDatum value,
+      OutputCollector<K, V> output, Reporter reporter) throws IOException;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,48 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/** Base reducer emitting {@link BatchUpdate} values under {@link ImmutableBytesWritable} keys. */
+@SuppressWarnings("unchecked")
+public abstract class MatrixReduce<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Reducer<K, V, ImmutableBytesWritable, BatchUpdate> {
+  /**
+   * Use this before submitting a MatrixReduce job. It fills in the JobConf.
+   * 
+   * @param table output table name, stored under TableOutputFormat.OUTPUT_TABLE
+   * @param reducer reducer class
+   * @param job job configuration
+   */
+  public static void initJob(String table,
+      Class<? extends MatrixReduce> reducer, JobConf job) {
+    job.setReducerClass(reducer);
+    job.setOutputFormat(TableOutputFormat.class);
+    job.set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+
+  /**
+   * Implemented by subclasses: turns the values of one key into BatchUpdate output.
+   * @param key key shared by the given values
+   * @param values iterator over the values for the key
+   * @param output collector receiving row key / BatchUpdate pairs
+   * @param reporter
+   * @throws IOException
+   */
+  public abstract void reduce(K key, Iterator<V> values,
+    OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter)
+  throws IOException;
+}
\ No newline at end of file

Added: incubator/hama/trunk/src/test/org/apache/hama/TestVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestVector.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestVector.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestVector.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import org.apache.hadoop.io.Text;
+
+public class TestVector extends HamaTestCase {
+  /**
+   * Test cosine similarity of rows (2,5,1,4) and (4,1,3,3): 28 / sqrt(46 * 35).
+   */
+  public void testCosine() {
+    final double result = 0.6978227007909176;
+    Matrix m1 = new Matrix(conf, new Text("cosine"));
+    try {
+      // TODO : We need setArray(int row, double[] value) to matrix
+      // e.g. matrixA.setArray(0, new double[] {2,5,1,4});
+      // -- Edward
+      m1.set(0, 0, 2);
+      m1.set(0, 1, 5);
+      m1.set(0, 2, 1);
+      m1.set(0, 3, 4);
+
+      m1.set(1, 0, 4);
+      m1.set(1, 1, 1);
+      m1.set(1, 2, 3);
+      m1.set(1, 3, 3);
+
+      LOG.info("get test : " + m1.get(0, 0));
+      LOG.info("get test : " + m1.get(0, 1));
+
+      Vector v1 = new Vector(m1.getRowResult(0));
+      Vector v2 = new Vector(m1.getRowResult(1));
+      // JUnit order is (expected, actual); the delta absorbs rounding noise.
+      assertEquals(result, v1.getCosine(v2), 1e-12);
+    } finally {
+      // Release the matrix even when an assertion above fails.
+      m1.close();
+    }
+  }
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=680652&r1=680651&r2=680652&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Tue Jul 29 02:33:18 2008
@@ -27,17 +27,15 @@
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.mapred.TableMap;
-import org.apache.hadoop.hbase.mapred.TableReduce;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.FeatureVector;
+import org.apache.hama.Vector;
 import org.apache.hama.HamaTestCase;
 import org.apache.hama.Matrix;
+import org.apache.hama.io.VectorDatum;
 import org.apache.log4j.Logger;
 
 /**
@@ -45,8 +43,6 @@
  */
 public class TestMatrixMapReduce extends HamaTestCase {
   static final Logger LOG = Logger.getLogger(TestMatrixMapReduce.class);
-  protected Matrix a;
-  protected Matrix b;
 
   /** constructor */
   public TestMatrixMapReduce() {
@@ -54,39 +50,35 @@
   }
 
   public static class AdditionMap extends
-      TableMap<ImmutableBytesWritable, RowResult> {
-    protected Matrix matrix_b;
+      MatrixMap<ImmutableBytesWritable, VectorDatum> {
+    protected Matrix B;
     public static final String MATRIX_B = "hama.addition.substraction.matrix.b";
 
     public void configure(JobConf job) {
-      matrix_b = new Matrix(new HBaseConfiguration(), new Text("MatrixB"));
+      B = new Matrix(new HBaseConfiguration(), new Text("MatrixB"));
     }
 
     @Override
-    public void map(ImmutableBytesWritable key, RowResult value,
-        OutputCollector<ImmutableBytesWritable, RowResult> output,
+    public void map(ImmutableBytesWritable key, VectorDatum value,
+        OutputCollector<ImmutableBytesWritable, VectorDatum> output,
         Reporter reporter) throws IOException {
 
-      FeatureVector v1 = new FeatureVector(matrix_b.getRowResult(key.get()));
-      FeatureVector v2 = new FeatureVector(value);
-      FeatureVector v3 = v1.addition(v2);
-      output.collect(key, v3.getRowResult(key.get()));
-
-      LOG.info("xxx" + v3.getValueAt(0));
-      LOG.info("xxx" + v3.getValueAt(1));
+      Vector v1 = new Vector(B.getRowResult(key.get()));
+      Vector v2 = value.getVector();
+      output.collect(key, v1.addition(key.get(), v2));
     }
   }
 
   public static class AdditionReduce extends
-      TableReduce<ImmutableBytesWritable, RowResult> {
+      MatrixReduce<ImmutableBytesWritable, VectorDatum> {
 
     @Override
-    public void reduce(ImmutableBytesWritable key, Iterator<RowResult> values,
+    public void reduce(ImmutableBytesWritable key, Iterator<VectorDatum> values,
         OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
         Reporter reporter) throws IOException {
 
       BatchUpdate b = new BatchUpdate(key.get());
-      RowResult r = values.next();
+      VectorDatum r = values.next();
       for (Map.Entry<byte[], Cell> f : r.entrySet()) {
         b.put(f.getKey(), f.getValue().getValue());
       }
@@ -96,16 +88,14 @@
   }
 
   public void testMatrixMapReduce() throws IOException {
-    a = new Matrix(conf, new Text("MatrixA"));
+    Matrix a = new Matrix(conf, new Text("MatrixA"));
     a.set(0, 0, 1);
     a.set(0, 1, 0);
-    b = new Matrix(conf, new Text("MatrixB"));
+    Matrix b = new Matrix(conf, new Text("MatrixB"));
     b.set(0, 0, 1);
     b.set(0, 1, 1);
-
     a.close();
     b.close();
-
     miniMRJob();
   }
 
@@ -116,9 +106,9 @@
     JobConf jobConf = new JobConf(conf, TestMatrixMapReduce.class);
     jobConf.setJobName("test MR job");
 
-    TableMap.initJob("MatrixA", "column:", AdditionMap.class,
-        ImmutableBytesWritable.class, RowResult.class, jobConf);
-    TableReduce.initJob("xanadu", AdditionReduce.class, jobConf);
+    MatrixMap.initJob("MatrixA", "column:", AdditionMap.class,
+        ImmutableBytesWritable.class, VectorDatum.class, jobConf);
+    MatrixReduce.initJob("xanadu", AdditionReduce.class, jobConf);
 
     jobConf.setNumMapTasks(1);
     jobConf.setNumReduceTasks(1);