You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/11/06 07:36:15 UTC

svn commit: r711778 - in /incubator/hama/trunk/src: java/org/apache/hama/ java/org/apache/hama/algebra/ java/org/apache/hama/mapred/ test/org/apache/hama/mapred/

Author: edwardyoon
Date: Wed Nov  5 22:36:14 2008
New Revision: 711778

URL: http://svn.apache.org/viewvc?rev=711778&view=rev
Log:
Renaming.

Added:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorOutputFormat.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java
Modified:
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Wed Nov  5 22:36:14 2008
@@ -38,7 +38,7 @@
 import org.apache.hama.io.VectorMapWritable;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.MatrixReduce;
+import org.apache.hama.mapred.RowCyclicReduce;
 import org.apache.hama.util.JobManager;
 import org.apache.hama.util.Numeric;
 import org.apache.hama.util.RandomVariable;
@@ -255,7 +255,7 @@
 
     Add1DLayoutMap.initJob(this.getPath(), B.getPath(), Add1DLayoutMap.class,
         IntWritable.class, VectorWritable.class, jobConf);
-    MatrixReduce.initJob(result.getPath(), Add1DLayoutReduce.class, jobConf);
+    RowCyclicReduce.initJob(result.getPath(), Add1DLayoutReduce.class, jobConf);
 
     JobManager.execute(jobConf, result);
     return result;
@@ -305,7 +305,7 @@
 
     Mult1DLayoutMap.initJob(this.getPath(), B.getPath(), Mult1DLayoutMap.class,
         IntWritable.class, VectorWritable.class, jobConf);
-    MatrixReduce.initJob(result.getPath(), Mult1DLayoutReduce.class, jobConf);
+    RowCyclicReduce.initJob(result.getPath(), Mult1DLayoutReduce.class, jobConf);
     JobManager.execute(jobConf, result);
     return result;
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java Wed Nov  5 22:36:14 2008
@@ -31,10 +31,10 @@
 import org.apache.hama.Matrix;
 import org.apache.hama.Vector;
 import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.DenseMap;
+import org.apache.hama.mapred.RowCyclicMap;
 import org.apache.log4j.Logger;
 
-public class Add1DLayoutMap extends DenseMap<IntWritable, VectorWritable> {
+public class Add1DLayoutMap extends RowCyclicMap<IntWritable, VectorWritable> {
   static final Logger LOG = Logger.getLogger(Add1DLayoutMap.class);
   protected Matrix matrix_b;
   public static final String MATRIX_B = "hama.addition.matrix.b";

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java Wed Nov  5 22:36:14 2008
@@ -29,9 +29,9 @@
 import org.apache.hama.io.VectorEntry;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.MatrixReduce;
+import org.apache.hama.mapred.RowCyclicReduce;
 
-public class Add1DLayoutReduce extends MatrixReduce<IntWritable, VectorWritable> {
+public class Add1DLayoutReduce extends RowCyclicReduce<IntWritable, VectorWritable> {
 
   @Override
   public void reduce(IntWritable key, Iterator<VectorWritable> values,

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java Wed Nov  5 22:36:14 2008
@@ -31,13 +31,13 @@
 import org.apache.hama.Matrix;
 import org.apache.hama.Vector;
 import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.DenseMap;
+import org.apache.hama.mapred.RowCyclicMap;
 import org.apache.log4j.Logger;
 
 /**
  * 1D Block Layout version
  */
-public class Mult1DLayoutMap extends DenseMap<IntWritable, VectorWritable> {
+public class Mult1DLayoutMap extends RowCyclicMap<IntWritable, VectorWritable> {
   static final Logger LOG = Logger.getLogger(Mult1DLayoutMap.class);
   protected Matrix matrix_b;
   public static final String MATRIX_B = "hama.multiplication.matrix.b";

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java Wed Nov  5 22:36:14 2008
@@ -29,11 +29,11 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.MatrixReduce;
+import org.apache.hama.mapred.RowCyclicReduce;
 import org.apache.log4j.Logger;
 
 public class Mult1DLayoutReduce extends
-    MatrixReduce<IntWritable, VectorWritable> {
+    RowCyclicReduce<IntWritable, VectorWritable> {
   static final Logger LOG = Logger.getLogger(Mult1DLayoutReduce.class);
   public static final Map<Integer, Double> buffer = new HashMap<Integer, Double>();
   

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java?rev=711778&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java Wed Nov  5 22:36:14 2008
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.Matrix;
+import org.apache.hama.io.VectorWritable;
+
+@SuppressWarnings("unchecked")
+public abstract class RowCyclicMap<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements
+    Mapper<IntWritable, VectorWritable, K, V> {
+  public static Matrix MATRIX_B;
+
+  public static void initJob(String matrixA, 
+      Class<? extends RowCyclicMap> mapper,
+      JobConf job) {
+
+    job.setInputFormat(VectorInputFormat.class);
+    job.setMapperClass(mapper);
+    FileInputFormat.addInputPaths(job, matrixA);
+
+    job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+  }
+
+  public abstract void map(IntWritable key, VectorWritable value,
+      OutputCollector<K, V> output, Reporter reporter) throws IOException;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java?rev=711778&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java Wed Nov  5 22:36:14 2008
@@ -0,0 +1,67 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+
+@SuppressWarnings("unchecked")
+public abstract class RowCyclicReduce<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Reducer<K, V, IntWritable, VectorUpdate> {
+  /**
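+   * Use this before submitting a RowCyclicReduce job. It will appropriately
+   * set up the JobConf.
+   * 
+   * @param table name of the output table, stored under VectorOutputFormat.OUTPUT_TABLE
+   * @param reducer the RowCyclicReduce subclass to run as the reducer
+   * @param job the JobConf to configure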
+   */
+  public static void initJob(String table,
+      Class<? extends RowCyclicReduce> reducer, JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+
+  /**
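+   * Reduces the values grouped under a key into (IntWritable, VectorUpdate) output.
+   * 
+   * @param key the key the values are grouped under
+   * @param values the values collected for the key
+   * @param output collector receiving (IntWritable, VectorUpdate) pairs
+   * @param reporter the Reporter supplied by the framework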
+   */
+  public abstract void reduce(K key, Iterator<V> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java?rev=711778&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java Wed Nov  5 22:36:14 2008
@@ -0,0 +1,74 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+
+public class VectorInputFormat extends VectorInputFormatBase implements
+    JobConfigurable {
+  private final Log LOG = LogFactory.getLog(VectorInputFormat.class);
+
+  /**
+   * space delimited list of columns
+   */
+  public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
+
+  /** {@inheritDoc} */
+  public void configure(JobConf job) {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    byte[][] m_cols = new byte[colNames.length][];
+    for (int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = Bytes.toBytes(colNames[i]);
+    }
+    setInputColums(m_cols);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void validateInput(JobConf job) throws IOException {
+    // expecting exactly one path
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length > 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java?rev=711778&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java Wed Nov  5 22:36:14 2008
@@ -0,0 +1,265 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.io.VectorWritable;
+import org.apache.hama.util.Numeric;
+
+public abstract class VectorInputFormatBase implements
+    InputFormat<IntWritable, VectorWritable> {
+  private byte[][] inputColumns;
+  private HTable table;
+  private TableRecordReader tableRecordReader;
+  private RowFilterInterface rowFilter;
+
+  /**
+   * Iterate over an HBase table data, return (IntWritable, VectorWritable) pairs
+   */
+  protected static class TableRecordReader implements
+      RecordReader<IntWritable, VectorWritable> {
+    private byte[] startRow;
+    private byte[] endRow;
+    private RowFilterInterface trrRowFilter;
+    private Scanner scanner;
+    private HTable htable;
+    private byte[][] trrInputColumns;
+
+    /**
+     * Build the scanner. Not done in constructor to allow for extension.
+     * 
+     * @throws IOException
+     */
+    public void init() throws IOException {
+      if ((endRow != null) && (endRow.length > 0)) {
+        if (trrRowFilter != null) {
+          final Set<RowFilterInterface> rowFiltersSet = new HashSet<RowFilterInterface>();
+          rowFiltersSet.add(new StopRowFilter(endRow));
+          rowFiltersSet.add(trrRowFilter);
+          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+              new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+                  rowFiltersSet));
+        } else {
+          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+              endRow);
+        }
+      } else {
+        this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+            trrRowFilter);
+      }
+    }
+
+    /**
+     * @param htable the {@link HTable} to scan.
+     */
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    /**
+     * @param inputColumns the columns to be placed in {@link VectorWritable}.
+     */
+    public void setInputColumns(final byte[][] inputColumns) {
+      byte[][] columns = inputColumns;
+      this.trrInputColumns = columns;
+    }
+
+    /**
+     * @param startRow the first row in the split
+     */
+    public void setStartRow(final byte[] startRow) {
+      byte[] sRow = startRow;
+      this.startRow = sRow;
+    }
+
+    /**
+     * 
+     * @param endRow the last row in the split
+     */
+    public void setEndRow(final byte[] endRow) {
+      byte[] eRow = endRow;
+      this.endRow = eRow;
+    }
+
+    /**
+     * @param rowFilter the {@link RowFilterInterface} to be used.
+     */
+    public void setRowFilter(RowFilterInterface rowFilter) {
+      this.trrRowFilter = rowFilter;
+    }
+
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      this.scanner.close();
+    }
+
+    /**
+     * @return IntWritable
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public IntWritable createKey() {
+      return new IntWritable();
+    }
+
+    /**
+     * @return VectorWritable
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public VectorWritable createValue() {
+      return new VectorWritable();
+    }
+
+    /** {@inheritDoc} */
+    public long getPos() {
+      // This should be the ordinal tuple in the range;
+      // not clear how to calculate...
+      return 0;
+    }
+
+    /** {@inheritDoc} */
+    public float getProgress() {
+      // Depends on the total number of tuples and getPos
+      return 0;
+    }
+
+    /**
+     * @param key IntWritable as input key.
+     * @param value VectorWritable as input value
+     * 
+     * Converts Scanner.next() to IntWritable, VectorWritable
+     * 
+     * @return true if there was more data
+     * @throws IOException
+     */
+    public boolean next(IntWritable key, VectorWritable value) throws IOException {
+      RowResult result = this.scanner.next();
+      boolean hasMore = result != null && result.size() > 0;
+      if (hasMore) {
+        key.set(Numeric.bytesToInt(result.getRow()));
+        Writables.copyWritable(result, value);
+      }
+      return hasMore;
+    }
+  }
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+   * default.
+   * 
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader<IntWritable, VectorWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return trr;
+  }
+
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    Cell meta = this.table.get(Constants.METADATA, Constants.METADATA_ROWS);
+
+    if (Numeric.bytesToInt(meta.getValue()) < numSplits) {
+      numSplits = Numeric.bytesToInt(meta.getValue());
+    }
+
+    int[] startKeys = new int[numSplits];
+    int interval = Numeric.bytesToInt(meta.getValue()) / numSplits;
+
+    for (int i = 0; i < numSplits; i++) {
+      startKeys[i] = (i * interval);
+    }
+
+    InputSplit[] splits = new InputSplit[startKeys.length];
+    for (int i = 0; i < startKeys.length; i++) {
+      splits[i] = new TableSplit(this.table.getTableName(), 
+          Numeric.intToBytes(startKeys[i]), ((i + 1) < startKeys.length) ? 
+              Numeric.intToBytes(startKeys[i + 1]) : HConstants.EMPTY_START_ROW);
+    }
+    return splits;
+  }
+
+  /**
+   * @param inputColumns to be passed in {@link VectorWritable} to the map task.
+   */
+  protected void setInputColums(byte[][] inputColumns) {
+    this.inputColumns = inputColumns;
+  }
+
+  /**
+   * Allows subclasses to set the {@link HTable}.
+   * 
+   * @param table to get the data from
+   */
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   * 
+   * @param tableRecordReader to provide other {@link TableRecordReader}
+   *                implementations.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+
+  /**
+   * Allows subclasses to set the {@link RowFilterInterface} to be used.
+   * 
+   * @param rowFilter the filter handed to each TableRecordReader
+   */
+  protected void setRowFilter(RowFilterInterface rowFilter) {
+    this.rowFilter = rowFilter;
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorOutputFormat.java?rev=711778&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorOutputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorOutputFormat.java Wed Nov  5 22:36:14 2008
@@ -0,0 +1,102 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hama.io.VectorUpdate;
+
+public class VectorOutputFormat extends
+    FileOutputFormat<IntWritable, VectorUpdate> {
+
+  /** JobConf parameter that specifies the output table */
+  public static final String OUTPUT_TABLE = "hama.mapred.output";
+  private final static Log LOG = LogFactory.getLog(VectorOutputFormat.class);
+
+  /**
+   * Write Reduce output (IntWritable, VectorUpdate) to an HBase table by
+   * committing each value's BatchUpdate; the key itself is not written
+   */
+  protected static class TableRecordWriter implements
+      RecordWriter<IntWritable, VectorUpdate> {
+    private HTable m_table;
+
+    /**
+     * Instantiate a TableRecordWriter with the HBase HTable for writing.
+     * 
+     * @param table the HTable that updates are committed to
+     */
+    public TableRecordWriter(HTable table) {
+      m_table = table;
+    }
+
+    /** {@inheritDoc} */
+    public void close(Reporter reporter) {
+    }
+
+    /** {@inheritDoc} */
+    public void write(IntWritable key, VectorUpdate value) throws IOException {
+      m_table.commit(value.getBatchUpdate());
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+      String name, Progressable progress) throws IOException {
+
+    // the output table name is read from the OUTPUT_TABLE job parameter
+
+    String tableName = job.get(OUTPUT_TABLE);
+    HTable table = null;
+    try {
+      table = new HTable(new HBaseConfiguration(job), tableName);
+    } catch (IOException e) {
+      LOG.error(e);
+      throw e;
+    }
+    return new TableRecordWriter(table);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+      throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+    String tableName = job.get(OUTPUT_TABLE);
+    if (tableName == null) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Wed Nov  5 22:36:14 2008
@@ -71,7 +71,7 @@
 
     Add1DLayoutMap.initJob(pathA, pathB, Add1DLayoutMap.class, IntWritable.class,
         VectorWritable.class, jobConf);
-    MatrixReduce.initJob(output, Add1DLayoutReduce.class, jobConf);
+    RowCyclicReduce.initJob(output, Add1DLayoutReduce.class, jobConf);
 
     jobConf.setNumMapTasks(2);
     jobConf.setNumReduceTasks(2);