Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:23:09 UTC

svn commit: r1181589 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/client/RowMutation.java main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java

Author: nspiegelberg
Date: Tue Oct 11 02:23:08 2011
New Revision: 1181589

URL: http://svn.apache.org/viewvc?rev=1181589&view=rev
Log:
Implementing a tie-breaking mechanism for BulkImports using the new RowMutation system

Summary: I added a global counter to the RowMutation class, along with a means
to serialize the order in which mutations are written out, so that ties can
still be broken in the case of full key equality. I also had to modify the
SortReducer to take the ordering mechanism into account. (A simplified sketch
of the idea is included below.)
Test Plan: I added a small set of changes to the HFileOutputFormat test so
that it writes out two entries with the same key but different values. I
checked the resulting files to make sure the entries come out in the expected
order.
Reviewed By: kannan
Reviewers: kannan
CC: kannan, hbase@lists
Differential Revision: 275696
Task ID: 619437
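
For readers skimming the diff, here is a minimal, self-contained sketch of the
mechanism. The class and member names (OrderedMutation, GLOBAL_ORDER) are
simplified, hypothetical stand-ins rather than the actual HBase API: the point
is that the order number is drawn from a shared counter at serialization time
and read back on deserialization, so two mutations with fully equal keys can
still be ordered by when they were written out.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    // Simplified stand-in for RowMutation (hypothetical names, not the HBase API).
    class OrderedMutation {
      private static final AtomicLong GLOBAL_ORDER = new AtomicLong(0);
      private long orderNumber;   // populated on deserialization
      private byte[] payload;

      OrderedMutation() {}                        // for deserialization
      OrderedMutation(byte[] payload) { this.payload = payload; }

      void write(DataOutputStream out) throws IOException {
        // stamped at write time: serialization order defines the tie-break
        out.writeLong(GLOBAL_ORDER.incrementAndGet());
        out.writeInt(payload.length);
        out.write(payload);
      }

      void readFields(DataInputStream in) throws IOException {
        orderNumber = in.readLong();
        payload = new byte[in.readInt()];
        in.readFully(payload);
      }

      long getOrderNumber() { return orderNumber; }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        new OrderedMutation("same-key".getBytes()).write(out);  // written first
        new OrderedMutation("same-key".getBytes()).write(out);  // written second

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        OrderedMutation first = new OrderedMutation();
        OrderedMutation second = new OrderedMutation();
        first.readFields(in);
        second.readFields(in);
        // the earlier write carries the smaller order number
        System.out.println(first.getOrderNumber() < second.getOrderNumber()); // true
      }
    }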

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java?rev=1181589&r1=1181588&r2=1181589&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java Tue Oct 11 02:23:08 2011
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.io.WritableComparable;
 
@@ -58,38 +59,37 @@ public class RowMutation implements Row 
   private Row row = null;
 
   /**
-   * To be used for Writable.
-   * DO NOT USE!!!
+   * Global counter for internal ordering of mutations
    */
-  public RowMutation() {}
+  private static AtomicLong globalOrderCounter = new AtomicLong(0);
 
   /**
-   * Copy constructor
-   * @param r the item to copy
-   * @throws IOException if passed parameter is not of required type
+   * Field to keep track of the internal ordering of mutations
    */
-  public RowMutation(final RowMutation r)
-  throws IOException {
-    if (null == r) {
-      throw new IOException("Cannot pass a null object to constructor");
-    }
+  private long orderNumber;
 
-    row = r.getInstance();
-  }
+  /**
+   * To be used for Writable.
+   * DO NOT USE!!!
+   */
+  public RowMutation() {}
 
   /**
    * Constructor to set the inner union style field.
-   * @param request -- the Put or Delete to be executed
-   * @throws IOException if passed parameter is not of required type
-   */
-  public RowMutation(final WritableComparable<Row> request)
-  throws IOException {
-    if(request instanceof Put) {
-      row = new Put((Put)request);
-    } else if(request instanceof Delete) {
-      row = new Delete((Delete)request);
+   *
+   * @param request
+   *          the Put or Delete to be executed
+   * @throws IOException
+   *           if the passed parameter is not of the supported type
+   */
+  public RowMutation(final WritableComparable<Row> request) throws IOException {
+    if (request instanceof Put) {
+      row = new Put((Put) request);
+    } else if (request instanceof Delete) {
+      row = new Delete((Delete) request);
     } else {
-      throw new IOException("Must pass either a Delete or a Put");
+      throw new IOException("Type currently not supported: "
+          + request.getClass().getName());
     }
   }
 
@@ -116,14 +116,15 @@ public class RowMutation implements Row 
   throws IOException {
     byte b = in.readByte();
 
-    if(Type.Put.getCode() == b) {
+    if (Type.Put.getCode() == b) {
       row = new Put();
-    } else if(Type.Delete.getCode() == b) {
+    } else if (Type.Delete.getCode() == b) {
       row = new Delete();
     } else {
       throw new IOException("Tried to read an invalid type of serialized object!");
     }
 
+    this.orderNumber = in.readLong();
     row.readFields(in);
   }
 
@@ -132,16 +133,26 @@ public class RowMutation implements Row 
   throws IOException {
     byte b = 0;
 
-    if(row instanceof Put) {
+    if (row instanceof Put) {
       b = Type.Put.getCode();
-    } else if(row instanceof Delete) {
+    } else if (row instanceof Delete) {
       b = Type.Delete.getCode();
     } else {
       throw new IOException("Tried to write an invalid type of object to serialize!");
     }
 
     out.write(b);
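+    // stamp the order at serialization time; each write() draws the next
+    // value from the shared counter, so write order defines the tie-break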
+    out.writeLong(RowMutation.globalOrderCounter.incrementAndGet());
+
     row.write(out);
   }
 
+  /**
+   * @return the order number stamped on this mutation when it was
+   *         serialized; used to break ties between fully equal keys
+   */
+  public long getOrderNumber() {
+    return this.orderNumber;
+  }
+
 }
\ No newline at end of file

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java?rev=1181589&r1=1181588&r2=1181589&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java Tue Oct 11 02:23:08 2011
@@ -61,7 +61,8 @@ public class RowMutationSortReducer exte
       long curSize = 0;
       // stop at the end or the RAM threshold
       while (iter.hasNext() && curSize < threshold) {
-        Row r = iter.next().getInstance();
+        RowMutation rm = iter.next();
+        Row r = rm.getInstance();
         Map< byte[], List<KeyValue> > familyMap;
         if (r instanceof Put) {
           familyMap = ((Put) r).getFamilyMap();
@@ -74,6 +75,7 @@ public class RowMutationSortReducer exte
         if (null != familyMap) {
           for (List<KeyValue> kvs : familyMap.values()) {
             for (KeyValue kv : kvs) {
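+              // copy the mutation's write-order number into memstoreTS so
+              // that fully equal keys still sort by write order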
+              kv.setMemstoreTS(rm.getOrderNumber());
               map.add(kv);
               curSize += kv.getValueLength();
             }
@@ -96,4 +98,4 @@ public class RowMutationSortReducer exte
       }
     }
   }
-}
\ No newline at end of file
+}
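
For intuition, here is a rough sketch of the tie-break rule the reducer relies
on once memstoreTS is stamped: compare the full key first and, on a tie, order
by descending sequence id so the later write surfaces first. This is a
hypothetical illustration under simplified assumptions (SeqKV is a stand-in,
and this is not HBase's actual KVComparator), mirroring how the memstoreTS set
above is presumably consumed during the sort.

    import java.util.Comparator;
    import java.util.TreeSet;

    public class TieBreakSketch {
      // Hypothetical stand-in for a KeyValue: a flattened key plus the
      // sequence id copied from RowMutation.getOrderNumber().
      static class SeqKV {
        final String key;
        final long seqId;
        final String value;
        SeqKV(String key, long seqId, String value) {
          this.key = key; this.seqId = seqId; this.value = value;
        }
      }

      // Full key first; on a tie, the higher sequence id (the later write)
      // sorts first, analogous to the memstoreTS tie-break.
      static final Comparator<SeqKV> CMP = (a, b) -> {
        int c = a.key.compareTo(b.key);
        return (c != 0) ? c : Long.compare(b.seqId, a.seqId);
      };

      public static void main(String[] args) {
        TreeSet<SeqKV> sorted = new TreeSet<>(CMP);
        sorted.add(new SeqKV("row1/info-B:col4/1", 8, "valAAAAA")); // older write
        sorted.add(new SeqKV("row1/info-B:col4/1", 9, "valBBBBB")); // newer write
        // the newer value wins the tie and is emitted first, matching the
        // order the test below asserts for col4
        System.out.println(sorted.first().value); // valBBBBB
      }
    }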

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1181589&r1=1181588&r2=1181589&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Tue Oct 11 02:23:08 2011
@@ -47,22 +47,20 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Row;
-
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutation;
-
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -94,6 +92,8 @@ public class TestHFileOutputFormat  {
       , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))
       , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-C"))};
   private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
+  private static final String oldValue = "valAAAAA";
+  private static final String newValue = "valBBBBB";
 
   private HBaseTestingUtility util = new HBaseTestingUtility();
 
@@ -124,6 +124,7 @@ public class TestHFileOutputFormat  {
       valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
     }
 
+    @Override
     protected void map(
         NullWritable n1, NullWritable n2,
         Mapper<NullWritable, NullWritable,
@@ -167,14 +168,15 @@ public class TestHFileOutputFormat  {
   }
 
   static class RowSorterMapper
-  extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, RowMutation> {
+ extends
+      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, RowMutation> {
     @Override
     protected void map(
         NullWritable n1, NullWritable n2,
         Mapper<NullWritable, NullWritable,
                ImmutableBytesWritable, RowMutation>.Context context)
     throws IOException ,InterruptedException
-    {
+ {
       byte[] row = Bytes.toBytes("row1");
 
       // need one for every task...
@@ -183,33 +185,39 @@ public class TestHFileOutputFormat  {
       byte[] col1 = Bytes.toBytes("col1");
       byte[] col2 = Bytes.toBytes("col2");
       byte[] col3 = Bytes.toBytes("col3");
+      byte[] col4 = Bytes.toBytes("col4");
 
       // PUT cf=info-A
-      Row put1 = new Put(row).add(
-          TestHFileOutputFormat.FAMILIES[0], col1, 10, Bytes.toBytes("val10"));
-      Row put2 = new Put(row).add(
-          TestHFileOutputFormat.FAMILIES[0], col2, 11, Bytes.toBytes("val11"));
+      Row put1 = new Put(row).add(TestHFileOutputFormat.FAMILIES[0], col1, 10,
+          Bytes.toBytes("val10"));
+      Row put2 = new Put(row).add(TestHFileOutputFormat.FAMILIES[0], col2, 11,
+          Bytes.toBytes("val11"));
 
       // PUT cf=info-B
-      Row put3 = new Put(row).add(
-          TestHFileOutputFormat.FAMILIES[1], col1, 20, Bytes.toBytes("val20"));
-      Row put4 = new Put(row).add(
-          TestHFileOutputFormat.FAMILIES[1], col2, 21, Bytes.toBytes("val21"));
-
-      // PUT new column
-      Row put5 = new Put(row).add(
-          TestHFileOutputFormat.FAMILIES[1], col3, 30, Bytes.toBytes("val30"));
-      Row put6 = new Put(row).add(
-          TestHFileOutputFormat.FAMILIES[1], col3, 31, Bytes.toBytes("val31"));
-      Row put7 = new Put(row).add(
-          TestHFileOutputFormat.FAMILIES[1], col3, 32, Bytes.toBytes("val32"));
+      Row put3 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col1, 20,
+          Bytes.toBytes("val20"));
+      Row put4 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col2, 21,
+          Bytes.toBytes("val21"));
+
+      // PUT cf=info-B, new column
+      Row put5 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col3, 30,
+          Bytes.toBytes("val30"));
+      Row put6 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col3, 31,
+          Bytes.toBytes("val31"));
+      Row put7 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col3, 32,
+          Bytes.toBytes("val32"));
+
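+      // two PUTs to the same cell (same row/family/qualifier/timestamp):
+      // only the write order can distinguish them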
+      Row put8 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col4, 1,
+          Bytes.toBytes(TestHFileOutputFormat.oldValue));
+      Row put9 = new Put(row).add(TestHFileOutputFormat.FAMILIES[1], col4, 1,
+          Bytes.toBytes(TestHFileOutputFormat.newValue));
 
       // DELETEs
       Row del1 = new Delete(row).deleteColumn(
           TestHFileOutputFormat.FAMILIES[1], col2, 21);
 
-      Row del2 = new Delete(row).deleteFamily(
-          TestHFileOutputFormat.FAMILIES[0]);
+      Row del2 = new Delete(row)
+          .deleteFamily(TestHFileOutputFormat.FAMILIES[0]);
 
       Row del3 = new Delete(row).deleteColumns(
           TestHFileOutputFormat.FAMILIES[1], col3);
@@ -223,6 +231,10 @@ public class TestHFileOutputFormat  {
       context.write(new ImmutableBytesWritable(key), new RowMutation(put6));
       context.write(new ImmutableBytesWritable(key), new RowMutation(put7));
 
+      // testing for sequence of writes
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put8));
+      context.write(new ImmutableBytesWritable(key), new RowMutation(put9));
+
       context.write(new ImmutableBytesWritable(key), new RowMutation(del1));
       context.write(new ImmutableBytesWritable(key), new RowMutation(del2));
       context.write(new ImmutableBytesWritable(key), new RowMutation(del3));
@@ -238,6 +250,7 @@ public class TestHFileOutputFormat  {
   throws Exception {
     Configuration conf = new Configuration(this.util.getConfiguration());
     conf.setInt("io.sort.mb", 20);
+    conf.setInt("mapred.map.tasks", 1);
 
     Path dir = HBaseTestingUtility.getTestDir("testRowSortReducer");
 
@@ -247,6 +260,8 @@ public class TestHFileOutputFormat  {
       job.setInputFormatClass(NMapInputFormat.class);
       job.setOutputFormatClass(HFileOutputFormat.class);
 
+      job.setNumReduceTasks(1);
+
       job.setMapperClass(RowSorterMapper.class); // local
       job.setReducerClass(RowMutationSortReducer.class);
 
@@ -259,8 +274,117 @@ public class TestHFileOutputFormat  {
       FileOutputFormat.setOutputPath(job, dir);
 
       assertTrue(job.waitForCompletion(false));
+
+      FileSystem fs = dir.getFileSystem(conf);
+
+      for (FileStatus status : fs.listStatus(dir)) {
+        if (status.isDir()) {
+          String cf = status.getPath().getName();
+
+          if (Bytes.toString(TestHFileOutputFormat.FAMILIES[0]).equals(cf)
+              || Bytes.toString(TestHFileOutputFormat.FAMILIES[1]).equals(cf)) {
+            for (FileStatus stat : fs.listStatus(status.getPath())) {
+              Reader r = HFile.createReader(fs, stat.getPath(), null, false,
+                  true);
+              HFileScanner scanner = r.getScanner(false, false);
+              scanner.seekTo();
+
+              int index = 0;
+
+              // check things for info-A
+              if (Bytes.toString(TestHFileOutputFormat.FAMILIES[0]).equals(cf)) {
+                do {
+                  ++index;
+
+                  KeyValue kv = scanner.getKeyValue();
+                  long ts = kv.getTimestamp();
+
+                  switch (index) {
+                  case 1:
+                    assertTrue(ts <= System.currentTimeMillis());
+                    assertEquals(KeyValue.Type.DeleteFamily.getCode(),
+                        kv.getType());
+                    break;
+                  case 2:
+                    assertEquals(10, ts);
+                    break;
+                  case 3:
+                    assertEquals(11, ts);
+                    break;
+                  default:
+                    fail("Invalid KeyValue " + kv + " found in HFile "
+                        + stat.getPath());
+                    break;
+                  }
+                } while (scanner.next());
+                // the default takes care of index being greater than expected,
+                // but we need one for failed writes, where index would be
+                // smaller
+                assertEquals(3, index);
+
+              }
+
+              // check things for info-B
+              if (Bytes.toString(TestHFileOutputFormat.FAMILIES[1]).equals(cf)) {
+                do {
+                  ++index;
+
+                  KeyValue kv = scanner.getKeyValue();
+                  long ts = kv.getTimestamp();
+
+                  switch (index) {
+                  case 1:
+                    assertEquals(20, ts);
+                    break;
+                  case 2:
+                    assertEquals(21, ts);
+                    assertEquals(KeyValue.Type.Delete.getCode(), kv.getType());
+                    break;
+                  case 3:
+                    assertEquals(21, ts);
+                    assertEquals(KeyValue.Type.Put.getCode(), kv.getType());
+                    break;
+                  case 4:
+                    assertTrue(ts <= System.currentTimeMillis());
+                    assertEquals(KeyValue.Type.DeleteColumn.getCode(),
+                        kv.getType());
+                    break;
+                  case 5:
+                    assertEquals(32, ts);
+                    break;
+                  case 6:
+                    assertEquals(31, ts);
+                    break;
+                  case 7:
+                    assertEquals(30, ts);
+                    break;
+                  case 8:
+                    assertEquals(1, ts);
+                    assertEquals(TestHFileOutputFormat.newValue,
+                        Bytes.toString(kv.getValue()));
+                    break;
+                  case 9:
+                    assertEquals(TestHFileOutputFormat.oldValue,
+                        Bytes.toString(kv.getValue()));
+                    break;
+                  default:
+                    fail("Invalid KeyValue " + kv + " found in HFile "
+                        + stat.getPath());
+                    break;
+                  }
+                } while (scanner.next());
+                // the default takes care of index being greater than expected,
+                // but we need one for failed writes, where index would be
+                // smaller
+                assertEquals(9, index);
+              }
+            }
+          }
+        }
+      }
+
     } finally {
-//      dir.getFileSystem(conf).delete(dir, true);
+      dir.getFileSystem(conf).delete(dir, true);
     }
   }