You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:06 UTC
[06/41] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
deleted file mode 100644
index dc59817..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ /dev/null
@@ -1,727 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.NavigableMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeepDeletedCells;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.LauncherSecurityManager;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Tests the table import and table export MR job functionality
- */
-@Category({VerySlowMapReduceTests.class, MediumTests.class})
-public class TestImportExport {
- private static final Log LOG = LogFactory.getLog(TestImportExport.class);
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
- private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
- private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
- private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
- private static final String FAMILYA_STRING = "a";
- private static final String FAMILYB_STRING = "b";
- private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
- private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
- private static final byte[] QUAL = Bytes.toBytes("q");
- private static final String OUTPUT_DIR = "outputdir";
- private static String FQ_OUTPUT_DIR;
- private static final String EXPORT_BATCH_SIZE = "100";
-
- private static long now = System.currentTimeMillis();
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- // Up the handlers; this test needs more than usual.
- UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
- UTIL.startMiniCluster();
- FQ_OUTPUT_DIR =
- new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- UTIL.shutdownMiniCluster();
- }
-
- @Rule
- public final TestName name = new TestName();
-
- @Before
- public void announce() {
- LOG.info("Running " + name.getMethodName());
- }
-
- @Before
- @After
- public void cleanup() throws Exception {
- FileSystem fs = FileSystem.get(UTIL.getConfiguration());
- fs.delete(new Path(OUTPUT_DIR), true);
- }
-
- /**
- * Runs an export job with the specified command line args
- * @param args
- * @return true if job completed successfully
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
- */
- boolean runExport(String[] args) throws Exception {
- // need to make a copy of the configuration because to make sure different temp dirs are used.
- int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
- return status == 0;
- }
-
- /**
- * Runs an import job with the specified command line args
- * @param args
- * @return true if job completed successfully
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
- */
- boolean runImport(String[] args) throws Exception {
- // need to make a copy of the configuration because to make sure different temp dirs are used.
- int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
- return status == 0;
- }
-
- /**
- * Test simple replication case with column mapping
- * @throws Exception
- */
- @Test
- public void testSimpleCase() throws Exception {
- try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) {
- Put p = new Put(ROW1);
- p.addColumn(FAMILYA, QUAL, now, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
- t.put(p);
- p = new Put(ROW2);
- p.addColumn(FAMILYA, QUAL, now, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
- t.put(p);
- p = new Put(ROW3);
- p.addColumn(FAMILYA, QUAL, now, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
- t.put(p);
- }
-
- String[] args = new String[] {
- // Only export row1 & row2.
- "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
- "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3",
- name.getMethodName(),
- FQ_OUTPUT_DIR,
- "1000", // max number of key versions per key to export
- };
- assertTrue(runExport(args));
-
- final String IMPORT_TABLE = name.getMethodName() + "import";
- try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) {
- args = new String[] {
- "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
- IMPORT_TABLE,
- FQ_OUTPUT_DIR
- };
- assertTrue(runImport(args));
-
- Get g = new Get(ROW1);
- g.setMaxVersions();
- Result r = t.get(g);
- assertEquals(3, r.size());
- g = new Get(ROW2);
- g.setMaxVersions();
- r = t.get(g);
- assertEquals(3, r.size());
- g = new Get(ROW3);
- r = t.get(g);
- assertEquals(0, r.size());
- }
- }
-
- /**
- * Test export hbase:meta table
- *
- * @throws Exception
- */
- @Test
- public void testMetaExport() throws Exception {
- String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
- String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
- assertTrue(runExport(args));
- }
-
- /**
- * Test import data from 0.94 exported file
- * @throws Exception
- */
- @Test
- public void testImport94Table() throws Exception {
- final String name = "exportedTableIn94Format";
- URL url = TestImportExport.class.getResource(name);
- File f = new File(url.toURI());
- if (!f.exists()) {
- LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
- return;
- }
- assertTrue(f.exists());
- LOG.info("FILE=" + f);
- Path importPath = new Path(f.toURI());
- FileSystem fs = FileSystem.get(UTIL.getConfiguration());
- fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
- String IMPORT_TABLE = name;
- try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) {
- String[] args = new String[] {
- "-Dhbase.import.version=0.94" ,
- IMPORT_TABLE, FQ_OUTPUT_DIR
- };
- assertTrue(runImport(args));
- /* exportedTableIn94Format contains 5 rows
- ROW COLUMN+CELL
- r1 column=f1:c1, timestamp=1383766761171, value=val1
- r2 column=f1:c1, timestamp=1383766771642, value=val2
- r3 column=f1:c1, timestamp=1383766777615, value=val3
- r4 column=f1:c1, timestamp=1383766785146, value=val4
- r5 column=f1:c1, timestamp=1383766791506, value=val5
- */
- assertEquals(5, UTIL.countRows(t));
- }
- }
-
- /**
- * Test export scanner batching
- */
- @Test
- public void testExportScannerBatching() throws Exception {
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
- desc.addFamily(new HColumnDescriptor(FAMILYA)
- .setMaxVersions(1)
- );
- UTIL.getAdmin().createTable(desc);
- try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
-
- Put p = new Put(ROW1);
- p.addColumn(FAMILYA, QUAL, now, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
- t.put(p);
-
- String[] args = new String[] {
- "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg.
- name.getMethodName(),
- FQ_OUTPUT_DIR
- };
- assertTrue(runExport(args));
-
- FileSystem fs = FileSystem.get(UTIL.getConfiguration());
- fs.delete(new Path(FQ_OUTPUT_DIR), true);
- }
- }
-
- @Test
- public void testWithDeletes() throws Exception {
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
- desc.addFamily(new HColumnDescriptor(FAMILYA)
- .setMaxVersions(5)
- .setKeepDeletedCells(KeepDeletedCells.TRUE)
- );
- UTIL.getAdmin().createTable(desc);
- try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
-
- Put p = new Put(ROW1);
- p.addColumn(FAMILYA, QUAL, now, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
- p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
- t.put(p);
-
- Delete d = new Delete(ROW1, now+3);
- t.delete(d);
- d = new Delete(ROW1);
- d.addColumns(FAMILYA, QUAL, now+2);
- t.delete(d);
- }
-
- String[] args = new String[] {
- "-D" + Export.RAW_SCAN + "=true",
- name.getMethodName(),
- FQ_OUTPUT_DIR,
- "1000", // max number of key versions per key to export
- };
- assertTrue(runExport(args));
-
- final String IMPORT_TABLE = name.getMethodName() + "import";
- desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
- desc.addFamily(new HColumnDescriptor(FAMILYA)
- .setMaxVersions(5)
- .setKeepDeletedCells(KeepDeletedCells.TRUE)
- );
- UTIL.getAdmin().createTable(desc);
- try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
- args = new String[] {
- IMPORT_TABLE,
- FQ_OUTPUT_DIR
- };
- assertTrue(runImport(args));
-
- Scan s = new Scan();
- s.setMaxVersions();
- s.setRaw(true);
- ResultScanner scanner = t.getScanner(s);
- Result r = scanner.next();
- Cell[] res = r.rawCells();
- assertTrue(CellUtil.isDeleteFamily(res[0]));
- assertEquals(now+4, res[1].getTimestamp());
- assertEquals(now+3, res[2].getTimestamp());
- assertTrue(CellUtil.isDelete(res[3]));
- assertEquals(now+2, res[4].getTimestamp());
- assertEquals(now+1, res[5].getTimestamp());
- assertEquals(now, res[6].getTimestamp());
- }
- }
-
-
- @Test
- public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
- final TableName exportTable = TableName.valueOf(name.getMethodName());
- HTableDescriptor desc = new HTableDescriptor(exportTable);
- desc.addFamily(new HColumnDescriptor(FAMILYA)
- .setMaxVersions(5)
- .setKeepDeletedCells(KeepDeletedCells.TRUE)
- );
- UTIL.getAdmin().createTable(desc);
-
- Table exportT = UTIL.getConnection().getTable(exportTable);
-
- //Add first version of QUAL
- Put p = new Put(ROW1);
- p.addColumn(FAMILYA, QUAL, now, QUAL);
- exportT.put(p);
-
- //Add Delete family marker
- Delete d = new Delete(ROW1, now+3);
- exportT.delete(d);
-
- //Add second version of QUAL
- p = new Put(ROW1);
- p.addColumn(FAMILYA, QUAL, now + 5, "s".getBytes());
- exportT.put(p);
-
- //Add second Delete family marker
- d = new Delete(ROW1, now+7);
- exportT.delete(d);
-
-
- String[] args = new String[] {
- "-D" + Export.RAW_SCAN + "=true", exportTable.getNameAsString(),
- FQ_OUTPUT_DIR,
- "1000", // max number of key versions per key to export
- };
- assertTrue(runExport(args));
-
- final String importTable = name.getMethodName() + "import";
- desc = new HTableDescriptor(TableName.valueOf(importTable));
- desc.addFamily(new HColumnDescriptor(FAMILYA)
- .setMaxVersions(5)
- .setKeepDeletedCells(KeepDeletedCells.TRUE)
- );
- UTIL.getAdmin().createTable(desc);
-
- Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
- args = new String[] {
- importTable,
- FQ_OUTPUT_DIR
- };
- assertTrue(runImport(args));
-
- Scan s = new Scan();
- s.setMaxVersions();
- s.setRaw(true);
-
- ResultScanner importedTScanner = importT.getScanner(s);
- Result importedTResult = importedTScanner.next();
-
- ResultScanner exportedTScanner = exportT.getScanner(s);
- Result exportedTResult = exportedTScanner.next();
- try {
- Result.compareResults(exportedTResult, importedTResult);
- } catch (Exception e) {
- fail("Original and imported tables data comparision failed with error:"+e.getMessage());
- } finally {
- exportT.close();
- importT.close();
- }
- }
-
- /**
- * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
- * attempt with invalid values.
- */
- @Test
- public void testWithFilter() throws Exception {
- // Create simple table to export
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
- desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
- UTIL.getAdmin().createTable(desc);
- Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
-
- Put p1 = new Put(ROW1);
- p1.addColumn(FAMILYA, QUAL, now, QUAL);
- p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
- p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
- p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
- p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);
-
- // Having another row would actually test the filter.
- Put p2 = new Put(ROW2);
- p2.addColumn(FAMILYA, QUAL, now, QUAL);
-
- exportTable.put(Arrays.asList(p1, p2));
-
- // Export the simple table
- String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
- assertTrue(runExport(args));
-
- // Import to a new table
- final String IMPORT_TABLE = name.getMethodName() + "import";
- desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
- desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
- UTIL.getAdmin().createTable(desc);
-
- Table importTable = UTIL.getConnection().getTable(desc.getTableName());
- args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
- "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE,
- FQ_OUTPUT_DIR,
- "1000" };
- assertTrue(runImport(args));
-
- // get the count of the source table for that time range
- PrefixFilter filter = new PrefixFilter(ROW1);
- int count = getCount(exportTable, filter);
-
- Assert.assertEquals("Unexpected row count between export and import tables", count,
- getCount(importTable, null));
-
- // and then test that a broken command doesn't bork everything - easier here because we don't
- // need to re-run the export job
-
- args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
- "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(),
- FQ_OUTPUT_DIR, "1000" };
- assertFalse(runImport(args));
-
- // cleanup
- exportTable.close();
- importTable.close();
- }
-
- /**
- * Count the number of keyvalues in the specified table for the given timerange
- * @param start
- * @param end
- * @param table
- * @return
- * @throws IOException
- */
- private int getCount(Table table, Filter filter) throws IOException {
- Scan scan = new Scan();
- scan.setFilter(filter);
- ResultScanner results = table.getScanner(scan);
- int count = 0;
- for (Result res : results) {
- count += res.size();
- }
- results.close();
- return count;
- }
-
- /**
- * test main method. Import should print help and call System.exit
- */
- @Test
- public void testImportMain() throws Exception {
- PrintStream oldPrintStream = System.err;
- SecurityManager SECURITY_MANAGER = System.getSecurityManager();
- LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
- System.setSecurityManager(newSecurityManager);
- ByteArrayOutputStream data = new ByteArrayOutputStream();
- String[] args = {};
- System.setErr(new PrintStream(data));
- try {
- System.setErr(new PrintStream(data));
- Import.main(args);
- fail("should be SecurityException");
- } catch (SecurityException e) {
- assertEquals(-1, newSecurityManager.getExitCode());
- assertTrue(data.toString().contains("Wrong number of arguments:"));
- assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
- assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
- assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
- assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
- } finally {
- System.setErr(oldPrintStream);
- System.setSecurityManager(SECURITY_MANAGER);
- }
- }
-
- /**
- * test main method. Export should print help and call System.exit
- */
- @Test
- public void testExportMain() throws Exception {
- PrintStream oldPrintStream = System.err;
- SecurityManager SECURITY_MANAGER = System.getSecurityManager();
- LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
- System.setSecurityManager(newSecurityManager);
- ByteArrayOutputStream data = new ByteArrayOutputStream();
- String[] args = {};
- System.setErr(new PrintStream(data));
- try {
- System.setErr(new PrintStream(data));
- Export.main(args);
- fail("should be SecurityException");
- } catch (SecurityException e) {
- assertEquals(-1, newSecurityManager.getExitCode());
- String errMsg = data.toString();
- assertTrue(errMsg.contains("Wrong number of arguments:"));
- assertTrue(errMsg.contains(
- "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
- "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
- assertTrue(
- errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
- assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
- assertTrue(errMsg.contains("-Dhbase.client.scanner.caching=100"));
- assertTrue(errMsg.contains("-Dmapreduce.map.speculative=false"));
- assertTrue(errMsg.contains("-Dmapreduce.reduce.speculative=false"));
- assertTrue(errMsg.contains("-Dhbase.export.scanner.batch=10"));
- } finally {
- System.setErr(oldPrintStream);
- System.setSecurityManager(SECURITY_MANAGER);
- }
- }
-
- /**
- * Test map method of Importer
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Test
- public void testKeyValueImporter() throws Exception {
- KeyValueImporter importer = new KeyValueImporter();
- Configuration configuration = new Configuration();
- Context ctx = mock(Context.class);
- when(ctx.getConfiguration()).thenReturn(configuration);
-
- doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
- KeyValue key = (KeyValue) invocation.getArguments()[1];
- assertEquals("Key", Bytes.toString(writer.get()));
- assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
- return null;
- }
- }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
-
- importer.setup(ctx);
- Result value = mock(Result.class);
- KeyValue[] keys = {
- new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
- Bytes.toBytes("value")),
- new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
- Bytes.toBytes("value1")) };
- when(value.rawCells()).thenReturn(keys);
- importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
-
- }
-
- /**
- * Test addFilterAndArguments method of Import This method set couple
- * parameters into Configuration
- */
- @Test
- public void testAddFilterAndArguments() throws IOException {
- Configuration configuration = new Configuration();
-
- List<String> args = new ArrayList<>();
- args.add("param1");
- args.add("param2");
-
- Import.addFilterAndArguments(configuration, FilterBase.class, args);
- assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
- configuration.get(Import.FILTER_CLASS_CONF_KEY));
- assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
- }
-
- @Test
- public void testDurability() throws Exception {
- // Create an export table.
- String exportTableName = name.getMethodName() + "export";
- try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
-
- // Insert some data
- Put put = new Put(ROW1);
- put.addColumn(FAMILYA, QUAL, now, QUAL);
- put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
- put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
- exportTable.put(put);
-
- put = new Put(ROW2);
- put.addColumn(FAMILYA, QUAL, now, QUAL);
- put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
- put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
- exportTable.put(put);
-
- // Run the export
- String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
- assertTrue(runExport(args));
-
- // Create the table for import
- String importTableName = name.getMethodName() + "import1";
- Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
-
- // Register the wal listener for the import table
- HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
- .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
- TableWALActionListener walListener = new TableWALActionListener(region);
- WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
- wal.registerWALActionsListener(walListener);
-
- // Run the import with SKIP_WAL
- args =
- new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
- importTableName, FQ_OUTPUT_DIR };
- assertTrue(runImport(args));
- //Assert that the wal is not visisted
- assertTrue(!walListener.isWALVisited());
- //Ensure that the count is 2 (only one version of key value is obtained)
- assertTrue(getCount(importTable, null) == 2);
-
- // Run the import with the default durability option
- importTableName = name.getMethodName() + "import2";
- importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
- region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
- .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
- wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
- walListener = new TableWALActionListener(region);
- wal.registerWALActionsListener(walListener);
- args = new String[] { importTableName, FQ_OUTPUT_DIR };
- assertTrue(runImport(args));
- //Assert that the wal is visisted
- assertTrue(walListener.isWALVisited());
- //Ensure that the count is 2 (only one version of key value is obtained)
- assertTrue(getCount(importTable, null) == 2);
- }
- }
-
- /**
- * This listens to the {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} to
- * identify that an entry is written to the Write Ahead Log for the given table.
- */
- private static class TableWALActionListener extends WALActionsListener.Base {
-
- private HRegionInfo regionInfo;
- private boolean isVisited = false;
-
- public TableWALActionListener(HRegionInfo region) {
- this.regionInfo = region;
- }
-
- @Override
- public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
- if (logKey.getTablename().getNameAsString().equalsIgnoreCase(
- this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) {
- isVisited = true;
- }
- }
-
- public boolean isWALVisited() {
- return isVisited;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
deleted file mode 100644
index 6d9b05b..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CategoryBasedTimeout;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
-
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestImportTSVWithOperationAttributes implements Configurable {
- @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
- withTimeout(this.getClass()).withLookingForStuckThread(true).build();
- private static final Log LOG = LogFactory.getLog(TestImportTSVWithOperationAttributes.class);
- protected static final String NAME = TestImportTsv.class.getSimpleName();
- protected static HBaseTestingUtility util = new HBaseTestingUtility();
-
- /**
- * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
- * false.
- */
- protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
-
- /**
- * Force use of combiner in doMROnTableTest. Boolean. Default is true.
- */
- protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
-
- private static Configuration conf;
-
- private static final String TEST_ATR_KEY = "test";
-
- private final String FAMILY = "FAM";
-
- @Rule
- public TestName name = new TestName();
-
- public Configuration getConf() {
- return util.getConfiguration();
- }
-
- public void setConf(Configuration conf) {
- throw new IllegalArgumentException("setConf not supported");
- }
-
- @BeforeClass
- public static void provisionCluster() throws Exception {
- conf = util.getConfiguration();
- conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName());
- conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName());
- util.startMiniCluster();
- }
-
- @AfterClass
- public static void releaseCluster() throws Exception {
- util.shutdownMiniCluster();
- }
-
- @Test
- public void testMROnTable() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
-
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY
- + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
- String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n";
- util.createTable(tableName, FAMILY);
- doMROnTableTest(util, FAMILY, data, args, 1, true);
- util.deleteTable(tableName);
- }
-
- @Test
- public void testMROnTableWithInvalidOperationAttr() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
-
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY
- + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
- String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n";
- util.createTable(tableName, FAMILY);
- doMROnTableTest(util, FAMILY, data, args, 1, false);
- util.deleteTable(tableName);
- }
-
- /**
- * Run an ImportTsv job and perform basic validation on the results. Returns
- * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
- * for further validation as necessary. This method is static to insure
- * non-reliance on instance's util/conf facilities.
- *
- * @param args
- * Any arguments to pass BEFORE inputFile path is appended.
- * @param dataAvailable
- * @return The Tool instance used to run the test.
- */
- private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args,
- int valueMultiplier, boolean dataAvailable) throws Exception {
- String table = args[args.length - 1];
- Configuration conf = new Configuration(util.getConfiguration());
-
- // populate input file
- FileSystem fs = FileSystem.get(conf);
- Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
- FSDataOutputStream op = fs.create(inputPath, true);
- op.write(Bytes.toBytes(data));
- op.close();
- LOG.debug(String.format("Wrote test data to file: %s", inputPath));
-
- if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
- LOG.debug("Forcing combiner.");
- conf.setInt("mapreduce.map.combine.minspills", 1);
- }
-
- // run the import
- List<String> argv = new ArrayList<>(Arrays.asList(args));
- argv.add(inputPath.toString());
- Tool tool = new ImportTsv();
- LOG.debug("Running ImportTsv with arguments: " + argv);
- assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
-
- validateTable(conf, TableName.valueOf(table), family, valueMultiplier, dataAvailable);
-
- if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
- LOG.debug("Deleting test subdirectory");
- util.cleanupDataTestDirOnTestFS(table);
- }
- return tool;
- }
-
- /**
- * Confirm ImportTsv via data in online table.
- *
- * @param dataAvailable
- */
- private static void validateTable(Configuration conf, TableName tableName, String family,
- int valueMultiplier, boolean dataAvailable) throws IOException {
-
- LOG.debug("Validating table.");
- Connection connection = ConnectionFactory.createConnection(conf);
- Table table = connection.getTable(tableName);
- boolean verified = false;
- long pause = conf.getLong("hbase.client.pause", 5 * 1000);
- int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
- for (int i = 0; i < numRetries; i++) {
- try {
- Scan scan = new Scan();
- // Scan entire family.
- scan.addFamily(Bytes.toBytes(family));
- if (dataAvailable) {
- ResultScanner resScanner = table.getScanner(scan);
- for (Result res : resScanner) {
- LOG.debug("Getting results " + res.size());
- assertTrue(res.size() == 2);
- List<Cell> kvs = res.listCells();
- assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
- assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
- assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
- assertTrue(CellUtil.matchingValue(kvs.get(1),
- Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
- // Only one result set is expected, so let it loop.
- verified = true;
- }
- } else {
- ResultScanner resScanner = table.getScanner(scan);
- Result[] next = resScanner.next(2);
- assertEquals(0, next.length);
- verified = true;
- }
-
- break;
- } catch (NullPointerException e) {
- // If here, a cell was empty. Presume its because updates came in
- // after the scanner had been opened. Wait a while and retry.
- }
- try {
- Thread.sleep(pause);
- } catch (InterruptedException e) {
- // continue
- }
- }
- table.close();
- connection.close();
- assertTrue(verified);
- }
-
- public static class OperationAttributesTestController implements RegionObserver {
-
- @Override
- public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
- Durability durability) throws IOException {
- Region region = e.getEnvironment().getRegion();
- if (!region.getRegionInfo().isMetaTable()
- && !region.getRegionInfo().getTable().isSystemTable()) {
- if (put.getAttribute(TEST_ATR_KEY) != null) {
- LOG.debug("allow any put to happen " + region.getRegionInfo().getRegionNameAsString());
- } else {
- e.bypass();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
deleted file mode 100644
index 4ab3d29..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestImportTSVWithTTLs implements Configurable {
-
- protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
- protected static final String NAME = TestImportTsv.class.getSimpleName();
- protected static HBaseTestingUtility util = new HBaseTestingUtility();
-
- /**
- * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
- * false.
- */
- protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
-
- /**
- * Force use of combiner in doMROnTableTest. Boolean. Default is true.
- */
- protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
-
- private final String FAMILY = "FAM";
- private static Configuration conf;
-
- @Rule
- public TestName name = new TestName();
-
- @Override
- public Configuration getConf() {
- return util.getConfiguration();
- }
-
- @Override
- public void setConf(Configuration conf) {
- throw new IllegalArgumentException("setConf not supported");
- }
-
- @BeforeClass
- public static void provisionCluster() throws Exception {
- conf = util.getConfiguration();
- // We don't check persistence in HFiles in this test, but if we ever do we will
- // need this where the default hfile version is not 3 (i.e. 0.98)
- conf.setInt("hfile.format.version", 3);
- conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
- util.startMiniCluster();
- }
-
- @AfterClass
- public static void releaseCluster() throws Exception {
- util.shutdownMiniCluster();
- }
-
- @Test
- public void testMROnTable() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
-
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY
- + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
- String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
- util.createTable(tableName, FAMILY);
- doMROnTableTest(util, FAMILY, data, args, 1);
- util.deleteTable(tableName);
- }
-
- protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
- String[] args, int valueMultiplier) throws Exception {
- TableName table = TableName.valueOf(args[args.length - 1]);
- Configuration conf = new Configuration(util.getConfiguration());
-
- // populate input file
- FileSystem fs = FileSystem.get(conf);
- Path inputPath = fs.makeQualified(new Path(util
- .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
- FSDataOutputStream op = fs.create(inputPath, true);
- op.write(Bytes.toBytes(data));
- op.close();
- LOG.debug(String.format("Wrote test data to file: %s", inputPath));
-
- if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
- LOG.debug("Forcing combiner.");
- conf.setInt("mapreduce.map.combine.minspills", 1);
- }
-
- // run the import
- List<String> argv = new ArrayList<>(Arrays.asList(args));
- argv.add(inputPath.toString());
- Tool tool = new ImportTsv();
- LOG.debug("Running ImportTsv with arguments: " + argv);
- try {
- // Job will fail if observer rejects entries without TTL
- assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
- } finally {
- // Clean up
- if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
- LOG.debug("Deleting test subdirectory");
- util.cleanupDataTestDirOnTestFS(table.getNameAsString());
- }
- }
-
- return tool;
- }
-
- public static class TTLCheckingObserver implements RegionObserver {
-
- @Override
- public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
- Durability durability) throws IOException {
- Region region = e.getEnvironment().getRegion();
- if (!region.getRegionInfo().isMetaTable()
- && !region.getRegionInfo().getTable().isSystemTable()) {
- // The put carries the TTL attribute
- if (put.getTTL() != Long.MAX_VALUE) {
- return;
- }
- throw new IOException("Operation does not have TTL set");
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
deleted file mode 100644
index 8967ac7..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.visibility.Authorizations;
-import org.apache.hadoop.hbase.security.visibility.CellVisibility;
-import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator;
-import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator;
-import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
-import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
-import org.apache.hadoop.hbase.security.visibility.VisibilityController;
-import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestImportTSVWithVisibilityLabels implements Configurable {
-
- private static final Log LOG = LogFactory.getLog(TestImportTSVWithVisibilityLabels.class);
- protected static final String NAME = TestImportTsv.class.getSimpleName();
- protected static HBaseTestingUtility util = new HBaseTestingUtility();
-
- /**
- * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
- * false.
- */
- protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
-
- /**
- * Force use of combiner in doMROnTableTest. Boolean. Default is true.
- */
- protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
-
- private final String FAMILY = "FAM";
- private final static String TOPSECRET = "topsecret";
- private final static String PUBLIC = "public";
- private final static String PRIVATE = "private";
- private final static String CONFIDENTIAL = "confidential";
- private final static String SECRET = "secret";
- private static User SUPERUSER;
- private static Configuration conf;
-
- @Rule
- public TestName name = new TestName();
-
- @Override
- public Configuration getConf() {
- return util.getConfiguration();
- }
-
- @Override
- public void setConf(Configuration conf) {
- throw new IllegalArgumentException("setConf not supported");
- }
-
- @BeforeClass
- public static void provisionCluster() throws Exception {
- conf = util.getConfiguration();
- SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
- conf.set("hbase.superuser", "admin,"+User.getCurrent().getName());
- conf.setInt("hfile.format.version", 3);
- conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName());
- conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName());
- conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class,
- ScanLabelGenerator.class);
- util.startMiniCluster();
- // Wait for the labels table to become available
- util.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000);
- createLabels();
- }
-
- private static void createLabels() throws IOException, InterruptedException {
- PrivilegedExceptionAction<VisibilityLabelsResponse> action =
- new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
- @Override
- public VisibilityLabelsResponse run() throws Exception {
- String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE };
- try (Connection conn = ConnectionFactory.createConnection(conf)) {
- VisibilityClient.addLabels(conn, labels);
- LOG.info("Added labels ");
- } catch (Throwable t) {
- LOG.error("Error in adding labels" , t);
- throw new IOException(t);
- }
- return null;
- }
- };
- SUPERUSER.runAs(action);
- }
-
- @AfterClass
- public static void releaseCluster() throws Exception {
- util.shutdownMiniCluster();
- }
-
- @Test
- public void testMROnTable() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
-
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY
- + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
- String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
- util.createTable(tableName, FAMILY);
- doMROnTableTest(util, FAMILY, data, args, 1);
- util.deleteTable(tableName);
- }
-
- @Test
- public void testMROnTableWithDeletes() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
-
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
- String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
- util.createTable(tableName, FAMILY);
- doMROnTableTest(util, FAMILY, data, args, 1);
- issueDeleteAndVerifyData(tableName);
- util.deleteTable(tableName);
- }
-
- private void issueDeleteAndVerifyData(TableName tableName) throws IOException {
- LOG.debug("Validating table after delete.");
- Table table = util.getConnection().getTable(tableName);
- boolean verified = false;
- long pause = conf.getLong("hbase.client.pause", 5 * 1000);
- int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
- for (int i = 0; i < numRetries; i++) {
- try {
- Delete d = new Delete(Bytes.toBytes("KEY"));
- d.addFamily(Bytes.toBytes(FAMILY));
- d.setCellVisibility(new CellVisibility("private&secret"));
- table.delete(d);
-
- Scan scan = new Scan();
- // Scan entire family.
- scan.addFamily(Bytes.toBytes(FAMILY));
- scan.setAuthorizations(new Authorizations("secret", "private"));
- ResultScanner resScanner = table.getScanner(scan);
- Result[] next = resScanner.next(5);
- assertEquals(0, next.length);
- verified = true;
- break;
- } catch (NullPointerException e) {
- // If here, a cell was empty. Presume its because updates came in
- // after the scanner had been opened. Wait a while and retry.
- }
- try {
- Thread.sleep(pause);
- } catch (InterruptedException e) {
- // continue
- }
- }
- table.close();
- assertTrue(verified);
- }
-
- @Test
- public void testMROnTableWithBulkload() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
- Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
- "-D" + ImportTsv.COLUMNS_CONF_KEY
- + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
- String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
- util.createTable(tableName, FAMILY);
- doMROnTableTest(util, FAMILY, data, args, 1);
- util.deleteTable(tableName);
- }
-
- @Test
- public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
- String FAMILY = "FAM";
- Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table.getNameAsString()),"hfiles");
- // Prepare the arguments required for the test.
- String[] args =
- new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY
- + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
- "-D" + ImportTsv.COLUMNS_CONF_KEY
- + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
- "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
- table.getNameAsString()
- };
- String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n";
- doMROnTableTest(util, FAMILY, data, args, 4);
- util.deleteTable(table);
- }
-
- @Test
- public void testMRWithOutputFormat() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
- Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY
- + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
- "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
- String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n";
- util.createTable(tableName, FAMILY);
- doMROnTableTest(util, FAMILY, data, args, 1);
- util.deleteTable(tableName);
- }
-
- @Test
- public void testBulkOutputWithInvalidLabels() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
- Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
- // Prepare the arguments required for the test.
- String[] args =
- new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
-
- // 2 Data rows, one with valid label and one with invalid label
- String data =
- "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
- util.createTable(tableName, FAMILY);
- doMROnTableTest(util, FAMILY, data, args, 1, 2);
- util.deleteTable(tableName);
- }
-
- @Test
- public void testBulkOutputWithTsvImporterTextMapperWithInvalidLabels() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
- Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
- // Prepare the arguments required for the test.
- String[] args =
- new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY
- + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
- "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
-
- // 2 Data rows, one with valid label and one with invalid label
- String data =
- "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
- util.createTable(tableName, FAMILY);
- doMROnTableTest(util, FAMILY, data, args, 1, 2);
- util.deleteTable(tableName);
- }
-
- protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
- String[] args, int valueMultiplier) throws Exception {
- return doMROnTableTest(util, family, data, args, valueMultiplier, -1);
- }
-
- /**
- * Run an ImportTsv job and perform basic validation on the results. Returns
- * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
- * for further validation as necessary. This method is static to insure
- * non-reliance on instance's util/conf facilities.
- *
- * @param args
- * Any arguments to pass BEFORE inputFile path is appended.
- *
- * @param expectedKVCount Expected KV count. pass -1 to skip the kvcount check
- *
- * @return The Tool instance used to run the test.
- */
- protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
- String[] args, int valueMultiplier,int expectedKVCount) throws Exception {
- TableName table = TableName.valueOf(args[args.length - 1]);
- Configuration conf = new Configuration(util.getConfiguration());
-
- // populate input file
- FileSystem fs = FileSystem.get(conf);
- Path inputPath = fs.makeQualified(new Path(util
- .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
- FSDataOutputStream op = fs.create(inputPath, true);
- if (data == null) {
- data = "KEY\u001bVALUE1\u001bVALUE2\n";
- }
- op.write(Bytes.toBytes(data));
- op.close();
- LOG.debug(String.format("Wrote test data to file: %s", inputPath));
-
- if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
- LOG.debug("Forcing combiner.");
- conf.setInt("mapreduce.map.combine.minspills", 1);
- }
-
- // run the import
- List<String> argv = new ArrayList<>(Arrays.asList(args));
- argv.add(inputPath.toString());
- Tool tool = new ImportTsv();
- LOG.debug("Running ImportTsv with arguments: " + argv);
- assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
-
- // Perform basic validation. If the input args did not include
- // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
- // Otherwise, validate presence of hfiles.
- boolean createdHFiles = false;
- String outputPath = null;
- for (String arg : argv) {
- if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
- createdHFiles = true;
- // split '-Dfoo=bar' on '=' and keep 'bar'
- outputPath = arg.split("=")[1];
- break;
- }
- }
- LOG.debug("validating the table " + createdHFiles);
- if (createdHFiles)
- validateHFiles(fs, outputPath, family,expectedKVCount);
- else
- validateTable(conf, table, family, valueMultiplier);
-
- if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
- LOG.debug("Deleting test subdirectory");
- util.cleanupDataTestDirOnTestFS(table.getNameAsString());
- }
- return tool;
- }
-
- /**
- * Confirm ImportTsv via HFiles on fs.
- */
- private static void validateHFiles(FileSystem fs, String outputPath, String family,
- int expectedKVCount) throws IOException {
-
- // validate number and content of output columns
- LOG.debug("Validating HFiles.");
- Set<String> configFamilies = new HashSet<>();
- configFamilies.add(family);
- Set<String> foundFamilies = new HashSet<>();
- int actualKVCount = 0;
- for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
- LOG.debug("The output path has files");
- String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
- String cf = elements[elements.length - 1];
- foundFamilies.add(cf);
- assertTrue(String.format(
- "HFile ouput contains a column family (%s) not present in input families (%s)", cf,
- configFamilies), configFamilies.contains(cf));
- for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
- assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()),
- hfile.getLen() > 0);
- if (expectedKVCount > -1) {
- actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
- }
- }
- }
- if (expectedKVCount > -1) {
- assertTrue(String.format(
- "KV count in output hfile=<%d> doesn't match with expected KV count=<%d>", actualKVCount,
- expectedKVCount), actualKVCount == expectedKVCount);
- }
- }
-
- /**
- * Confirm ImportTsv via data in online table.
- */
- private static void validateTable(Configuration conf, TableName tableName, String family,
- int valueMultiplier) throws IOException {
-
- LOG.debug("Validating table.");
- Table table = util.getConnection().getTable(tableName);
- boolean verified = false;
- long pause = conf.getLong("hbase.client.pause", 5 * 1000);
- int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
- for (int i = 0; i < numRetries; i++) {
- try {
- Scan scan = new Scan();
- // Scan entire family.
- scan.addFamily(Bytes.toBytes(family));
- scan.setAuthorizations(new Authorizations("secret","private"));
- ResultScanner resScanner = table.getScanner(scan);
- Result[] next = resScanner.next(5);
- assertEquals(1, next.length);
- for (Result res : resScanner) {
- LOG.debug("Getting results " + res.size());
- assertTrue(res.size() == 2);
- List<Cell> kvs = res.listCells();
- assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
- assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
- assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
- assertTrue(CellUtil.matchingValue(kvs.get(1),
- Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
- // Only one result set is expected, so let it loop.
- }
- verified = true;
- break;
- } catch (NullPointerException e) {
- // If here, a cell was empty. Presume its because updates came in
- // after the scanner had been opened. Wait a while and retry.
- }
- try {
- Thread.sleep(pause);
- } catch (InterruptedException e) {
- // continue
- }
- }
- table.close();
- assertTrue(verified);
- }
-
- /**
- * Method returns the total KVs in given hfile
- * @param fs File System
- * @param p HFile path
- * @return KV count in the given hfile
- * @throws IOException
- */
- private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
- Configuration conf = util.getConfiguration();
- HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
- reader.loadFileInfo();
- HFileScanner scanner = reader.getScanner(false, false);
- scanner.seekTo();
- int count = 0;
- do {
- count++;
- } while (scanner.next());
- reader.close();
- return count;
- }
-
-}