Posted to commits@hama.apache.org by ed...@apache.org on 2009/02/13 07:43:35 UTC

svn commit: r744008 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/ src/test/org/apache/hama/

Author: edwardyoon
Date: Fri Feb 13 06:43:34 2009
New Revision: 744008

URL: http://svn.apache.org/viewvc?rev=744008&view=rev
Log:
Implement add(double alpha, Matrix B)
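
The new method computes C = A + alpha * B: alpha * B is first materialized
into a temporary matrix with set(alpha, Matrix), and the temporary is then
added to A through the existing add(Matrix) MapReduce job (see the
DenseMatrix.java diff below).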

Added:
    incubator/hama/trunk/src/java/org/apache/hama/io/HMapWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableInputFormatBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableRecordReaderBase.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
    incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri Feb 13 06:43:34 2009
@@ -3,7 +3,7 @@
 Trunk (unreleased changes)
 
   NEW FEATURES
-  
+
     HAMA-151: Add multiplication example of file matrices (edwardyoon)
     HAMA-145: Add privacy policy page (edwardyoon)
     HAMA-83: 2D square blocking for dense matrix multiplication (edwardyoon)
@@ -34,7 +34,8 @@
     HAMA-2: The initial donation of Hama from the Google project (edwardyoon)
     
   IMPROVEMENTS
-    
+
+    HAMA-109: Implement add(double alpha, Matrix B) (edwardyoon)
     HAMA-150: Refactor blockingMapRed (edwardyoon)
     HAMA-148: Implement of set(double alpha, Matrix B) (edwardyoon)
     HAMA-100: Implement of set(Matrix B) (edwardyoon)
@@ -98,7 +99,7 @@
     HAMA-9: Upgrade dependencies (edwardyoon)
 
   BUG FIXES
-   
+
     HAMA-147: Fix typos (edwardyoon)
     HAMA-140: In subMatrix(), Scanner should be closed (edwardyoon)
     HAMA-120: remove findbugs warning in shell package (samuel via edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Fri Feb 13 06:43:34 2009
@@ -189,6 +189,8 @@
     }
 
     public static void setAlpha(double a) {
+      if(alpha.size() > 0) 
+        alpha = new ArrayList<Double>();
       alpha.add(a);
     }
   }
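
The reset above makes the static alpha list hold a single scalar at a time.
A minimal sketch of the intended behaviour of setAlpha() (the values are
illustrative only):

    setAlpha(0.5);
    setAlpha(0.1);
    // the list is cleared before the second value is added, so only the
    // latest alpha remains: alpha.size() == 1, alpha.get(0) == 0.1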

Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java Fri Feb 13 06:43:34 2009
@@ -22,13 +22,13 @@
 import java.util.Iterator;
 
 import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.MapWritable;
+import org.apache.hama.io.HMapWritable;
 
 /**
  * Methods of the vector classes
  */
 public abstract class AbstractVector {
-  public MapWritable<Integer, DoubleEntry> entries;
+  public HMapWritable<Integer, DoubleEntry> entries;
   
   /**
    * Gets the value of index
@@ -56,7 +56,7 @@
   public void set(int index, double value) {
     // If entries are null, create new object 
     if(this.entries == null) {
-      this.entries = new MapWritable<Integer, DoubleEntry>();
+      this.entries = new HMapWritable<Integer, DoubleEntry>();
     }
     
     this.entries.put(index, new DoubleEntry(value));
@@ -91,11 +91,11 @@
   }
   
   /**
-   * Returns the {@link org.apache.hama.io.MapWritable}
+   * Returns the {@link org.apache.hama.io.HMapWritable}
    * 
    * @return the entries of vector
    */
-  public MapWritable<Integer, DoubleEntry> getEntries() {
+  public HMapWritable<Integer, DoubleEntry> getEntries() {
     return this.entries;
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Fri Feb 13 06:43:34 2009
@@ -46,7 +46,7 @@
 import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.MapWritable;
+import org.apache.hama.io.HMapWritable;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.io.VectorWritable;
 import org.apache.hama.mapred.CollectBlocksMapper;
@@ -96,6 +96,7 @@
     // if force is set to true:
     // 1) if this matrixName has an alias to another matrix, we will remove
     // the old alias, create a new matrix table, and alias to it.
+    
     // 2) if this matrixName has no alias to another matrix, we will create
     // a new matrix table, and alias to it.
     //
@@ -340,7 +341,6 @@
 
   public Matrix add(Matrix B) throws IOException {
     Matrix result = new DenseMatrix(config);
-
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("addition MR job" + result.getPath());
 
@@ -358,8 +358,11 @@
   }
 
   public Matrix add(double alpha, Matrix B) throws IOException {
-    // TODO Auto-generated method stub
-    return null;
+    Matrix temp = new DenseMatrix(config);
+    temp.set(alpha, B);
+    
+    Matrix result = this.add(temp);
+    return result;
   }
 
   public DenseVector getRow(int row) throws IOException {
@@ -371,7 +374,7 @@
     byte[][] c = { columnKey };
     Scanner scan = table.getScanner(c, HConstants.EMPTY_START_ROW);
 
-    MapWritable<Integer, DoubleEntry> trunk = new MapWritable<Integer, DoubleEntry>();
+    HMapWritable<Integer, DoubleEntry> trunk = new HMapWritable<Integer, DoubleEntry>();
 
     for (RowResult row : scan) {
       trunk.put(BytesUtil.bytesToInt(row.getRow()), new DoubleEntry(row
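
A minimal usage sketch of the new add(double, Matrix), assuming m1 and m2 are
existing DenseMatrix instances with the same dimensions (hypothetical names,
mirroring the new testAddAlphaMatrix() test below):

    Matrix c = m1.add(0.1, m2);                           // c = m1 + 0.1 * m2
    double expected = m1.get(0, 0) + 0.1 * m2.get(0, 0);  // equals c.get(0, 0)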

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java Fri Feb 13 06:43:34 2009
@@ -26,7 +26,7 @@
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.MapWritable;
+import org.apache.hama.io.HMapWritable;
 import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
 
@@ -34,15 +34,15 @@
   static final Logger LOG = Logger.getLogger(DenseVector.class);
 
   public DenseVector() {
-    this(new MapWritable<Integer, DoubleEntry>());
+    this(new HMapWritable<Integer, DoubleEntry>());
   }
 
-  public DenseVector(MapWritable<Integer, DoubleEntry> m) {
+  public DenseVector(HMapWritable<Integer, DoubleEntry> m) {
     this.entries = m;
   }
 
   public DenseVector(RowResult row) {
-    this.entries = new MapWritable<Integer, DoubleEntry>();
+    this.entries = new HMapWritable<Integer, DoubleEntry>();
     for (Map.Entry<byte[], Cell> f : row.entrySet()) {
       this.entries.put(BytesUtil.getColumnIndex(f.getKey()), 
           new DoubleEntry(f.getValue()));

Added: incubator/hama/trunk/src/java/org/apache/hama/io/HMapWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/HMapWritable.java?rev=744008&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/HMapWritable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/HMapWritable.java Fri Feb 13 06:43:34 2009
@@ -0,0 +1,189 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BytesUtil;
+
+public class HMapWritable<K, V> implements Map<Integer, V>, Writable,
+    Configurable {
+  private AtomicReference<Configuration> conf = new AtomicReference<Configuration>();
+
+  // Static maps of code to class and vice versa. Includes types used in hama
+  // only.
+  static final Map<Byte, Class<?>> CODE_TO_CLASS = new HashMap<Byte, Class<?>>();
+  static final Map<Class<?>, Byte> CLASS_TO_CODE = new HashMap<Class<?>, Byte>();
+
+  static {
+    byte code = 0;
+    addToMap(HStoreKey.class, code++);
+    addToMap(ImmutableBytesWritable.class, code++);
+    addToMap(Text.class, code++);
+    addToMap(DoubleEntry.class, code++);
+    addToMap(byte[].class, code++);
+  }
+
+  @SuppressWarnings("boxing")
+  private static void addToMap(final Class<?> clazz, final byte code) {
+    CLASS_TO_CODE.put(clazz, code);
+    CODE_TO_CLASS.put(code, clazz);
+  }
+
+  private Map<Integer, V> instance = new TreeMap<Integer, V>();
+
+  /** @return the conf */
+  public Configuration getConf() {
+    return conf.get();
+  }
+
+  /** @param conf the conf to set */
+  public void setConf(Configuration conf) {
+    this.conf.set(conf);
+  }
+
+  /** {@inheritDoc} */
+  public void clear() {
+    instance.clear();
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsKey(Object key) {
+    return instance.containsKey(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsValue(Object value) {
+    return instance.containsValue(value);
+  }
+
+  /** {@inheritDoc} */
+  public Set<Entry<Integer, V>> entrySet() {
+    return instance.entrySet();
+  }
+
+  /** {@inheritDoc} */
+  public V get(Object key) {
+    return instance.get(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean isEmpty() {
+    return instance.isEmpty();
+  }
+
+  /** {@inheritDoc} */
+  public Set<Integer> keySet() {
+    return instance.keySet();
+  }
+
+  /** {@inheritDoc} */
+  public int size() {
+    return instance.size();
+  }
+
+  /** {@inheritDoc} */
+  public Collection<V> values() {
+    return instance.values();
+  }
+
+  // Writable
+
+  /** @return the Class class for the specified id */
+  protected Class<?> getClass(byte id) {
+    return CODE_TO_CLASS.get(id);
+  }
+
+  /** @return the id for the specified Class */
+  protected byte getId(Class<?> clazz) {
+    Byte b = CLASS_TO_CODE.get(clazz);
+    if (b == null) {
+      throw new NullPointerException("Nothing for : " + clazz);
+    }
+    return b;
+  }
+
+  @Override
+  public String toString() {
+    return this.instance.toString();
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    // Write out the number of entries in the map
+    out.writeInt(this.instance.size());
+
+    // Then write out each key/value pair
+    for (Map.Entry<Integer, V> e : instance.entrySet()) {
+      Bytes.writeByteArray(out, BytesUtil.getColumnIndex(e.getKey()));
+      out.writeByte(getId(e.getValue().getClass()));
+      ((Writable) e.getValue()).write(out);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    // First clear the map. Otherwise we will just accumulate
+    // entries every time this method is called.
+    this.instance.clear();
+
+    // Read the number of entries in the map
+    int entries = in.readInt();
+
+    // Then read each key/value pair
+    for (int i = 0; i < entries; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      Writable value = (Writable) ReflectionUtils.newInstance(getClass(in
+          .readByte()), getConf());
+      value.readFields(in);
+      V v = (V) value;
+      this.instance.put(BytesUtil.getColumnIndex(key), v);
+    }
+  }
+
+  public void putAll(Map<? extends Integer, ? extends V> m) {
+    this.instance.putAll(m);
+  }
+
+  public V remove(Object key) {
+    return this.instance.remove(key);
+  }
+
+  public V put(Integer key, V value) {
+    return this.instance.put(key, value);
+  }
+}
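
A round-trip sketch of the serialization above, intended as a sanity check
only; it assumes the DoubleEntry(double) constructor used elsewhere in this
patch and the usual java.io stream classes:

    HMapWritable<Integer, DoubleEntry> map = new HMapWritable<Integer, DoubleEntry>();
    map.put(3, new DoubleEntry(1.5));

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    map.write(new DataOutputStream(bos));   // key written as a column-name byte
                                            // array, value as (type code, Writable)

    HMapWritable<Integer, DoubleEntry> copy = new HMapWritable<Integer, DoubleEntry>();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    // copy now holds the same single entry under key 3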

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java Fri Feb 13 06:43:34 2009
@@ -38,13 +38,13 @@
 public class VectorWritable implements Writable, Map<Integer, DoubleEntry> {
 
   public Integer row;
-  public MapWritable<Integer, DoubleEntry> entries;
+  public HMapWritable<Integer, DoubleEntry> entries;
 
   public VectorWritable() {
-    this(new MapWritable<Integer, DoubleEntry>());
+    this(new HMapWritable<Integer, DoubleEntry>());
   }
 
-  public VectorWritable(MapWritable<Integer, DoubleEntry> entries) {
+  public VectorWritable(HMapWritable<Integer, DoubleEntry> entries) {
     this.entries = entries;
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Fri Feb 13 06:43:34 2009
@@ -39,7 +39,7 @@
 import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
 
-public class BlockInputFormat extends TableInputFormatBase implements
+public class BlockInputFormat extends HTableInputFormatBase implements
     InputFormat<BlockID, BlockWritable>, JobConfigurable {
   static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
   private TableRecordReader tableRecordReader;
@@ -47,7 +47,7 @@
   /**
    * Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
    */
-  protected static class TableRecordReader extends TableRecordReaderBase
+  protected static class TableRecordReader extends HTableRecordReaderBase
       implements RecordReader<BlockID, BlockWritable> {
     
     /**

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableInputFormatBase.java?rev=744008&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableInputFormatBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableInputFormatBase.java Fri Feb 13 06:43:34 2009
@@ -0,0 +1,154 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public abstract class HTableInputFormatBase {
+  private static final Log LOG = LogFactory.getLog(HTableInputFormatBase.class);
+  protected byte[][] inputColumns;
+  protected HTable table;
+  protected RowFilterInterface rowFilter;
+  
+  /**
+   * space delimited list of columns
+   */
+  public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
+  
+  public void configure(JobConf job) {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    byte[][] m_cols = new byte[colNames.length][];
+    for (int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = Bytes.toBytes(colNames[i]);
+    }
+    setInputColumns(m_cols);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+  }
+
+  public void validateInput(JobConf job) throws IOException {
+    // expecting exactly one path
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length > 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+
+  /**
+   * Calculates the splits that will serve as input for the map tasks.
+   * <p>
+   * The number of splits created is the smaller of numSplits and the number
+   * of {@link HRegion}s in the table. If the number of splits is smaller
+   * than the number of {@link HRegion}s, each split spans multiple
+   * {@link HRegion}s, grouped as evenly as possible. When the splits are
+   * uneven, the bigger splits are placed first in the
+   * {@link InputSplit} array.
+   *
+   * @param job the map task {@link JobConf}
+   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
+   *
+   * @return the input splits
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    byte [][] startKeys = null;
+    try {
+      startKeys = this.table.getStartKeys();
+    } catch (NullPointerException e) { }
+    
+    if (startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region");
+    }
+    if (this.table == null) {
+      throw new IOException("No table was provided");
+    }
+    if (this.inputColumns == null || this.inputColumns.length == 0) {
+      throw new IOException("Expecting at least one column");
+    }
+    int realNumSplits = numSplits > startKeys.length? startKeys.length:
+      numSplits;
+    InputSplit[] splits = new InputSplit[realNumSplits];
+    int middle = startKeys.length / realNumSplits;
+    int startPos = 0;
+    for (int i = 0; i < realNumSplits; i++) {
+      int lastPos = startPos + middle;
+      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      String regionLocation = table.getRegionLocation(startKeys[startPos]).
+        getServerAddress().getHostname(); 
+      splits[i] = new TableSplit(this.table.getTableName(),
+        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
+          HConstants.EMPTY_START_ROW, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
+      startPos = lastPos;
+    }
+    return splits;
+  }
+
+  /**
+   * @param inputColumns to be passed to the map task.
+   */
+  protected void setInputColumns(byte[][] inputColumns) {
+    this.inputColumns = inputColumns;
+  }
+
+  /**
+   * Allows subclasses to set the {@link HTable}.
+   * 
+   * @param table to get the data from
+   */
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link RowFilterInterface} to be used.
+   * 
+   * @param rowFilter
+   */
+  protected void setRowFilter(RowFilterInterface rowFilter) {
+    this.rowFilter = rowFilter;
+  }
+}
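
For example, with 5 region start keys and numSplits = 2, getSplits() computes
realNumSplits = 2 and middle = 2; the remainder (5 % 2 = 1) goes to the first
split, so split 0 covers regions 0-2 and split 1 covers regions 3-4 (its end
key being HConstants.EMPTY_START_ROW), matching the "bigger splits first"
behaviour described in the javadoc.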

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableRecordReaderBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableRecordReaderBase.java?rev=744008&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableRecordReaderBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableRecordReaderBase.java Fri Feb 13 06:43:34 2009
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+
+public abstract class HTableRecordReaderBase {
+  protected byte[] startRow;
+  protected byte[] endRow;
+  protected byte [] lastRow;
+  protected RowFilterInterface trrRowFilter;
+  protected Scanner scanner;
+  protected HTable htable;
+  protected byte[][] trrInputColumns;
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow
+   * @throws IOException
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    if ((endRow != null) && (endRow.length > 0)) {
+      if (trrRowFilter != null) {
+        final Set<RowFilterInterface> rowFiltersSet =
+          new HashSet<RowFilterInterface>();
+        rowFiltersSet.add(new StopRowFilter(endRow));
+        rowFiltersSet.add(trrRowFilter);
+        this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+          new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+            rowFiltersSet));
+      } else {
+        this.scanner =
+          this.htable.getScanner(trrInputColumns, firstRow, endRow);
+      }
+    } else {
+      this.scanner =
+        this.htable.getScanner(trrInputColumns, firstRow, trrRowFilter);
+    }
+  }
+  
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    restart(startRow);
+  }
+
+  /**
+   * @param htable the {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    this.htable = htable;
+  }
+
+  /**
+   * @param inputColumns the columns
+   */
+  public void setInputColumns(final byte[][] inputColumns) {
+    byte[][] columns = inputColumns;
+    this.trrInputColumns = columns;
+  }
+
+  /**
+   * @param startRow the first row in the split
+   */
+  public void setStartRow(final byte[] startRow) {
+    byte[] sRow = startRow;
+    this.startRow = sRow;
+  }
+
+  /**
+   * 
+   * @param endRow the last row in the split
+   */
+  public void setEndRow(final byte[] endRow) {
+    byte[] eRow = endRow;
+    this.endRow = eRow;
+  }
+
+  /**
+   * @param rowFilter the {@link RowFilterInterface} to be used.
+   */
+  public void setRowFilter(RowFilterInterface rowFilter) {
+    this.trrRowFilter = rowFilter;
+  }
+
+  public void close() throws IOException {
+    this.scanner.close();
+  }
+
+  public long getPos() {
+    // This should be the ordinal tuple in the range;
+    // not clear how to calculate...
+    return 0;
+  }
+
+  public float getProgress() {
+    // Depends on the total number of tuples and getPos
+    return 0;
+  }
+
+}
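
A sketch of how the enclosing input formats are expected to wire up a concrete
reader (this wiring is not part of this patch; "split" is assumed to be the
TableSplit handed to getRecordReader(), and TableRecordReader one of the
concrete subclasses in this package):

    TableRecordReader trr = new TableRecordReader();
    trr.setHTable(this.table);
    trr.setInputColumns(this.inputColumns);
    trr.setStartRow(split.getStartRow());
    trr.setEndRow(split.getEndRow());
    trr.init();   // builds the HBase Scanner via restart(startRow)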

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java Fri Feb 13 06:43:34 2009
@@ -39,7 +39,7 @@
 import org.apache.hama.io.VectorWritable;
 import org.apache.hama.util.BytesUtil;
 
-public class VectorInputFormat extends TableInputFormatBase implements
+public class VectorInputFormat extends HTableInputFormatBase implements
     InputFormat<IntWritable, VectorWritable>, JobConfigurable {
   static final Log LOG = LogFactory.getLog(VectorInputFormat.class);
   private TableRecordReader tableRecordReader;
@@ -47,7 +47,7 @@
   /**
    * Iterate over an HBase table data, return (IntWritable, VectorWritable) pairs
    */
-  protected static class TableRecordReader extends TableRecordReaderBase
+  protected static class TableRecordReader extends HTableRecordReaderBase
       implements RecordReader<IntWritable, VectorWritable> {
 
    private int totalRows;

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Fri Feb 13 06:43:34 2009
@@ -168,6 +168,34 @@
     verifyMultResult(m1, m2, result);
   }
 
+  public void testSetMatrix() throws IOException {
+    Matrix a = new DenseMatrix(conf);
+    a.set(m1);
+
+    for (int i = 0; i < 5; i++) {
+      int x = RandomVariable.randInt(0, 10);
+      int y = RandomVariable.randInt(0, 10);
+      assertEquals(a.get(x, y), m1.get(x, y));
+    }
+  }
+
+  public void testSetAlphaMatrix() throws IOException {
+    Matrix a = new DenseMatrix(conf);
+    a.set(0.5, m1);
+    
+    for (int i = 0; i < 5; i++) {
+      int x = RandomVariable.randInt(0, 10);
+      int y = RandomVariable.randInt(0, 10);
+      assertEquals(a.get(x, y), (m1.get(x, y) * 0.5));
+    }
+  }
+  
+  public void testAddAlphaMatrix() throws IOException {
+    double value = m1.get(0, 0) + (m2.get(0, 0) * 0.1);
+    Matrix result = m1.add(0.1, m2);
+    assertEquals(value, result.get(0, 0));
+  }
+  
   public void testSetRow() throws IOException {
     Vector v = new DenseVector();
     double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
@@ -204,28 +232,6 @@
     }
   }
 
-  public void testSetMatrix() throws IOException {
-    Matrix a = new DenseMatrix(conf);
-    a.set(m1);
-
-    for (int i = 0; i < 5; i++) {
-      int x = RandomVariable.randInt(0, 10);
-      int y = RandomVariable.randInt(0, 10);
-      assertEquals(a.get(x, y), m1.get(x, y));
-    }
-  }
-
-  public void testSetAlphaMatrix() throws IOException {
-    Matrix a = new DenseMatrix(conf);
-    a.set(0.5, m1);
-    
-    for (int i = 0; i < 5; i++) {
-      int x = RandomVariable.randInt(0, 10);
-      int y = RandomVariable.randInt(0, 10);
-      assertEquals(a.get(x, y), (m1.get(x, y) * 0.5));
-    }
-  }
-  
   public void testLoadSave() throws IOException {
     String path1 = m1.getPath();
     // save m1 to aliase1