You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/07/07 05:14:16 UTC

hbase git commit: HBASE-16055 PutSortReducer loses any Visibility/acl attribute set on the Puts (Ram)

Repository: hbase
Updated Branches:
  refs/heads/0.98 e3ef8b69b -> ec506bb01


HBASE-16055 PutSortReducer loses any Visibility/acl attribute set on the
Puts (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec506bb0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec506bb0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec506bb0

Branch: refs/heads/0.98
Commit: ec506bb01caf65708353665106faab7af1019d73
Parents: e3ef8b6
Author: Ramkrishna <ra...@intel.com>
Authored: Thu Jul 7 10:43:43 2016 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Thu Jul 7 10:43:43 2016 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/mapreduce/PutSortReducer.java  |  20 ++-
 .../hadoop/hbase/mapreduce/TextSortReducer.java |   8 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java | 127 +++++++++++++++----
 3 files changed, 126 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ec506bb0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
index 792686a..ebaebcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
@@ -18,17 +18,25 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.StringUtils;
 
@@ -44,7 +52,17 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceStability.Stable
 public class PutSortReducer extends
     Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
-  
+  // Cell creator, built from the job configuration in setup()
+  private CellCreator kvCreator;
+
+  @Override
+  protected void
+      setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
+          throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    this.kvCreator = new CellCreator(conf);
+  }
+
   @Override
   protected void reduce(
       ImmutableBytesWritable row,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec506bb0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 168ba40..cb329f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -99,9 +99,8 @@ public class TextSortReducer extends
    */
   @Override
   protected void setup(Context context) {
-    doSetup(context);
-
     Configuration conf = context.getConfiguration();
+    doSetup(context, conf);
 
     parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
     if (parser.getRowKeyColumnIndex() == -1) {
@@ -113,10 +112,9 @@ public class TextSortReducer extends
   /**
    * Handles common parameter initialization that a subclass might want to leverage.
    * @param context
+   * @param conf job configuration; the separator setting is read from it
    */
-  protected void doSetup(Context context) {
-    Configuration conf = context.getConfiguration();
-
+  protected void doSetup(Context context, Configuration conf) {
     // If a custom separator has been used,
     // decode it back from Base64 encoding.
     separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec506bb0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index a7f178b..1f8a992 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.mapreduce.TestImportTSVWithTTLs.TTLCheckingObserver;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -170,11 +171,78 @@ public class TestHFileOutputFormat2  {
     }
   }
 
-  private void setupRandomGeneratorMapper(Job job) {
-    job.setInputFormatClass(NMapInputFormat.class);
-    job.setMapperClass(RandomKVGeneratingMapper.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-    job.setMapOutputValueClass(KeyValue.class);
+  /**
+   * Mapper that writes, per generated random row key, one short-TTL Put for each test family.
+   */
+  static class RandomPutGeneratingMapper
+      extends Mapper<NullWritable, NullWritable,
+                 ImmutableBytesWritable, Put> {
+    // Row key length in bytes; read from KEYLEN_CONF in setup().
+    private int keyLength;
+    private static final int KEYLEN_DEFAULT=10;
+    private static final String KEYLEN_CONF="randomkv.key.length";
+    // Value length in bytes; read from VALLEN_CONF in setup().
+    private int valLength;
+    private static final int VALLEN_DEFAULT=10;
+    private static final String VALLEN_CONF="randomkv.val.length";
+    private static final byte [] QUALIFIER = Bytes.toBytes("data");
+    // Reads key/value lengths from the job configuration (defaults apply when unset).
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      super.setup(context);
+
+      Configuration conf = context.getConfiguration();
+      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+    }
+    // Writes ROWSPERSPLIT random rows; the two input arguments are not used.
+    @Override
+    protected void map(
+        NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable,
+               ImmutableBytesWritable,Put>.Context context)
+        throws java.io.IOException ,InterruptedException
+    {
+      // Buffers are allocated once and refilled with random bytes on every iteration.
+      byte keyBytes[] = new byte[keyLength];
+      byte valBytes[] = new byte[valLength];
+
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+
+      Random random = new Random();
+      for (int i = 0; i < ROWSPERSPLIT; i++) {
+
+        random.nextBytes(keyBytes);
+        // Stamp the task id into the last key byte so keys from different tasks cannot collide
+        keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
+        random.nextBytes(valBytes);
+        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+        // Emit one Put per column family, all sharing this row key and value.
+        for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
+          Put p = new Put(keyBytes);
+          p.addColumn(family, QUALIFIER, valBytes);
+          // Set a very low TTL so that a later scan does not return any value
+          p.setTTL(1l);
+          context.write(key, p);
+        }
+      }
+    }
+  }
+
+  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
+    if (putSortReducer) { // mapper output values are Puts
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setMapperClass(RandomPutGeneratingMapper.class);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(Put.class);
+    } else { // mapper output values are KeyValues
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setMapperClass(RandomKVGeneratingMapper.class);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(KeyValue.class);
+    }
   }
 
   /**
@@ -312,7 +380,7 @@ public class TestHFileOutputFormat2  {
     conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
 
     Job job = new Job(conf, "testWritingPEData");
-    setupRandomGeneratorMapper(job);
+    setupRandomGeneratorMapper(job, false);
     // This partitioner doesn't work well for number keys but using it anyways
     // just to demonstrate how to configure it.
     byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
@@ -454,12 +522,18 @@ public class TestHFileOutputFormat2  {
   @Test
   public void testMRIncrementalLoadWithLocality() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
-    doIncrementalLoadTest(false, true);
-    doIncrementalLoadTest(true, true);
+    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
+    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
   }
 
-  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
-      throws Exception {
+  @Test
+  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
+    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
+    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
+  }
+
+  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
+      boolean putSortReducer, String tableStr) throws Exception {
     util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
     conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
@@ -528,21 +602,28 @@ public class TestHFileOutputFormat2  {
       // Perform the actual load
       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
 
-      // Ensure data shows up
-      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
-      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
-        util.countRows(table));
-      Scan scan = new Scan();
-      ResultScanner results = table.getScanner(scan);
-      for (Result res : results) {
-        assertEquals(FAMILIES.length, res.rawCells().length);
-        Cell first = res.rawCells()[0];
-        for (Cell kv : res.rawCells()) {
-          assertTrue(CellUtil.matchingRow(first, kv));
-          assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+      int expectedRows = 0;
+      if (putSortReducer) {
+        // Puts are written with a very low TTL in this mode, so no rows should be extracted
+        assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+          util.countRows(table));
+      } else {
+        // Ensure data shows up
+        expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+        assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+          util.countRows(table));
+        Scan scan = new Scan();
+        ResultScanner results = table.getScanner(scan);
+        for (Result res : results) {
+          assertEquals(FAMILIES.length, res.rawCells().length);
+          Cell first = res.rawCells()[0];
+          for (Cell kv : res.rawCells()) {
+            assertTrue(CellUtil.matchingRow(first, kv));
+            assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+          }
         }
+        results.close();
       }
-      results.close();
       String tableDigestBefore = util.checksumRows(table);
 
       // Check region locality