You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/08/26 07:42:12 UTC

svn commit: r688966 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/algebra/ src/java/org/apache/hama/mapred/ src/test/org/apache/hama/mapred/

Author: edwardyoon
Date: Mon Aug 25 22:42:11 2008
New Revision: 688966

URL: http://svn.apache.org/viewvc?rev=688966&view=rev
Log:
Replace ImmutableBytesWritable with IntWritable.

Added:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Aug 25 22:42:11 2008
@@ -19,6 +19,7 @@
     
   IMPROVEMENTS
   
+    HAMA-42: Replace ImmutableBytesWritable with IntWritable (edwardyoon)
     HAMA-44: Remove findbugs warnings (edwardyoon)
     HAMA-40: Rename MatrixInterface to Matrix (edwardyoon)
     HAMA-14: Using Java 6 (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java Mon Aug 25 22:42:11 2008
@@ -21,22 +21,21 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hama.DenseVector;
 import org.apache.hama.Vector;
 import org.apache.hama.mapred.DenseMap;
-import org.apache.hama.util.Numeric;
 
-public class AdditionMap extends DenseMap<ImmutableBytesWritable, DenseVector> {
+public class AdditionMap extends DenseMap<IntWritable, DenseVector> {
 
   @Override
-  public void map(ImmutableBytesWritable key, DenseVector value,
-      OutputCollector<ImmutableBytesWritable, DenseVector> output,
+  public void map(IntWritable key, DenseVector value,
+      OutputCollector<IntWritable, DenseVector> output,
       Reporter reporter) throws IOException {
 
-    Vector v1 = MATRIX_B.getRow(Numeric.bytesToInt(key.get()));
+    Vector v1 = MATRIX_B.getRow(key.get());
     output.collect(key, (DenseVector) v1.add(value));
 
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java Mon Aug 25 22:42:11 2008
@@ -25,21 +25,21 @@
 
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hama.DenseVector;
 import org.apache.hama.mapred.MatrixReduce;
+import org.apache.hama.util.Numeric;
 
-public class AdditionReduce extends
-    MatrixReduce<ImmutableBytesWritable, DenseVector> {
+public class AdditionReduce extends MatrixReduce<IntWritable, DenseVector> {
 
   @Override
-  public void reduce(ImmutableBytesWritable key, Iterator<DenseVector> values,
-      OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
-      Reporter reporter) throws IOException {
+  public void reduce(IntWritable key, Iterator<DenseVector> values,
+      OutputCollector<IntWritable, BatchUpdate> output, Reporter reporter)
+      throws IOException {
 
-    BatchUpdate b = new BatchUpdate(key.get());
+    BatchUpdate b = new BatchUpdate(Numeric.intToBytes(key.get()));
     DenseVector vector = values.next();
     for (Map.Entry<byte[], Cell> f : vector.entrySet()) {
       b.put(f.getKey(), f.getValue().getValue());

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java Mon Aug 25 22:42:11 2008
@@ -21,7 +21,7 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -39,7 +39,7 @@
 @SuppressWarnings("unchecked")
 public abstract class DenseMap<K extends WritableComparable, V extends Writable>
     extends MapReduceBase implements
-    Mapper<ImmutableBytesWritable, DenseVector, K, V> {
+    Mapper<IntWritable, DenseVector, K, V> {
   protected static Matrix MATRIX_B;
 
   public static void initJob(String matrixA, String matrixB,
@@ -57,6 +57,6 @@
     job.set(MatrixInputFormat.COLUMN_LIST, Constants.COLUMN);
   }
 
-  public abstract void map(ImmutableBytesWritable key, DenseVector value,
+  public abstract void map(IntWritable key, DenseVector value,
       OutputCollector<K, V> output, Reporter reporter) throws IOException;
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java Mon Aug 25 22:42:11 2008
@@ -31,20 +31,21 @@
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.filter.RowFilterSet;
 import org.apache.hadoop.hbase.filter.StopRowFilter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hama.DenseVector;
+import org.apache.hama.util.Numeric;
 
 public abstract class MatrixInputFormatBase implements
-    InputFormat<ImmutableBytesWritable, DenseVector> {
+    InputFormat<IntWritable, DenseVector> {
   private final Log LOG = LogFactory.getLog(MatrixInputFormatBase.class);
   private byte[][] inputColumns;
   private HTable table;
@@ -55,7 +56,7 @@
    * Iterate over an HBase table data, return (Text, DenseVector) pairs
    */
   protected static class TableRecordReader implements
-      RecordReader<ImmutableBytesWritable, DenseVector> {
+      RecordReader<IntWritable, DenseVector> {
     private byte[] startRow;
     private byte[] endRow;
     private RowFilterInterface trrRowFilter;
@@ -136,8 +137,8 @@
      * 
      * @see org.apache.hadoop.mapred.RecordReader#createKey()
      */
-    public ImmutableBytesWritable createKey() {
-      return new ImmutableBytesWritable();
+    public IntWritable createKey() {
+      return new IntWritable();
     }
 
     /**
@@ -172,12 +173,12 @@
      * @throws IOException
      */
     @SuppressWarnings("unchecked")
-    public boolean next(ImmutableBytesWritable key, DenseVector value)
+    public boolean next(IntWritable key, DenseVector value)
         throws IOException {
       RowResult result = this.scanner.next();
       boolean hasMore = result != null && result.size() > 0;
       if (hasMore) {
-        key.set(result.getRow());
+        key.set(Numeric.bytesToInt(result.getRow()));
         Writables.copyWritable(result, value);
       }
       return hasMore;
@@ -191,7 +192,7 @@
    * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
    *      JobConf, Reporter)
    */
-  public RecordReader<ImmutableBytesWritable, DenseVector> getRecordReader(
+  public RecordReader<IntWritable, DenseVector> getRecordReader(
       InputSplit split, @SuppressWarnings("unused") JobConf job,
       @SuppressWarnings("unused") Reporter reporter) throws IOException {
     TableSplit tSplit = (TableSplit) split;

Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java?rev=688966&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java Mon Aug 25 22:42:11 2008
@@ -0,0 +1,90 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+public class MatrixOutputFormat extends
+    FileOutputFormat<IntWritable, BatchUpdate> {
+
+  /** JobConf parameter that specifies the output table */
+  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+  private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
+
+  /**
+   * Writes each Reduce output value (a BatchUpdate) to an HBase table by
+   * committing it to the HTable; the IntWritable key is not used
+   */
+  protected static class TableRecordWriter implements
+      RecordWriter<IntWritable, BatchUpdate> {
+    private HTable m_table;
+
+    /**
+     * Instantiate a TableRecordWriter with the HBase HTable to write to.
+     * 
+     * @param table
+     */
+    public TableRecordWriter(HTable table) {
+      m_table = table;
+    }
+
+    /** {@inheritDoc} */
+    public void close(@SuppressWarnings("unused")
+    Reporter reporter) {
+      // Nothing to do.
+    }
+
+    /** {@inheritDoc} */
+    public void write(@SuppressWarnings("unused")
+    IntWritable key, BatchUpdate value) throws IOException {
+      m_table.commit(value);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter getRecordWriter(@SuppressWarnings("unused")
+  FileSystem ignored, JobConf job, @SuppressWarnings("unused")
+  String name, @SuppressWarnings("unused")
+  Progressable progress) throws IOException {
+
+    // the output table name is read from the job configuration
+
+    String tableName = job.get(OUTPUT_TABLE);
+    HTable table = null;
+    try {
+      table = new HTable(new HBaseConfiguration(job), tableName);
+    } catch (IOException e) {
+      LOG.error(e);
+      throw e;
+    }
+    return new TableRecordWriter(table);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  @SuppressWarnings("unused")
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+      throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+    String tableName = job.get(OUTPUT_TABLE);
+    if (tableName == null) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java Mon Aug 25 22:42:11 2008
@@ -23,8 +23,7 @@
 import java.util.Iterator;
 
 import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -35,7 +34,7 @@
 
 @SuppressWarnings("unchecked")
 public abstract class MatrixReduce<K extends WritableComparable, V extends Writable>
-    extends MapReduceBase implements Reducer<K, V, ImmutableBytesWritable, BatchUpdate> {
+    extends MapReduceBase implements Reducer<K, V, IntWritable, BatchUpdate> {
   /**
    * Use this before submitting a TableReduce job. It will
    * appropriately set up the JobConf.
@@ -46,10 +45,10 @@
    */
   public static void initJob(String table,
       Class<? extends MatrixReduce> reducer, JobConf job) {
-    job.setOutputFormat(TableOutputFormat.class);
+    job.setOutputFormat(MatrixOutputFormat.class);
     job.setReducerClass(reducer);
-    job.set(TableOutputFormat.OUTPUT_TABLE, table);
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.set(MatrixOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
     job.setOutputValueClass(BatchUpdate.class);
   }
 
@@ -62,6 +61,6 @@
    * @throws IOException
    */
   public abstract void reduce(K key, Iterator<V> values,
-    OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter)
+    OutputCollector<IntWritable, BatchUpdate> output, Reporter reporter)
   throws IOException;
 }
\ No newline at end of file

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Mon Aug 25 22:42:11 2008
@@ -21,7 +21,7 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hama.DenseMatrix;
@@ -64,7 +64,7 @@
     JobConf jobConf = new JobConf(conf, TestMatrixMapReduce.class);
     jobConf.setJobName("test MR job");
 
-    DenseMap.initJob(A, B, AdditionMap.class, ImmutableBytesWritable.class,
+    DenseMap.initJob(A, B, AdditionMap.class, IntWritable.class,
         DenseVector.class, jobConf);
     MatrixReduce.initJob(output, AdditionReduce.class, jobConf);
 
@@ -76,5 +76,4 @@
     assertEquals(c.get(0, 0), 2.0);
     assertEquals(c.get(0, 1), 1.0);
   }
-
 }