Posted to commits@hbase.apache.org by st...@apache.org on 2008/09/23 07:04:13 UTC

svn commit: r698089 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/mapred/ src/test/org/apache/hadoop/hbase/mapred/

Author: stack
Date: Mon Sep 22 22:04:12 2008
New Revision: 698089

URL: http://svn.apache.org/viewvc?rev=698089&view=rev
Log:
HBASE-885 TableMap and TableReduce should be interfaces
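
In short, TableMap and TableReduce stop being abstract base classes and become plain interfaces extending Hadoop's Mapper and Reducer, their static initJob helpers move to a new TableMapReduceUtil class, and implementors now extend MapReduceBase themselves. A minimal sketch of a mapper under the new contract follows; the class name MyTableMap, the marker column "touched:flag" and the stamp-every-row behaviour are invented for illustration and are not part of this commit.

    import java.io.IOException;

    import org.apache.hadoop.hbase.io.BatchUpdate;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.io.RowResult;
    import org.apache.hadoop.hbase.mapred.TableMap;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    /** Emit one BatchUpdate per scanned row, stamping a marker column. */
    public class MyTableMap
    extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, BatchUpdate> {
      private static final byte [] MARKER = Bytes.toBytes("touched:flag");

      // No @Override here: map() implements an interface method rather than
      // overriding the old abstract TableMap.map().
      public void map(ImmutableBytesWritable key, RowResult value,
          OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
          Reporter reporter) throws IOException {
        BatchUpdate bu = new BatchUpdate(key.get());
        bu.put(MARKER, Bytes.toBytes("1"));
        output.collect(key, bu);
      }
    }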

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Sep 22 22:04:12 2008
@@ -1,6 +1,8 @@
 HBase Change Log
 Release 0.19.0 - Unreleased
   INCOMPATIBLE CHANGES
+   HBASE-885   TableMap and TableReduce should be interfaces
+               (Doğacan Güney via Stack)
 
   BUG FIXES
    HBASE-891   HRS.validateValuesLength throws IOE, gets caught in the retries

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java Mon Sep 22 22:04:12 2008
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
@@ -37,7 +38,9 @@
 /**
  * Extract grouping columns from input record
  */
-public class GroupingTableMap extends TableMap<ImmutableBytesWritable,RowResult> {
+public class GroupingTableMap
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable,RowResult> {
 
   /**
    * JobConf parameter to specify the columns used to produce the key passed to 
@@ -63,7 +66,8 @@
   public static void initJob(String table, String columns, String groupColumns, 
     Class<? extends TableMap> mapper, JobConf job) {
     
-    initJob(table, columns, mapper, ImmutableBytesWritable.class, RowResult.class, job);
+    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
+        ImmutableBytesWritable.class, RowResult.class, job);
     job.set(GROUP_COLUMNS, groupColumns);
   }
 
@@ -83,7 +87,6 @@
    * Pass the new key and value to reduce.
    * If any of the grouping columns are not found in the value, the record is skipped.
    */
-  @Override
   public void map(@SuppressWarnings("unused") ImmutableBytesWritable key,
       RowResult value, OutputCollector<ImmutableBytesWritable,RowResult> output,
       @SuppressWarnings("unused") Reporter reporter) throws IOException {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java Mon Sep 22 22:04:12 2008
@@ -24,13 +24,16 @@
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Pass the given key and record as-is to reduce
  */
-public class IdentityTableMap extends TableMap<ImmutableBytesWritable, RowResult> {
+public class IdentityTableMap
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable, RowResult> {
 
   /** constructor */
   public IdentityTableMap() {
@@ -49,14 +52,14 @@
   @SuppressWarnings("unchecked")
   public static void initJob(String table, String columns,
     Class<? extends TableMap> mapper, JobConf job) {
-    TableMap.initJob(table, columns, mapper, ImmutableBytesWritable.class,
+    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
+      ImmutableBytesWritable.class,
       RowResult.class, job);
   }
 
   /**
    * Pass the key, value to reduce
    */
-  @Override
   public void map(ImmutableBytesWritable key, RowResult value,
       OutputCollector<ImmutableBytesWritable,RowResult> output,
       @SuppressWarnings("unused") Reporter reporter) throws IOException {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java Mon Sep 22 22:04:12 2008
@@ -25,13 +25,16 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.io.BatchUpdate;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Write to table each key, record pair
  */
-public class IdentityTableReduce extends TableReduce<ImmutableBytesWritable, BatchUpdate> {
+public class IdentityTableReduce
+extends MapReduceBase
+implements TableReduce<ImmutableBytesWritable, BatchUpdate> {
   @SuppressWarnings("unused")
   private static final Log LOG =
     LogFactory.getLog(IdentityTableReduce.class.getName());
@@ -41,7 +44,6 @@
    *
    * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
    */
-  @Override
   public void reduce(ImmutableBytesWritable key, Iterator<BatchUpdate> values,
       OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
       @SuppressWarnings("unused") Reporter reporter)

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java Mon Sep 22 22:04:12 2008
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -43,7 +44,9 @@
  * Map outputs table rows IF the input row has columns that have content.  
  * Uses an {@link IdentityReducer}
  */
-public class RowCounter extends TableMap<ImmutableBytesWritable, RowResult> implements Tool {
+public class RowCounter
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable, RowResult>, Tool {
   /* Name of this 'program'
    */
   static final String NAME = "rowcounter";
@@ -53,7 +56,6 @@
         new RowResult(Bytes.toBytes("dummy"),new HbaseMapWritable<byte [], Cell>());
   private static enum Counters {ROWS}
   
-  @Override
   public void map(ImmutableBytesWritable row, RowResult value,
     OutputCollector<ImmutableBytesWritable, RowResult> output,
     @SuppressWarnings("unused") Reporter reporter)
@@ -93,7 +95,7 @@
       sb.append(args[i]);
     }
     // Second argument is the table name.
-    TableMap.initJob(args[1], sb.toString(), this.getClass(),
+    TableMapReduceUtil.initTableMapJob(args[1], sb.toString(), this.getClass(),
       ImmutableBytesWritable.class, RowResult.class, c);
     c.setReducerClass(IdentityReducer.class);
     // First arg is the output directory.

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMap.java Mon Sep 22 22:04:12 2008
@@ -19,18 +19,11 @@
  */
 package org.apache.hadoop.hbase.mapred;
 
-import java.io.IOException;
-
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Scan an HBase table to sort by a specified sort column.
@@ -40,42 +33,7 @@
  * @param <V> Writable value class
  */
 @SuppressWarnings("unchecked")
-public abstract class TableMap<K extends WritableComparable, V extends Writable>
-    extends MapReduceBase implements Mapper<ImmutableBytesWritable, RowResult, K, V> {
-  /**
-   * Use this before submitting a TableMap job. It will
-   * appropriately set up the JobConf.
-   * 
-   * @param table table name
-   * @param columns columns to scan
-   * @param mapper mapper class
-   * @param outputKeyClass
-   * @param outputValueClass
-   * @param job job configuration
-   */
-  public static void initJob(String table, String columns,
-    Class<? extends TableMap> mapper, 
-    Class<? extends WritableComparable> outputKeyClass, 
-    Class<? extends Writable> outputValueClass, JobConf job) {
-      
-    job.setInputFormat(TableInputFormat.class);
-    job.setMapOutputValueClass(outputValueClass);
-    job.setMapOutputKeyClass(outputKeyClass);
-    job.setMapperClass(mapper);
-    FileInputFormat.addInputPaths(job, table);
-    job.set(TableInputFormat.COLUMN_LIST, columns);
-  }
+public interface TableMap<K extends WritableComparable, V extends Writable>
+extends Mapper<ImmutableBytesWritable, RowResult, K, V> {
 
-  /**
-   * Call a user defined function on a single HBase record, represented
-   * by a key and its associated record value.
-   * 
-   * @param key
-   * @param value
-   * @param output
-   * @param reporter
-   * @throws IOException
-   */
-  public abstract void map(ImmutableBytesWritable key, RowResult value,
-      OutputCollector<K, V> output, Reporter reporter) throws IOException;
 }

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java?rev=698089&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java Mon Sep 22 22:04:12 2008
@@ -0,0 +1,52 @@
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+
+@SuppressWarnings("unchecked")
+public class TableMapReduceUtil {
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table table name
+   * @param columns columns to scan
+   * @param mapper mapper class
+   * @param outputKeyClass
+   * @param outputValueClass
+   * @param job job configuration
+   */
+  public static void initTableMapJob(String table, String columns,
+    Class<? extends TableMap> mapper, 
+    Class<? extends WritableComparable> outputKeyClass, 
+    Class<? extends Writable> outputValueClass, JobConf job) {
+      
+    job.setInputFormat(TableInputFormat.class);
+    job.setMapOutputValueClass(outputValueClass);
+    job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapperClass(mapper);
+    FileInputFormat.addInputPaths(job, table);
+    job.set(TableInputFormat.COLUMN_LIST, columns);
+  }
+  
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table
+   * @param reducer
+   * @param job
+   */
+  public static void initTableReduceJob(String table,
+      Class<? extends TableReduce> reducer, JobConf job) {
+    job.setOutputFormat(TableOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+}
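
Pulled together, a driver built on the two new helpers might look roughly like the following. It reuses the hypothetical MyTableMap sketched near the top of this message; the table name, column list and job name are likewise placeholders.

    import org.apache.hadoop.hbase.io.BatchUpdate;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
    import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class MyDriver {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(MyDriver.class);
        job.setJobName("stamp marker column");
        job.setNumReduceTasks(1);
        // Map: scan the "info:" family of "mytable"; MyTableMap (the
        // hypothetical mapper above) emits <row key, BatchUpdate> pairs.
        TableMapReduceUtil.initTableMapJob("mytable", "info:",
            MyTableMap.class, ImmutableBytesWritable.class,
            BatchUpdate.class, job);
        // Reduce: write the BatchUpdates back into the same table.
        TableMapReduceUtil.initTableReduceJob("mytable",
            IdentityTableReduce.class, job);
        JobClient.runJob(job);
      }
    }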

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java Mon Sep 22 22:04:12 2008
@@ -19,18 +19,11 @@
  */
 package org.apache.hadoop.hbase.mapred;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Write a table, sorting by the input key
@@ -39,34 +32,7 @@
  * @param <V> value class
  */
 @SuppressWarnings("unchecked")
-public abstract class TableReduce<K extends WritableComparable, V extends Writable>
-    extends MapReduceBase implements Reducer<K, V, ImmutableBytesWritable, BatchUpdate> {
-  /**
-   * Use this before submitting a TableReduce job. It will
-   * appropriately set up the JobConf.
-   * 
-   * @param table
-   * @param reducer
-   * @param job
-   */
-  public static void initJob(String table,
-      Class<? extends TableReduce> reducer, JobConf job) {
-    job.setOutputFormat(TableOutputFormat.class);
-    job.setReducerClass(reducer);
-    job.set(TableOutputFormat.OUTPUT_TABLE, table);
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(BatchUpdate.class);
-  }
+public interface TableReduce<K extends WritableComparable, V extends Writable>
+extends Reducer<K, V, ImmutableBytesWritable, BatchUpdate> {
 
-  /**
-   * 
-   * @param key
-   * @param values
-   * @param output
-   * @param reporter
-   * @throws IOException
-   */
-  public abstract void reduce(K key, Iterator<V> values,
-    OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter)
-  throws IOException;
 }
\ No newline at end of file
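
Symmetrically, a reducer now extends MapReduceBase and implements the TableReduce interface; IdentityTableReduce above is the shipped example. A hand-rolled variant that also bumps a counter, purely for illustration (class and counter names are made up):

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.hbase.io.BatchUpdate;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.TableReduce;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    /** Write every BatchUpdate through to the table, counting as it goes. */
    public class CountingTableReduce
    extends MapReduceBase
    implements TableReduce<ImmutableBytesWritable, BatchUpdate> {
      private static enum Counters {UPDATES}

      public void reduce(ImmutableBytesWritable key, Iterator<BatchUpdate> values,
          OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
          Reporter reporter) throws IOException {
        while (values.hasNext()) {
          output.collect(key, values.next());
          reporter.incrCounter(Counters.UPDATES, 1);
        }
      }
    }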

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=698089&r1=698088&r2=698089&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Mon Sep 22 22:04:12 2008
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
@@ -73,12 +74,13 @@
   /**
    * Pass the given key and processed record reduce
    */
-  public static class ProcessContentsMapper extends TableMap<ImmutableBytesWritable, BatchUpdate> {
+  public static class ProcessContentsMapper
+  extends MapReduceBase
+  implements TableMap<ImmutableBytesWritable, BatchUpdate> {
     /**
      * Pass the key, and reversed value to reduce
      */
     @SuppressWarnings("unchecked")
-    @Override
     public void map(ImmutableBytesWritable key, RowResult value,
       OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
       @SuppressWarnings("unused") Reporter reporter) 
@@ -127,10 +129,10 @@
       jobConf = new JobConf(conf, TestTableMapReduce.class);
       jobConf.setJobName("process column contents");
       jobConf.setNumReduceTasks(1);
-      TableMap.initJob(Bytes.toString(table.getTableName()), INPUT_COLUMN, 
-        ProcessContentsMapper.class, ImmutableBytesWritable.class,
-        BatchUpdate.class, jobConf);
-      TableReduce.initJob(Bytes.toString(table.getTableName()),
+      TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
+        INPUT_COLUMN, ProcessContentsMapper.class,
+        ImmutableBytesWritable.class, BatchUpdate.class, jobConf);
+      TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
         IdentityTableReduce.class, jobConf);
             
       LOG.info("Started " + table.getTableName());