You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) is not preserved in this version.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/07/07 05:14:16 UTC
hbase git commit: HBASE-16055 PutSortReducer loses any Visibility/acl
attribute set on the Puts (Ram)
Repository: hbase
Updated Branches:
refs/heads/0.98 e3ef8b69b -> ec506bb01
HBASE-16055 PutSortReducer loses any Visibility/acl attribute set on the
Puts (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec506bb0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec506bb0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec506bb0
Branch: refs/heads/0.98
Commit: ec506bb01caf65708353665106faab7af1019d73
Parents: e3ef8b6
Author: Ramkrishna <ra...@intel.com>
Authored: Thu Jul 7 10:43:43 2016 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Thu Jul 7 10:43:43 2016 +0530
----------------------------------------------------------------------
.../hadoop/hbase/mapreduce/PutSortReducer.java | 20 ++-
.../hadoop/hbase/mapreduce/TextSortReducer.java | 8 +-
.../hbase/mapreduce/TestHFileOutputFormat2.java | 127 +++++++++++++++----
3 files changed, 126 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec506bb0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
index 792686a..ebaebcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
@@ -18,17 +18,25 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;
@@ -44,7 +52,17 @@ import org.apache.hadoop.util.StringUtils;
@InterfaceStability.Stable
public class PutSortReducer extends
Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
-
+ // the cell creator
+ private CellCreator kvCreator;
+
+ @Override
+ protected void
+ setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.kvCreator = new CellCreator(conf);
+ }
+
@Override
protected void reduce(
ImmutableBytesWritable row,
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec506bb0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 168ba40..cb329f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -99,9 +99,8 @@ public class TextSortReducer extends
*/
@Override
protected void setup(Context context) {
- doSetup(context);
-
Configuration conf = context.getConfiguration();
+ doSetup(context, conf);
parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
if (parser.getRowKeyColumnIndex() == -1) {
@@ -113,10 +112,9 @@ public class TextSortReducer extends
/**
* Handles common parameter initialization that a subclass might want to leverage.
* @param context
+ * @param conf
*/
- protected void doSetup(Context context) {
- Configuration conf = context.getConfiguration();
-
+ protected void doSetup(Context context, Configuration conf) {
// If a custom separator has been used,
// decode it back from Base64 encoding.
separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ec506bb0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index a7f178b..1f8a992 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.mapreduce.TestImportTSVWithTTLs.TTLCheckingObserver;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -170,11 +171,78 @@ public class TestHFileOutputFormat2 {
}
}
- private void setupRandomGeneratorMapper(Job job) {
- job.setInputFormatClass(NMapInputFormat.class);
- job.setMapperClass(RandomKVGeneratingMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
+  /**
+   * Simple mapper that makes Put output.
+   * <p>
+   * For each of {@code ROWSPERSPLIT} iterations it fills a random row key (whose last byte is the
+   * task id, so different tasks produce different keys) and a random value, then emits one
+   * single-column {@link Put} per family in {@code FAMILIES}. Every Put is given a TTL of 1 so
+   * that a later scan of the bulk-loaded table is expected to return nothing.
+   * <p>
+   * Key and value lengths come from {@code randomkv.key.length} / {@code randomkv.val.length}
+   * (default 10 bytes each).
+   */
+  static class RandomPutGeneratingMapper
+      extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {
+
+    private int keyLength;
+    private static final int KEYLEN_DEFAULT = 10;
+    private static final String KEYLEN_CONF = "randomkv.key.length";
+
+    private int valLength;
+    private static final int VALLEN_DEFAULT = 10;
+    private static final String VALLEN_CONF = "randomkv.val.length";
+    private static final byte[] QUALIFIER = Bytes.toBytes("data");
+
+    /** Reads the configured key/value lengths, falling back to the defaults. */
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      super.setup(context);
+
+      Configuration conf = context.getConfiguration();
+      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+    }
+
+    /** Emits ROWSPERSPLIT random rows, one single-column Put per family for each row. */
+    @Override
+    protected void map(NullWritable n1, NullWritable n2, Context context)
+        throws IOException, InterruptedException {
+      byte[] keyBytes = new byte[keyLength];
+      byte[] valBytes = new byte[valLength];
+
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+
+      Random random = new Random();
+      for (int i = 0; i < ROWSPERSPLIT; i++) {
+        random.nextBytes(keyBytes);
+        // Ensure that unique tasks generate unique keys
+        keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+        random.nextBytes(valBytes);
+        // NOTE(review): key wraps the reused keyBytes buffer; this relies on context.write()
+        // serializing it before the next iteration overwrites the bytes.
+        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+        for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
+          Put p = new Put(keyBytes);
+          p.addColumn(family, QUALIFIER, valBytes);
+          // set TTL to very low so that the scan does not return any value
+          p.setTTL(1L);
+          context.write(key, p);
+        }
+      }
+    }
+  }
+
+  /**
+   * Configures {@code job} to generate random data with one of the test mappers over
+   * {@link NMapInputFormat}.
+   *
+   * @param job the job to configure
+   * @param putSortReducer if true, use {@link RandomPutGeneratingMapper}, whose map output value
+   *     class is {@link Put}; otherwise use {@link RandomKVGeneratingMapper}, whose map output
+   *     value class is {@link KeyValue}
+   */
+  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
+    // Input format and map output key class are the same for both mappers.
+    job.setInputFormatClass(NMapInputFormat.class);
+    if (putSortReducer) {
+      job.setMapperClass(RandomPutGeneratingMapper.class);
+    } else {
+      job.setMapperClass(RandomKVGeneratingMapper.class);
+    }
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(putSortReducer ? Put.class : KeyValue.class);
}
/**
@@ -312,7 +380,7 @@ public class TestHFileOutputFormat2 {
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
Job job = new Job(conf, "testWritingPEData");
- setupRandomGeneratorMapper(job);
+ setupRandomGeneratorMapper(job, false);
// This partitioner doesn't work well for number keys but using it anyways
// just to demonstrate how to configure it.
byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
@@ -454,12 +522,18 @@ public class TestHFileOutputFormat2 {
@Test
public void testMRIncrementalLoadWithLocality() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
- doIncrementalLoadTest(false, true);
- doIncrementalLoadTest(true, true);
+ doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); // shouldChangeRegions=false, shouldKeepLocality=true, putSortReducer=false
+ doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); // same, but shouldChangeRegions=true; distinct table name per run
}
- private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
- throws Exception {
+  /**
+   * Runs the incremental load with Put map output ({@code putSortReducer=true}), without region
+   * changes or locality. The generated Puts carry a TTL of 1, so the loaded table is expected to
+   * return no rows.
+   */
+  @Test
+  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
+    final String tableName = "testMRIncrementalLoadWithPutSortReducer";
+    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
+    doIncrementalLoadTest(false, false, true, tableName);
+  }
+
+ private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
+ boolean putSortReducer, String tableStr) throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
@@ -528,21 +602,28 @@ public class TestHFileOutputFormat2 {
// Perform the actual load
new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
- // Ensure data shows up
- int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
- assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
- util.countRows(table));
- Scan scan = new Scan();
- ResultScanner results = table.getScanner(scan);
- for (Result res : results) {
- assertEquals(FAMILIES.length, res.rawCells().length);
- Cell first = res.rawCells()[0];
- for (Cell kv : res.rawCells()) {
- assertTrue(CellUtil.matchingRow(first, kv));
- assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+ int expectedRows = 0;
+ if (putSortReducer) {
+ // no rows should be extracted
+ assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+ util.countRows(table));
+ } else {
+ // Ensure data shows up
+ expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+ assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+ util.countRows(table));
+ Scan scan = new Scan();
+ ResultScanner results = table.getScanner(scan);
+ for (Result res : results) {
+ assertEquals(FAMILIES.length, res.rawCells().length);
+ Cell first = res.rawCells()[0];
+ for (Cell kv : res.rawCells()) {
+ assertTrue(CellUtil.matchingRow(first, kv));
+ assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+ }
}
+ results.close();
}
- results.close();
String tableDigestBefore = util.checksumRows(table);
// Check region locality