You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:21:04 UTC

svn commit: r1181568 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: nspiegelberg
Date: Tue Oct 11 02:21:01 2011
New Revision: 1181568

URL: http://svn.apache.org/viewvc?rev=1181568&view=rev
Log:
Adding RowMutation for being able to do both Delete and Put in MR jobs

Summary:
I have added a union-style class called RowMutation to encapsulate either a
Delete or a Put in MapReduce jobs.
This is going to be useful in gathering up mutations for batch import jobs that
also require deletes to be made available.

I have also added a matching RowMutationSortReducer modified from the initial
PutSortReducer so as to know how to sort these elements in a MR job.

Finally, I added a line for selecting the proper reducer in the
HFileOutputFormatter, due to the rather hacky nature of checking for instances
of input...

Test Plan:
I've implemented a small unit test which creates a MR job with a mapper that
statically outputs a Put and a Delete. The new reducer properly sorts them and
outputs them and the test wil dump to a file using HFileOutputFormat.

Reviewed By: kannan
Reviewers: kannan, jfan, nspiegelberg
Commenters: nspiegelberg, gqchen
CC: kannan, nspiegelberg, gqchen, bogdan
Differential Revision: 267252

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Row.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Row.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Row.java?rev=1181568&r1=1181567&r2=1181568&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Row.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Row.java Tue Oct 11 02:21:01 2011
@@ -19,10 +19,12 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.io.WritableComparable;
+
 /**
  * Has a row.
  */
-interface Row {
+public interface Row extends WritableComparable<Row> {
   /**
    * @return The row.
    */

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java?rev=1181568&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java Tue Oct 11 02:21:01 2011
@@ -0,0 +1,147 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Class to be used in Bulk Import MR jobs to be able to pass both Deletes and Puts in as parameters.
+ */
+public class RowMutation implements Row {
+
+  /**
+   * Enum used for serializing
+   */
+  public static enum Type {
+    Put((byte)4),
+    Delete((byte)8);
+
+    private final byte code;
+
+    Type(final byte c) {
+      this.code = c;
+    }
+
+    /**
+     * @return the code of this union
+     */
+    public byte getCode() {
+      return this.code;
+    }
+  }
+
+  /**
+   * Union type of field to hold either a Put or Delete
+   * Useful for abstractions
+   */
+  private Row row = null;
+
+  /**
+   * To be used for Writable.
+   * DO NOT USE!!!
+   */
+  public RowMutation() {}
+
+  /**
+   * Copy constructor
+   * @param r the item to copy
+   * @throws IOException if passed parameter is not of required type
+   */
+  public RowMutation(final RowMutation r)
+  throws IOException {
+    if (null == r) {
+      throw new IOException("Cannot pass a null object to constructor");
+    }
+
+    row = r.getInstance();
+  }
+
+  /**
+   * Constructor to set the inner union style field.
+   * @param request -- the Put or Delete to be executed
+   * @throws IOException if passed parameter is not of required type
+   */
+  public RowMutation(final WritableComparable<Row> request)
+  throws IOException {
+    if(request instanceof Put) {
+      row = new Put((Put)request);
+    } else if(request instanceof Delete) {
+      row = new Delete((Delete)request);
+    } else {
+      throw new IOException("Must pass either a Delete or a Put");
+    }
+  }
+
+  /**
+   * Method for getting the Row instance from inside
+   * @return row
+   */
+  public Row getInstance() {
+    return row;
+  }
+
+  @Override
+  public int compareTo(Row o) {
+    return row.compareTo(o);
+  }
+
+  @Override
+  public byte[] getRow() {
+    return row.getRow();
+  }
+
+  @Override
+  public void readFields(DataInput in)
+  throws IOException {
+    byte b = in.readByte();
+
+    if(Type.Put.getCode() == b) {
+      row = new Put();
+    } else if(Type.Delete.getCode() == b) {
+      row = new Delete();
+    } else {
+      throw new IOException("Tried to read an invalid type of serialized object!");
+    }
+
+    row.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out)
+  throws IOException {
+    byte b = 0;
+
+    if(row instanceof Put) {
+      b = Type.Put.getCode();
+    } else if(row instanceof Delete) {
+      b = Type.Delete.getCode();
+    } else {
+      throw new IOException("Tried to write an invalid type of object to serialize!");
+    }
+
+    out.write(b);
+    row.write(out);
+  }
+
+}
\ No newline at end of file

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1181568&r1=1181567&r2=1181568&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Oct 11 02:21:01 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -307,6 +308,8 @@ public class HFileOutputFormat extends F
       job.setReducerClass(KeyValueSortReducer.class);
     } else if (Put.class.equals(job.getMapOutputValueClass())) {
       job.setReducerClass(PutSortReducer.class);
+    } else if (RowMutation.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(RowMutationSortReducer.class);
     } else {
       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
     }

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java?rev=1181568&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java Tue Oct 11 02:21:01 2011
@@ -0,0 +1,99 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Emits sorted Puts and Deletes.
+ * Reads in all Puts and Deletes from passed Iterator (over RowMutation elements), sorts them, then emits
+ * KeyValue items in sorted order.  If lots of columns per row, it will use lots of
+ * memory sorting.
+ * @see HFileOutputFormat
+ * @see KeyValueSortReducer
+ */
+public class RowMutationSortReducer extends
+    Reducer<ImmutableBytesWritable, RowMutation, ImmutableBytesWritable, KeyValue> {
+
+  @Override
+  protected void reduce(
+      ImmutableBytesWritable row,
+      java.lang.Iterable<RowMutation> requests,
+      Reducer<ImmutableBytesWritable, RowMutation,
+              ImmutableBytesWritable, KeyValue>.Context context)
+      throws java.io.IOException, InterruptedException
+  {
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "rowmutationsortreducer.row.threshold", 2L * (1<<30));
+    Iterator<RowMutation> iter = requests.iterator();
+    while (iter.hasNext()) {
+      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Row r = iter.next().getInstance();
+        Map< byte[], List<KeyValue> > familyMap;
+        if (r instanceof Put) {
+          familyMap = ((Put) r).getFamilyMap();
+        } else if (r instanceof Delete) {
+          familyMap = ((Delete) r).getFamilyMap();
+        } else {
+          familyMap = null;
+        }
+
+        if (null != familyMap) {
+          for (List<KeyValue> kvs : familyMap.values()) {
+            for (KeyValue kv : kvs) {
+              map.add(kv);
+              curSize += kv.getValueLength();
+            }
+          }
+        }
+      }
+      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : map) {
+        context.write(row, kv);
+        if (index > 0 && index % 100 == 0)
+          context.setStatus("Wrote " + index);
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1181568&r1=1181567&r2=1181568&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Tue Oct 11 02:21:01 2011
@@ -46,11 +46,17 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.RowMutation;
+
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
@@ -124,7 +130,6 @@ public class TestHFileOutputFormat  {
                ImmutableBytesWritable,KeyValue>.Context context)
         throws java.io.IOException ,InterruptedException
     {
-
       byte keyBytes[] = new byte[keyLength];
       byte valBytes[] = new byte[valLength];
 
@@ -154,7 +159,6 @@ public class TestHFileOutputFormat  {
     util.cleanupTestDir();
   }
 
-
   private void setupRandomGeneratorMapper(Job job) {
     job.setInputFormatClass(NMapInputFormat.class);
     job.setMapperClass(RandomKVGeneratingMapper.class);
@@ -162,6 +166,104 @@ public class TestHFileOutputFormat  {
     job.setMapOutputValueClass(KeyValue.class);
   }
 
+  static class RowSorterMapper
+  extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, RowMutation> {
+    @Override
+    protected void map(
+        NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable,
+               ImmutableBytesWritable, RowMutation>.Context context)
+    throws IOException ,InterruptedException
+    {
+      byte[] row = Bytes.toBytes("row1");
+
+      // need one for every task...
+      byte[] key = Bytes.toBytes("key");
+
+      byte[] col1 = Bytes.toBytes("col1");
+      byte[] col2 = Bytes.toBytes("col2");
+      byte[] col3 = Bytes.toBytes("col3");
+
+      // PUT cf=info-A
+      Row put1 = new Put(row).add(
+          TestHFileOutputFormat.FAMILIES[0], col1, 10, Bytes.toBytes("val10"));
+      Row put2 = new Put(row).add(
+          TestHFileOutputFormat.FAMILIES[0], col2, 11, Bytes.toBytes("val11"));
+
+      // PUT cf=info-B
+      Row put3 = new Put(row).add(
+          TestHFileOutputFormat.FAMILIES[1], col1, 20, Bytes.toBytes("val20"));
+      Row put4 = new Put(row).add(
+          TestHFileOutputFormat.FAMILIES[1], col2, 21, Bytes.toBytes("val21"));
+
+      // PUT new column
+      Row put5 = new Put(row).add(
+          TestHFileOutputFormat.FAMILIES[1], col3, 30, Bytes.toBytes("val30"));
+      Row put6 = new Put(row).add(
+          TestHFileOutputFormat.FAMILIES[1], col3, 31, Bytes.toBytes("val31"));
+      Row put7 = new Put(row).add(
+          TestHFileOutputFormat.FAMILIES[1], col3, 32, Bytes.toBytes("val32"));
+
+      // DELETEs
+      Row del1 = new Delete(row).deleteColumn(
+          TestHFileOutputFormat.FAMILIES[1], col2, 21);
+
+      Row del2 = new Delete(row).deleteFamily(
+          TestHFileOutputFormat.FAMILIES[0]);
+
+      Row del3 = new Delete(row).deleteColumns(
+          TestHFileOutputFormat.FAMILIES[1], col3);
+
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put1));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put2));
+
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put3));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put4));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put5));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put6));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put7));
+
+      context.write(new ImmutableBytesWritable(key), new RowMutation(del1));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(del2));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(del3));
+    }
+  }
+
+  /**
+   * Test for the union style MR jobs that runs both Put and Delete requests
+   * @throws Exception on job, sorting, IO or fs errors
+   */
+  @Test
+  public void testRowSortReducer()
+  throws Exception {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    conf.setInt("io.sort.mb", 20);
+
+    Path dir = HBaseTestingUtility.getTestDir("testRowSortReducer");
+
+    try {
+      Job job = new Job(conf);
+
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setOutputFormatClass(HFileOutputFormat.class);
+
+      job.setMapperClass(RowSorterMapper.class); // local
+      job.setReducerClass(RowMutationSortReducer.class);
+
+      job.setOutputKeyClass(ImmutableBytesWritable.class);
+      job.setOutputValueClass(KeyValue.class);
+
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(RowMutation.class);
+
+      FileOutputFormat.setOutputPath(job, dir);
+
+      assertTrue(job.waitForCompletion(false));
+    } finally {
+//      dir.getFileSystem(conf).delete(dir, true);
+    }
+  }
+
   /**
    * Test that {@link HFileOutputFormat} RecordWriter amends timestamps if
    * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
@@ -367,8 +469,6 @@ public class TestHFileOutputFormat  {
     }
   }
 
-
-
   private void runIncrementalPELoad(
       Configuration conf, HTable table, Path outDir)
   throws Exception {