Posted to commits@hbase.apache.org by st...@apache.org on 2009/07/09 06:08:20 UTC

svn commit: r792388 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/mapreduce/

Author: stack
Date: Thu Jul  9 04:08:20 2009
New Revision: 792388

URL: http://svn.apache.org/viewvc?rev=792388&view=rev
Log:
HBASE-1626 Allow emitting Deletes out of new TableReducer

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Delete.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=792388&r1=792387&r2=792388&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Thu Jul  9 04:08:20 2009
@@ -450,6 +450,8 @@
    HBASE-1620  Need to use special StoreScanner constructor for major compactions
                (passed sf, no caching, etc) (Jon Gray via Stack)
    HBASE-1624  Don't sort Puts if only one in list in HCM#processBatchOfRows
+   HBASE-1626  Allow emitting Deletes out of new TableReducer
+               (Lars George via Stack)
 
   OPTIMIZATIONS
    HBASE-1412  Change values for delete column and column family in KeyValue

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Delete.java?rev=792388&r1=792387&r2=792388&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Delete.java Thu Jul  9 04:08:20 2009
@@ -70,7 +70,7 @@
 
   /** Constructor for Writable.  DO NOT USE */
   public Delete() {
-    this(null);
+    this((byte [])null);
   }
 
   /**
@@ -108,6 +108,16 @@
   }
 
   /**
+   * @param d Delete to clone.
+   */
+  public Delete(final Delete d) {
+    this.row = d.getRow();
+    this.ts = d.getTimeStamp();
+    this.lockId = d.getLockId();
+    this.familyMap.putAll(d.getFamilyMap());
+  }
+
+  /**
    * Method to check if the familyMap is empty
    * @return true if empty, false otherwise
    */
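
For illustration, a minimal sketch of the new copy constructor in use; the
row, family and qualifier names here are made up, not part of the patch:

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    // Build a Delete for a hypothetical row and column.
    Delete original = new Delete(Bytes.toBytes("row-1"));
    original.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes("status"));

    // The new constructor copies row, timestamp, lock id and familyMap.
    Delete copy = new Delete(original);

Note the copy is shallow: putAll duplicates the familyMap entries, but both
instances still share the underlying KeyValue lists.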

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java?rev=792388&r1=792387&r2=792388&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java Thu Jul  9 04:08:20 2009
@@ -20,38 +20,60 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.OutputFormat;
 
 /**
- * Convenience class that simply writes each key, record pair to the configured 
- * HBase table.
+ * Convenience class that simply writes all values passed to it (which must 
+ * be {@link org.apache.hadoop.hbase.client.Put Put} or 
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} instances)
+ * out to the configured HBase table. This works in combination 
+ * with {@link TableOutputFormat}, which actually does the writing to HBase.<p>
+ *  
+ * Keys are passed along but ignored in TableOutputFormat.  However, they can
+ * be used to control how your values will be divided up amongst the specified
+ * number of reducers. <p>
+ * 
+ * You can also use the {@link TableMapReduceUtil} class to set up the two 
+ * classes in one step:
+ * <blockquote><code>
+ * TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job);
+ * </code></blockquote>
+ * This will also set the proper {@link TableOutputFormat}, which is given 
+ * the <code>table</code> parameter. The 
+ * {@link org.apache.hadoop.hbase.client.Put Put} or 
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} instances define 
+ * the row and columns implicitly.
  */
 public class IdentityTableReducer 
-extends TableReducer<ImmutableBytesWritable, Put> {
+extends TableReducer<Writable, Writable, Writable> {
 
   @SuppressWarnings("unused")
   private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
   
   /**
-   * Writes each given record, consisting of the key and the given values, to
-   * the HBase table.
+   * Writes each given record, consisting of the row key and the given values, 
+   * to the configured {@link OutputFormat}. It emits the row key and each 
+   * {@link org.apache.hadoop.hbase.client.Put Put} or 
+   * {@link org.apache.hadoop.hbase.client.Delete Delete} as a separate pair. 
    * 
-   * @param key  The current row key.
-   * @param values  The values for the given row.
+   * @param key  The current row key. 
+   * @param values  The {@link org.apache.hadoop.hbase.client.Put Put} or 
+   *   {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given 
+   *   row.
    * @param context  The context of the reduce. 
    * @throws IOException When writing the record fails.
    * @throws InterruptedException When the job gets interrupted.
    */
-  public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
+  @Override
+  public void reduce(Writable key, Iterable<Writable> values,
       Context context) throws IOException, InterruptedException {
-    while(values.hasNext()) {
-      context.write(key, values.next());
+    for(Writable putOrDelete : values) {
+      context.write(key, putOrDelete);
     }
   }
-  
+
 }
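
As a usage sketch, the one-step setup described in the Javadoc above; the
configuration, job name and MyMapper class are hypothetical:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    Job job = new Job(new HBaseConfiguration(), "puts-and-deletes");
    job.setMapperClass(MyMapper.class);  // hypothetical mapper emitting Put or Delete values
    // Sets the reducer, the output format and the target table in one call.
    TableMapReduceUtil.initTableReducerJob("target_table", IdentityTableReducer.class, job);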

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java?rev=792388&r1=792387&r2=792388&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java Thu Jul  9 04:08:20 2009
@@ -24,29 +24,36 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
- * Convert Map/Reduce output and write it to an HBase table.
+ * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored,
+ * while the output value <u>must</u> be either a {@link Put} or a 
+ * {@link Delete} instance. 
+ * 
+ * @param <KEY>  The type of the key. Ignored in this class.
  */
-public class TableOutputFormat extends
-    FileOutputFormat<ImmutableBytesWritable, Put> {
+public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
 
   private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
   /** Job parameter that specifies the output table. */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
 
   /**
-   * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) 
-   * and write to an HBase table
+   * Writes the reducer output to an HBase table.
+   * 
+   * @param <KEY>  The type of the key.
    */
-  protected static class TableRecordWriter
-    extends RecordWriter<ImmutableBytesWritable, Put> {
+  protected static class TableRecordWriter<KEY> 
+  extends RecordWriter<KEY, Writable> {
     
     /** The table to write to. */
     private HTable table;
@@ -82,9 +89,11 @@
      * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
      */
     @Override
-    public void write(ImmutableBytesWritable key, Put value) 
+    public void write(KEY key, Writable value) 
     throws IOException {
-      table.put(new Put(value));
+      if (value instanceof Put) this.table.put(new Put((Put)value));
+      else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
+      else throw new IOException("Pass a Delete or a Put");
     }
   }
   
@@ -97,7 +106,7 @@
   * @throws InterruptedException When the job is cancelled.
   * @see org.apache.hadoop.mapreduce.OutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
    */
-  public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(
+  public RecordWriter<KEY, Writable> getRecordWriter(
     TaskAttemptContext context) 
   throws IOException, InterruptedException {
     // expecting exactly one path
@@ -111,7 +120,37 @@
       throw e;
     }
     table.setAutoFlush(false);
-    return new TableRecordWriter(table);
+    return new TableRecordWriter<KEY>(table);
+  }
+
+  /**
+   * Checks if the output target exists.
+   * 
+   * @param context  The current context.
+   * @throws IOException When the check fails. 
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
+   */
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException,
+      InterruptedException {
+    // TODO Check if the table exists?
+    
+  }
+
+  /**
+   * Returns the output committer.
+   *  
+   * @param context  The current context.
+   * @return The committer.
+   * @throws IOException When creating the committer fails.
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
+  throws IOException, InterruptedException {
+    return new TableOutputCommitter();
   }
   
 }
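
For a manual setup that skips TableMapReduceUtil, a sketch would look like
the following; the table name "target_table" is hypothetical:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;

    job.setOutputFormatClass(TableOutputFormat.class);
    // OUTPUT_TABLE ("hbase.mapred.outputtable") names the table to write to.
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "target_table");
    // Values must be Put or Delete instances; TableRecordWriter.write()
    // throws an IOException for anything else.
    job.setOutputValueClass(Put.class);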

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java?rev=792388&r1=792387&r2=792388&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java Thu Jul  9 04:08:20 2009
@@ -19,19 +19,26 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
 
 /**
  * Extends the basic <code>Reducer</code> class to add the required key and
- * value output classes.
+ * value input/output classes. While the input key and value, as well as the 
+ * output key, can be anything handed in from the previous map phase, the output 
+ * value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put} 
+ * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
+ * using the {@link TableOutputFormat} class.
+ * <p>
+ * This class is extended by {@link IdentityTableReducer} but can also be 
+ * subclassed to implement similar features or any custom code needed. It has 
+ * the advantage of enforcing the output value to be of a specific type. 
  * 
- * @param <KEYIN>  The type of the key.
- * @param <VALUEIN>  The type of the value.
+ * @param <KEYIN>  The type of the input key.
+ * @param <VALUEIN>  The type of the input value.
+ * @param <KEYOUT>  The type of the output key.
  * @see org.apache.hadoop.mapreduce.Reducer
  */
-public abstract class TableReducer<KEYIN, VALUEIN>
-extends Reducer<KEYIN, VALUEIN, ImmutableBytesWritable, Put> {
-
+public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
+extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
 }
\ No newline at end of file
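
With the extra KEYOUT parameter, a custom subclass can now emit Deletes
directly, which is what HBASE-1626 enables. A sketch; the class name and
its purge-every-row semantics are illustrative only:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.Writable;

    public class PurgeRowReducer
    extends TableReducer<ImmutableBytesWritable, Writable, ImmutableBytesWritable> {
      @Override
      public void reduce(ImmutableBytesWritable key, Iterable<Writable> values,
          Context context) throws IOException, InterruptedException {
        // Emit one Delete per row key; TableOutputFormat turns it into
        // an HTable.delete() against the configured table.
        context.write(key, new Delete(key.get()));
      }
    }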