Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:31 UTC
[31/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
new file mode 100644
index 0000000..7b6e684
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -0,0 +1,571 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestImportTsv implements Configurable {
+
+ private static final Log LOG = LogFactory.getLog(TestImportTsv.class);
+ protected static final String NAME = TestImportTsv.class.getSimpleName();
+ protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+ // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
+ protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+ /**
+ * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+ */
+ protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+ private final String FAMILY = "FAM";
+ private TableName tn;
+ private Map<String, String> args;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ public Configuration getConf() {
+ return util.getConfiguration();
+ }
+
+ public void setConf(Configuration conf) {
+ throw new IllegalArgumentException("setConf not supported");
+ }
+
+ @BeforeClass
+ public static void provisionCluster() throws Exception {
+ util.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void releaseCluster() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setup() throws Exception {
+ tn = TableName.valueOf("test-" + UUID.randomUUID());
+ args = new HashMap<>();
+ // Prepare the arguments required for the test.
+ args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
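+ // Use the ASCII escape character (0x1B) as the field separator; it matches the default test data written by doMROnTableTest.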
+ args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
+ }
+
+ @Test
+ public void testMROnTable() throws Exception {
+ util.createTable(tn, FAMILY);
+ doMROnTableTest(null, 1);
+ util.deleteTable(tn);
+ }
+
+ @Test
+ public void testMROnTableWithTimestamp() throws Exception {
+ util.createTable(tn, FAMILY);
+ args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
+ args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
+ String data = "KEY,1234,VALUE1,VALUE2\n";
+
+ doMROnTableTest(data, 1);
+ util.deleteTable(tn);
+ }
+
+ @Test
+ public void testMROnTableWithCustomMapper()
+ throws Exception {
+ util.createTable(tn, FAMILY);
+ args.put(ImportTsv.MAPPER_CONF_KEY,
+ "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");
+
+ doMROnTableTest(null, 3);
+ util.deleteTable(tn);
+ }
+
+ @Test
+ public void testBulkOutputWithoutAnExistingTable() throws Exception {
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+
+ doMROnTableTest(null, 3);
+ util.deleteTable(tn);
+ }
+
+ @Test
+ public void testBulkOutputWithAnExistingTable() throws Exception {
+ util.createTable(tn, FAMILY);
+
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+
+ doMROnTableTest(null, 3);
+ util.deleteTable(tn);
+ }
+
+ @Test
+ public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
+ util.createTable(tn, FAMILY);
+
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+ args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
+ doMROnTableTest(null, 3);
+ util.deleteTable(tn);
+ }
+
+ @Test
+ public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
+ Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()),"hfiles");
+ String INPUT_FILE = "InputFile1.csv";
+ // Prepare the arguments required for the test.
+ String[] args =
+ new String[] {
+ "-D" + ImportTsv.MAPPER_CONF_KEY
+ + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+ "-D" + ImportTsv.COLUMNS_CONF_KEY
+ + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+ "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
+ tn.getNameAsString(),
+ INPUT_FILE
+ };
+ assertEquals("running test job configuration failed.", 0, ToolRunner.run(
+ new Configuration(util.getConfiguration()),
+ new ImportTsv() {
+ @Override
+ public int run(String[] args) throws Exception {
+ Job job = createSubmittableJob(getConf(), args);
+ assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
+ assertTrue(job.getReducerClass().equals(TextSortReducer.class));
+ assertTrue(job.getMapOutputValueClass().equals(Text.class));
+ return 0;
+ }
+ }, args));
+ // Delete table created by createSubmittableJob.
+ util.deleteTable(tn);
+ }
+
+ @Test
+ public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
+ Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()),"hfiles");
+ args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+ String data = "KEY\u001bVALUE4\u001bVALUE8\n";
+ doMROnTableTest(data, 4);
+ util.deleteTable(tn);
+ }
+
+ @Test
+ public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
+ String[] args = new String[] { tn.getNameAsString(), "/inputFile" };
+
+ Configuration conf = new Configuration(util.getConfiguration());
+ conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
+ conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
+ conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
+ exception.expect(TableNotFoundException.class);
+ assertEquals("running test job configuration failed.", 0,
+ ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
+ @Override public int run(String[] args) throws Exception {
+ createSubmittableJob(getConf(), args);
+ return 0;
+ }
+ }, args));
+ }
+
+ @Test
+ public void testMRWithoutAnExistingTable() throws Exception {
+ String[] args =
+ new String[] { tn.getNameAsString(), "/inputFile" };
+
+ exception.expect(TableNotFoundException.class);
+ assertEquals("running test job configuration failed.", 0, ToolRunner.run(
+ new Configuration(util.getConfiguration()),
+ new ImportTsv() {
+ @Override
+ public int run(String[] args) throws Exception {
+ createSubmittableJob(getConf(), args);
+ return 0;
+ }
+ }, args));
+ }
+
+ @Test
+ public void testJobConfigurationsWithDryMode() throws Exception {
+ Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()),"hfiles");
+ String INPUT_FILE = "InputFile1.csv";
+ // Prepare the arguments required for the test.
+ String[] argsArray = new String[] {
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+ "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
+ "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true",
+ tn.getNameAsString(),
+ INPUT_FILE };
+ assertEquals("running test job configuration failed.", 0, ToolRunner.run(
+ new Configuration(util.getConfiguration()),
+ new ImportTsv() {
+ @Override
+ public int run(String[] args) throws Exception {
+ Job job = createSubmittableJob(getConf(), args);
+ assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
+ return 0;
+ }
+ }, argsArray));
+ // Delete table created by createSubmittableJob.
+ util.deleteTable(tn);
+ }
+
+ @Test
+ public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
+ util.createTable(tn, FAMILY);
+ args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+ doMROnTableTest(null, 1);
+ // Dry mode should not delete an existing table. If it's not present,
+ // this will throw TableNotFoundException.
+ util.deleteTable(tn);
+ }
+
+ /**
+ * If table is not present in non-bulk mode, dry run should fail just like
+ * normal mode.
+ */
+ @Test
+ public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
+ args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+ exception.expect(TableNotFoundException.class);
+ doMROnTableTest(null, 1);
+ }
+
+ @Test public void testDryModeWithBulkOutputAndTableExists() throws Exception {
+ util.createTable(tn, FAMILY);
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+ args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+ doMROnTableTest(null, 1);
+ // Dry mode should not delete an existing table. If it's not present,
+ // this will throw TableNotFoundException.
+ util.deleteTable(tn);
+ }
+
+ /**
+ * If table is not present in bulk mode and create.table is not set to yes,
+ * import should fail with TableNotFoundException.
+ */
+ @Test
+ public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws
+ Exception {
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+ args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+ args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
+ exception.expect(TableNotFoundException.class);
+ doMROnTableTest(null, 1);
+ }
+
+ @Test
+ public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+ args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+ args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
+ doMROnTableTest(null, 1);
+ // Verify temporary table was deleted.
+ exception.expect(TableNotFoundException.class);
+ util.deleteTable(tn);
+ }
+
+ /**
+ * If there are invalid data rows as inputs, then only those rows should be ignored.
+ */
+ @Test
+ public void testTsvImporterTextMapperWithInvalidData() throws Exception {
+ Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
+ args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+ args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
+ args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
+ // 3 rows of data as input: 2 rows are valid, and 1 row is invalid because it has no timestamp.
+ String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
+ doMROnTableTest(util, tn, FAMILY, data, args, 1, 4);
+ util.deleteTable(tn);
+ }
+
+ @Test
+ public void testSkipEmptyColumns() throws Exception {
+ Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+ args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
+ args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
+ args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
+ // 2 rows of data as input. Both rows are valid, and only 3 of the 4 columns are non-empty.
+ String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
+ doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
+ util.deleteTable(tn);
+ }
+
+ private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
+ return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
+ }
+
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table,
+ String family, String data, Map<String, String> args) throws Exception {
+ return doMROnTableTest(util, table, family, data, args, 1, -1);
+ }
+
+ /**
+ * Run an ImportTsv job and perform basic validation on the results.
+ * Returns the ImportTsv <code>Tool</code> instance so that other tests can
+ * inspect it for further validation as necessary. This method is static to
+ * ensure non-reliance on the instance's util/conf facilities.
+ * @param args Any arguments to pass BEFORE inputFile path is appended.
+ * @return The Tool instance used to run the test.
+ */
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table,
+ String family, String data, Map<String, String> args, int valueMultiplier, int expectedKVCount)
+ throws Exception {
+ Configuration conf = new Configuration(util.getConfiguration());
+
+ // populate input file
+ FileSystem fs = FileSystem.get(conf);
+ Path inputPath = fs.makeQualified(
+ new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+ FSDataOutputStream op = fs.create(inputPath, true);
+ if (data == null) {
+ data = "KEY\u001bVALUE1\u001bVALUE2\n";
+ }
+ op.write(Bytes.toBytes(data));
+ op.close();
+ LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+ if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+ LOG.debug("Forcing combiner.");
+ conf.setInt("mapreduce.map.combine.minspills", 1);
+ }
+
+ // Build args array.
+ String[] argsArray = new String[args.size() + 2];
+ Iterator it = args.entrySet().iterator();
+ int i = 0;
+ while (it.hasNext()) {
+ Map.Entry pair = (Map.Entry) it.next();
+ argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
+ i++;
+ }
+ argsArray[i] = table.getNameAsString();
+ argsArray[i + 1] = inputPath.toString();
+
+ // run the import
+ Tool tool = new ImportTsv();
+ LOG.debug("Running ImportTsv with arguments: " + argsArray);
+ assertEquals(0, ToolRunner.run(conf, tool, argsArray));
+
+ // Perform basic validation. If the input args did not include
+ // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
+ // Otherwise, validate presence of hfiles.
+ boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) &&
+ "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
+ if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
+ if (isDryRun) {
+ assertFalse(String.format("Dry run mode, %s should not have been created.",
+ ImportTsv.BULK_OUTPUT_CONF_KEY),
+ fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
+ } else {
+ validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family,expectedKVCount);
+ }
+ } else {
+ validateTable(conf, table, family, valueMultiplier, isDryRun);
+ }
+
+ if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+ LOG.debug("Deleting test subdirectory");
+ util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+ }
+ return tool;
+ }
+
+ /**
+ * Confirm ImportTsv via data in online table.
+ */
+ private static void validateTable(Configuration conf, TableName tableName,
+ String family, int valueMultiplier, boolean isDryRun) throws IOException {
+
+ LOG.debug("Validating table.");
+ Connection connection = ConnectionFactory.createConnection(conf);
+ Table table = connection.getTable(tableName);
+ boolean verified = false;
+ long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+ int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+ for (int i = 0; i < numRetries; i++) {
+ try {
+ Scan scan = new Scan();
+ // Scan entire family.
+ scan.addFamily(Bytes.toBytes(family));
+ ResultScanner resScanner = table.getScanner(scan);
+ int numRows = 0;
+ for (Result res : resScanner) {
+ numRows++;
+ assertEquals(2, res.size());
+ List<Cell> kvs = res.listCells();
+ assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
+ assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
+ assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
+ assertTrue(CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
+ // Only one result set is expected, so let it loop.
+ }
+ if (isDryRun) {
+ assertEquals(0, numRows);
+ } else {
+ assertEquals(1, numRows);
+ }
+ verified = true;
+ break;
+ } catch (NullPointerException e) {
+ // If here, a cell was empty. Presume it's because updates came in
+ // after the scanner had been opened. Wait a while and retry.
+ }
+ try {
+ Thread.sleep(pause);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ table.close();
+ connection.close();
+ assertTrue(verified);
+ }
+
+ /**
+ * Confirm ImportTsv via HFiles on fs.
+ */
+ private static void validateHFiles(FileSystem fs, String outputPath, String family,
+ int expectedKVCount) throws IOException {
+ // validate number and content of output columns
+ LOG.debug("Validating HFiles.");
+ Set<String> configFamilies = new HashSet<>();
+ configFamilies.add(family);
+ Set<String> foundFamilies = new HashSet<>();
+ int actualKVCount = 0;
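+ // The bulk output directory contains one subdirectory per column family, each holding that family's HFiles.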
+ for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
+ String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
+ String cf = elements[elements.length - 1];
+ foundFamilies.add(cf);
+ assertTrue(
+ String.format(
+ "HFile output contains a column family (%s) not present in input families (%s)",
+ cf, configFamilies),
+ configFamilies.contains(cf));
+ for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
+ assertTrue(
+ String.format("HFile %s appears to contain no data.", hfile.getPath()),
+ hfile.getLen() > 0);
+ // count the number of KVs from all the hfiles
+ if (expectedKVCount > -1) {
+ actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
+ }
+ }
+ }
+ assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
+ foundFamilies.contains(family));
+ if (expectedKVCount > -1) {
+ assertTrue(String.format(
+ "KV count in ouput hfile=<%d> doesn't match with expected KV count=<%d>", actualKVCount,
+ expectedKVCount), actualKVCount == expectedKVCount);
+ }
+ }
+
+ /**
+ * Returns the total number of KVs in the given hfile.
+ * @param fs File System
+ * @param p HFile path
+ * @return KV count in the given hfile
+ * @throws IOException
+ */
+ private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
+ Configuration conf = util.getConfiguration();
+ HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
+ reader.loadFileInfo();
+ HFileScanner scanner = reader.getScanner(false, false);
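+ // seekTo() positions the scanner on the first cell; the do/while below then counts every cell in the file.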
+ scanner.seekTo();
+ int count = 0;
+ do {
+ count++;
+ } while (scanner.next());
+ reader.close();
+ return count;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
new file mode 100644
index 0000000..3c38102
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Splitter;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
+
+/**
+ * Tests for {@link TsvParser}.
+ */
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestImportTsvParser {
+
+ private void assertBytesEquals(byte[] a, byte[] b) {
+ assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
+ }
+
+ private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
+ ArrayList<String> parsedCols = new ArrayList<>();
+ for (int i = 0; i < parsed.getColumnCount(); i++) {
+ parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i),
+ parsed.getColumnLength(i)));
+ }
+ if (!Iterables.elementsEqual(parsedCols, expected)) {
+ fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:"
+ + Joiner.on(",").join(parsedCols));
+ }
+ }
+
+ @Test
+ public void testTsvParserSpecParsing() {
+ TsvParser parser;
+
+ parser = new TsvParser("HBASE_ROW_KEY", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertFalse(parser.hasTimestamp());
+
+ parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertFalse(parser.hasTimestamp());
+
+ parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
+ assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertFalse(parser.hasTimestamp());
+
+ parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+ assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertTrue(parser.hasTimestamp());
+ assertEquals(2, parser.getTimestampKeyColumnIndex());
+
+ parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ATTRIBUTES_KEY",
+ "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+ assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertTrue(parser.hasTimestamp());
+ assertEquals(2, parser.getTimestampKeyColumnIndex());
+ assertEquals(4, parser.getAttributesKeyColumnIndex());
+
+ parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ROW_KEY",
+ "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+ assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+ assertEquals(4, parser.getRowKeyColumnIndex());
+ assertTrue(parser.hasTimestamp());
+ assertEquals(2, parser.getTimestampKeyColumnIndex());
+ assertEquals(0, parser.getAttributesKeyColumnIndex());
+ }
+
+ @Test
+ public void testTsvParser() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");
+ assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
+ assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
+ assertNull(parser.getFamily(2));
+ assertNull(parser.getQualifier(2));
+ assertEquals(2, parser.getRowKeyColumnIndex());
+
+ assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex());
+
+ byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
+ ParsedLine parsed = parser.parse(line, line.length);
+ checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+ }
+
+ @Test
+ public void testTsvParserWithTimestamp() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertNull(parser.getFamily(1));
+ assertNull(parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
+ assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertEquals(1, parser.getTimestampKeyColumnIndex());
+
+ byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
+ ParsedLine parsed = parser.parse(line, line.length);
+ assertEquals(1234L, parsed.getTimestamp(-1));
+ checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+ }
+
+ /**
+ * Test cases that throw BadTsvLineException
+ */
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
+ byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
+ parser.parse(line, line.length);
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
+ byte[] line = Bytes.toBytes("");
+ parser.parse(line, line.length);
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
+ byte[] line = Bytes.toBytes("key_only");
+ parser.parse(line, line.length);
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
+ byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
+ parser.parse(line, line.length);
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+ assertEquals(1, parser.getTimestampKeyColumnIndex());
+ byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
+ ParsedLine parsed = parser.parse(line, line.length);
+ assertEquals(-1, parsed.getTimestamp(-1));
+ checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserNoTimestampValue() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+ assertEquals(2, parser.getTimestampKeyColumnIndex());
+ byte[] line = Bytes.toBytes("rowkey\tval_a");
+ parser.parse(line, line.length);
+ }
+
+ @Test
+ public void testTsvParserParseRowKey() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ byte[] line = Bytes.toBytes("rowkey\tval_a\t1234");
+ Pair<Integer, Integer> rowKeyOffsets = parser.parseRowKey(line, line.length);
+ assertEquals(0, rowKeyOffsets.getFirst().intValue());
+ assertEquals(6, rowKeyOffsets.getSecond().intValue());
+ try {
+ line = Bytes.toBytes("\t\tval_a\t1234");
+ parser.parseRowKey(line, line.length);
+ fail("Should get BadTsvLineException on empty rowkey.");
+ } catch (BadTsvLineException b) {
+
+ }
+ parser = new TsvParser("col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t");
+ assertEquals(1, parser.getRowKeyColumnIndex());
+ line = Bytes.toBytes("val_a\trowkey\t1234");
+ rowKeyOffsets = parser.parseRowKey(line, line.length);
+ assertEquals(6, rowKeyOffsets.getFirst().intValue());
+ assertEquals(6, rowKeyOffsets.getSecond().intValue());
+ try {
+ line = Bytes.toBytes("val_a");
+ rowKeyOffsets = parser.parseRowKey(line, line.length);
+ fail("Should get BadTsvLineException when number of columns less than rowkey position.");
+ } catch (BadTsvLineException b) {
+
+ }
+ parser = new TsvParser("col_a,HBASE_TS_KEY,HBASE_ROW_KEY", "\t");
+ assertEquals(2, parser.getRowKeyColumnIndex());
+ line = Bytes.toBytes("val_a\t1234\trowkey");
+ rowKeyOffsets = parser.parseRowKey(line, line.length);
+ assertEquals(11, rowKeyOffsets.getFirst().intValue());
+ assertEquals(6, rowKeyOffsets.getSecond().intValue());
+ }
+
+ @Test
+ public void testTsvParseAttributesKey() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY,HBASE_ATTRIBUTES_KEY", "\t");
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ byte[] line = Bytes.toBytes("rowkey\tval_a\t1234\tkey=>value");
+ ParsedLine parse = parser.parse(line, line.length);
+ assertEquals(18, parse.getAttributeKeyOffset());
+ assertEquals(3, parser.getAttributesKeyColumnIndex());
+ String attributes[] = parse.getIndividualAttributes();
+ assertEquals(attributes[0], "key=>value");
+ try {
+ line = Bytes.toBytes("rowkey\tval_a\t1234");
+ parser.parse(line, line.length);
+ fail("Should get BadTsvLineException on empty rowkey.");
+ } catch (BadTsvLineException b) {
+
+ }
+ parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t");
+ assertEquals(2, parser.getRowKeyColumnIndex());
+ line = Bytes.toBytes("key=>value\tval_a\trowkey\t1234");
+ parse = parser.parse(line, line.length);
+ assertEquals(0, parse.getAttributeKeyOffset());
+ assertEquals(0, parser.getAttributesKeyColumnIndex());
+ attributes = parse.getIndividualAttributes();
+ assertEquals(attributes[0], "key=>value");
+ try {
+ line = Bytes.toBytes("val_a");
+ ParsedLine parse2 = parser.parse(line, line.length);
+ fail("Should get BadTsvLineException when number of columns less than rowkey position.");
+ } catch (BadTsvLineException b) {
+
+ }
+ parser = new TsvParser("col_a,HBASE_ATTRIBUTES_KEY,HBASE_TS_KEY,HBASE_ROW_KEY", "\t");
+ assertEquals(3, parser.getRowKeyColumnIndex());
+ line = Bytes.toBytes("val_a\tkey0=>value0,key1=>value1,key2=>value2\t1234\trowkey");
+ parse = parser.parse(line, line.length);
+ assertEquals(1, parser.getAttributesKeyColumnIndex());
+ assertEquals(6, parse.getAttributeKeyOffset());
+ String[] attr = parse.getIndividualAttributes();
+ int i = 0;
+ for(String str : attr) {
+ assertEquals(("key"+i+"=>"+"value"+i), str );
+ i++;
+ }
+ }
+
+ @Test
+ public void testTsvParserWithCellVisibilityCol() throws BadTsvLineException {
+ TsvParser parser = new TsvParser(
+ "HBASE_ROW_KEY,col_a,HBASE_TS_KEY,HBASE_ATTRIBUTES_KEY,HBASE_CELL_VISIBILITY", "\t");
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertEquals(4, parser.getCellVisibilityColumnIndex());
+ byte[] line = Bytes.toBytes("rowkey\tval_a\t1234\tkey=>value\tPRIVATE&SECRET");
+ ParsedLine parse = parser.parse(line, line.length);
+ assertEquals(18, parse.getAttributeKeyOffset());
+ assertEquals(3, parser.getAttributesKeyColumnIndex());
+ String attributes[] = parse.getIndividualAttributes();
+ assertEquals(attributes[0], "key=>value");
+ assertEquals(29, parse.getCellVisibilityColumnOffset());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java
new file mode 100644
index 0000000..8187b73
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.text.MessageFormat;
+import java.util.Properties;
+import java.util.jar.JarInputStream;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+
+/**
+ * This file was forked from hadoop/common/branches/branch-2@1350012.
+ */
+@Category(SmallTests.class)
+public class TestJarFinder {
+
+ @Test
+ public void testJar() throws Exception {
+
+ //picking a class that is for sure in a JAR in the classpath
+ String jar = JarFinder.getJar(LogFactory.class);
+ Assert.assertTrue(new File(jar).exists());
+ }
+
+ private static void delete(File file) throws IOException {
+ if (file.getAbsolutePath().length() < 5) {
+ throw new IllegalArgumentException(
+ MessageFormat.format("Path [{0}] is too short, not deleting",
+ file.getAbsolutePath()));
+ }
+ if (file.exists()) {
+ if (file.isDirectory()) {
+ File[] children = file.listFiles();
+ if (children != null) {
+ for (File child : children) {
+ delete(child);
+ }
+ }
+ }
+ if (!file.delete()) {
+ throw new RuntimeException(
+ MessageFormat.format("Could not delete path [{0}]",
+ file.getAbsolutePath()));
+ }
+ }
+ }
+
+ @Test
+ public void testExpandedClasspath() throws Exception {
+ //picking a class that is for sure in a directory in the classpath
+ //in this case the JAR is created on the fly
+ String jar = JarFinder.getJar(TestJarFinder.class);
+ Assert.assertTrue(new File(jar).exists());
+ }
+
+ @Test
+ public void testExistingManifest() throws Exception {
+ File dir = new File(System.getProperty("test.build.dir", "target/test-dir"),
+ TestJarFinder.class.getName() + "-testExistingManifest");
+ delete(dir);
+ dir.mkdirs();
+
+ File metaInfDir = new File(dir, "META-INF");
+ metaInfDir.mkdirs();
+ File manifestFile = new File(metaInfDir, "MANIFEST.MF");
+ Manifest manifest = new Manifest();
+ OutputStream os = new FileOutputStream(manifestFile);
+ manifest.write(os);
+ os.close();
+
+ File propsFile = new File(dir, "props.properties");
+ Writer writer = new FileWriter(propsFile);
+ new Properties().store(writer, "");
+ writer.close();
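+ // Jar the directory in memory; the resulting jar should carry the MANIFEST.MF written above.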
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ JarOutputStream zos = new JarOutputStream(baos);
+ JarFinder.jarDir(dir, "", zos);
+ JarInputStream jis =
+ new JarInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ Assert.assertNotNull(jis.getManifest());
+ jis.close();
+ }
+
+ @Test
+ public void testNoManifest() throws Exception {
+ File dir = new File(System.getProperty("test.build.dir", "target/test-dir"),
+ TestJarFinder.class.getName() + "-testNoManifest");
+ delete(dir);
+ dir.mkdirs();
+ File propsFile = new File(dir, "props.properties");
+ Writer writer = new FileWriter(propsFile);
+ new Properties().store(writer, "");
+ writer.close();
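+ // No MANIFEST.MF was written to this directory; JarFinder.jarDir is expected to add a default manifest.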
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ JarOutputStream zos = new JarOutputStream(baos);
+ JarFinder.jarDir(dir, "", zos);
+ JarInputStream jis =
+ new JarInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ Assert.assertNotNull(jis.getManifest());
+ jis.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
new file mode 100644
index 0000000..529a448
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -0,0 +1,669 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
+
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+
+/**
+ * Test cases for the atomic load error handling of the bulk load functionality.
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestLoadIncrementalHFilesSplitRecovery {
+ private static final Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);
+
+ static HBaseTestingUtility util;
+ //used by secure subclass
+ static boolean useSecure = false;
+
+ final static int NUM_CFS = 10;
+ final static byte[] QUAL = Bytes.toBytes("qual");
+ final static int ROWCOUNT = 100;
+
+ private final static byte[][] families = new byte[NUM_CFS][];
+
+ @Rule
+ public TestName name = new TestName();
+
+ static {
+ for (int i = 0; i < NUM_CFS; i++) {
+ families[i] = Bytes.toBytes(family(i));
+ }
+ }
+
+ static byte[] rowkey(int i) {
+ return Bytes.toBytes(String.format("row_%08d", i));
+ }
+
+ static String family(int i) {
+ return String.format("family_%04d", i);
+ }
+
+ static byte[] value(int i) {
+ return Bytes.toBytes(String.format("%010d", i));
+ }
+
+ public static void buildHFiles(FileSystem fs, Path dir, int value)
+ throws IOException {
+ byte[] val = value(value);
+ for (int i = 0; i < NUM_CFS; i++) {
+ Path testIn = new Path(dir, family(i));
+
+ TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
+ Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
+ }
+ }
+
+ /**
+ * Creates a table with given table name and specified number of column
+ * families if the table does not already exist.
+ */
+ private void setupTable(final Connection connection, TableName table, int cfs)
+ throws IOException {
+ try {
+ LOG.info("Creating table " + table);
+ HTableDescriptor htd = new HTableDescriptor(table);
+ for (int i = 0; i < cfs; i++) {
+ htd.addFamily(new HColumnDescriptor(family(i)));
+ }
+ try (Admin admin = connection.getAdmin()) {
+ admin.createTable(htd);
+ }
+ } catch (TableExistsException tee) {
+ LOG.info("Table " + table + " already exists");
+ }
+ }
+
+ /**
+ * Creates a table with the given table name, the specified number of column families,<br>
+ * and split keys, if the table does not already exist.
+ * @param table the table to create
+ * @param cfs the number of column families
+ * @param SPLIT_KEYS the keys to pre-split the table on
+ */
+ private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
+ throws IOException {
+ try {
+ LOG.info("Creating table " + table);
+ HTableDescriptor htd = new HTableDescriptor(table);
+ for (int i = 0; i < cfs; i++) {
+ htd.addFamily(new HColumnDescriptor(family(i)));
+ }
+
+ util.createTable(htd, SPLIT_KEYS);
+ } catch (TableExistsException tee) {
+ LOG.info("Table " + table + " already exists");
+ }
+ }
+
+ private Path buildBulkFiles(TableName table, int value) throws Exception {
+ Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
+ Path bulk1 = new Path(dir, table.getNameAsString() + value);
+ FileSystem fs = util.getTestFileSystem();
+ buildHFiles(fs, bulk1, value);
+ return bulk1;
+ }
+
+ /**
+ * Populate table with known values.
+ */
+ private void populateTable(final Connection connection, TableName table, int value)
+ throws Exception {
+ // create HFiles for different column families
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
+ Path bulk1 = buildBulkFiles(table, value);
+ try (Table t = connection.getTable(table);
+ RegionLocator locator = connection.getRegionLocator(table);
+ Admin admin = connection.getAdmin()) {
+ lih.doBulkLoad(bulk1, admin, t, locator);
+ }
+ }
+
+ /**
+ * Split the known table in half. (This is hard-coded for this test suite.)
+ */
+ private void forceSplit(TableName table) {
+ try {
+ // We would need to call the region server directly for a synchronous split, but that API isn't visible, so split via the admin and poll below.
+ HRegionServer hrs = util.getRSForFirstRegionInTable(table);
+
+ for (HRegionInfo hri :
+ ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
+ if (hri.getTable().equals(table)) {
+ util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
+ //ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
+ }
+ }
+
+ // verify that split completed.
+ int regions;
+ do {
+ regions = 0;
+ for (HRegionInfo hri :
+ ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
+ if (hri.getTable().equals(table)) {
+ regions++;
+ }
+ }
+ if (regions != 2) {
+ LOG.info("Taking some time to complete split...");
+ Thread.sleep(250);
+ }
+ } while (regions != 2);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ util = new HBaseTestingUtility();
+ util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+ util.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ /**
+ * Checks that all columns have the expected value and that there is the
+ * expected number of rows.
+ * @throws IOException
+ */
+ void assertExpectedTable(TableName table, int count, int value) throws IOException {
+ HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString());
+ assertEquals(htds.length, 1);
+ Table t = null;
+ try {
+ t = util.getConnection().getTable(table);
+ Scan s = new Scan();
+ ResultScanner sr = t.getScanner(s);
+ int i = 0;
+ for (Result r : sr) {
+ i++;
+ for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
+ for (byte[] val : nm.values()) {
+ assertTrue(Bytes.equals(val, value(value)));
+ }
+ }
+ }
+ assertEquals(count, i);
+ } catch (IOException e) {
+ fail("Failed due to exception");
+ } finally {
+ if (t != null) t.close();
+ }
+ }
+
+ /**
+ * Test that shows that an exception thrown from the RS side results in an
+ * exception on the LoadIncrementalHFiles client.
+ */
+ @Test(expected=IOException.class, timeout=120000)
+ public void testBulkLoadPhaseFailure() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ final AtomicInteger attmptedCalls = new AtomicInteger();
+ final AtomicInteger failedCalls = new AtomicInteger();
+ util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ try (Connection connection = ConnectionFactory.createConnection(util
+ .getConfiguration())) {
+ setupTable(connection, table, 10);
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+ util.getConfiguration()) {
+ @Override
+ protected List<LoadQueueItem> tryAtomicRegionLoad(
+ ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
+ Collection<LoadQueueItem> lqis) throws IOException {
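+ // On the first attempt, swap in a mocked connection whose bulkLoadHFile RPC always fails, simulating an RS-side error.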
+ int i = attmptedCalls.incrementAndGet();
+ if (i == 1) {
+ Connection errConn;
+ try {
+ errConn = getMockedConnection(util.getConfiguration());
+ serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
+ } catch (Exception e) {
+ LOG.fatal("mocking cruft, should never happen", e);
+ throw new RuntimeException("mocking cruft, should never happen");
+ }
+ failedCalls.incrementAndGet();
+ return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
+ }
+
+ return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
+ }
+ };
+ try {
+ // create HFiles for different column families
+ Path dir = buildBulkFiles(table, 1);
+ try (Table t = connection.getTable(table);
+ RegionLocator locator = connection.getRegionLocator(table);
+ Admin admin = connection.getAdmin()) {
+ lih.doBulkLoad(dir, admin, t, locator);
+ }
+ } finally {
+ util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ }
+ fail("doBulkLoad should have thrown an exception");
+ }
+ }
+
+ /**
+ * Test that shows that an exception thrown from the RS side results in the
+ * expected number of retries, set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER},
+ * when {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set.
+ */
+ @Test
+ public void testRetryOnIOException() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ final AtomicInteger calls = new AtomicInteger(1);
+ final Connection conn = ConnectionFactory.createConnection(util
+ .getConfiguration());
+ util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ util.getConfiguration().setBoolean(
+ LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
+ final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+ util.getConfiguration()) {
+ @Override
+ protected List<LoadQueueItem> tryAtomicRegionLoad(
+ ClientServiceCallable<byte[]> serverCallable, TableName tableName,
+ final byte[] first, Collection<LoadQueueItem> lqis)
+ throws IOException {
+ if (calls.getAndIncrement() < util.getConfiguration().getInt(
+ HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
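+ // For every attempt but the last, substitute a callable that always throws an IOException to exercise the retry path.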
+ ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
+ conn, tableName, first, new RpcControllerFactory(
+ util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
+ @Override
+ public byte[] rpcCall() throws Exception {
+ throw new IOException("Error calling something on RegionServer");
+ }
+ };
+ return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
+ } else {
+ return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
+ }
+ }
+ };
+ setupTable(conn, table, 10);
+ Path dir = buildBulkFiles(table, 1);
+ lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table),
+ conn.getRegionLocator(table));
+ util.getConfiguration().setBoolean(
+ LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
+
+ }
+
+ @SuppressWarnings("deprecation")
+ private ClusterConnection getMockedConnection(final Configuration conf)
+ throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
+ ClusterConnection c = Mockito.mock(ClusterConnection.class);
+ Mockito.when(c.getConfiguration()).thenReturn(conf);
+ Mockito.doNothing().when(c).close();
+ // Make it so we return a particular location when asked.
+ final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
+ ServerName.valueOf("example.org", 1234, 0));
+ Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
+ (byte[]) Mockito.any(), Mockito.anyBoolean())).
+ thenReturn(loc);
+ Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
+ thenReturn(loc);
+ ClientProtos.ClientService.BlockingInterface hri =
+ Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
+ Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())).
+ thenThrow(new ServiceException(new IOException("injecting bulk load error")));
+ Mockito.when(c.getClient(Mockito.any(ServerName.class))).
+ thenReturn(hri);
+ return c;
+ }
+
+ /**
+ * This test exercises the path where there is a split after initial
+ * validation but before the atomic bulk load call. We cannot use presplitting
+ * to test this path, so we actually inject a split just before the atomic
+ * region load.
+ */
+ @Test (timeout=120000)
+ public void testSplitWhileBulkLoadPhase() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+ setupTable(connection, table, 10);
+ populateTable(connection, table,1);
+ assertExpectedTable(table, ROWCOUNT, 1);
+
+ // Now let's cause trouble. This will occur after checks and cause bulk
+ // files to fail when attempt to atomically import. This is recoverable.
+ final AtomicInteger attemptedCalls = new AtomicInteger();
+ LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
+ @Override
+ protected void bulkLoadPhase(final Table htable, final Connection conn,
+ ExecutorService pool, Deque<LoadQueueItem> queue,
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
+ Map<LoadQueueItem, ByteBuffer> item2RegionMap)
+ throws IOException {
+ int i = attemptedCalls.incrementAndGet();
+ if (i == 1) {
+ // On first attempt force a split.
+ forceSplit(table);
+ }
+ super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
+ }
+ };
+
+ // create HFiles for different column families
+ try (Table t = connection.getTable(table);
+ RegionLocator locator = connection.getRegionLocator(table);
+ Admin admin = connection.getAdmin()) {
+ Path bulk = buildBulkFiles(table, 2);
+ lih2.doBulkLoad(bulk, admin, t, locator);
+ }
+
+ // check that data was loaded
+      // The three expected attempts are 1) failure because a split is needed, 2)
+      // load of the split top half, 3) load of the split bottom half.
+      assertEquals(3, attemptedCalls.get());
+ assertExpectedTable(table, ROWCOUNT, 2);
+ }
+ }
+
+ /**
+ * This test splits a table and attempts to bulk load. The bulk import files
+ * should be split before atomically importing.
+ */
+ @Test (timeout=120000)
+ public void testGroupOrSplitPresplit() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+ setupTable(connection, table, 10);
+ populateTable(connection, table, 1);
+ assertExpectedTable(connection, table, ROWCOUNT, 1);
+ forceSplit(table);
+
+      final AtomicInteger countedLqis = new AtomicInteger();
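+      // Count every LoadQueueItem that groupOrSplit produces; because the table was pre-split,
+      // each family's HFile has to be split in two, and the assertion below expects 20 items.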
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+ util.getConfiguration()) {
+ @Override
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+ final LoadQueueItem item, final Table htable,
+ final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
+ startEndKeys);
+ if (lqis != null && lqis.getFirst() != null) {
+ countedLqis.addAndGet(lqis.getFirst().size());
+ }
+ return lqis;
+ }
+ };
+
+ // create HFiles for different column families
+ Path bulk = buildBulkFiles(table, 2);
+ try (Table t = connection.getTable(table);
+ RegionLocator locator = connection.getRegionLocator(table);
+ Admin admin = connection.getAdmin()) {
+ lih.doBulkLoad(bulk, admin, t, locator);
+ }
+ assertExpectedTable(connection, table, ROWCOUNT, 2);
+ assertEquals(20, countedLqis.get());
+ }
+ }
+
+ /**
+ * This test creates a table with many small regions. The bulk load files
+   * must be split multiple times before all of them can be loaded successfully.
+ */
+ @Test (timeout=120000)
+ public void testSplitTmpFileCleanUp() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
+ Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"),
+ Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050")};
+ try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+ setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
+
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
+
+ // create HFiles
+ Path bulk = buildBulkFiles(table, 2);
+ try (Table t = connection.getTable(table);
+ RegionLocator locator = connection.getRegionLocator(table);
+ Admin admin = connection.getAdmin()) {
+ lih.doBulkLoad(bulk, admin, t, locator);
+ }
+ // family path
+ Path tmpPath = new Path(bulk, family(0));
+ // TMP_DIR under family path
+ tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
+ FileSystem fs = bulk.getFileSystem(util.getConfiguration());
+      // HFiles have been split, so TMP_DIR exists
+ assertTrue(fs.exists(tmpPath));
+ // TMP_DIR should have been cleaned-up
+ assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
+ FSUtils.listStatus(fs, tmpPath));
+ assertExpectedTable(connection, table, ROWCOUNT, 2);
+ }
+ }
+
+ /**
+   * This simulates a remote exception which should cause LIHF to exit with an
+ * exception.
+ */
+ @Test(expected = IOException.class, timeout=120000)
+ public void testGroupOrSplitFailure() throws Exception {
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+ setupTable(connection, tableName, 10);
+
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+ util.getConfiguration()) {
+ int i = 0;
+
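+        // Fail the fifth groupOrSplit call to simulate a remote failure; doBulkLoad is
+        // expected to surface it as the IOException declared on the @Test annotation.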
+ @Override
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+ final LoadQueueItem item, final Table table,
+ final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ i++;
+
+ if (i == 5) {
+ throw new IOException("failure");
+ }
+ return super.groupOrSplit(regionGroups, item, table, startEndKeys);
+ }
+ };
+
+ // create HFiles for different column families
+      Path dir = buildBulkFiles(tableName, 1);
+ try (Table t = connection.getTable(tableName);
+ RegionLocator locator = connection.getRegionLocator(tableName);
+ Admin admin = connection.getAdmin()) {
+ lih.doBulkLoad(dir, admin, t, locator);
+ }
+ }
+
+ fail("doBulkLoad should have thrown an exception");
+ }
+
+ @Test (timeout=120000)
+ public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
+    // Share connection. We were failing to find the table with our new reverse scan because it
+    // looks for the first region, not any region -- that is how it works now. The code below
+    // removes the first region in the test; previously this relied on the Connection cache
+    // holding the first region.
+ Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
+ Table table = connection.getTable(tableName);
+
+ setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
+ Path dir = buildBulkFiles(tableName, 2);
+
+ final AtomicInteger countedLqis = new AtomicInteger();
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
+
+ @Override
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+ final LoadQueueItem item, final Table htable,
+ final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
+ startEndKeys);
+ if (lqis != null && lqis.getFirst() != null) {
+ countedLqis.addAndGet(lqis.getFirst().size());
+ }
+ return lqis;
+ }
+ };
+
+ // do bulkload when there is no region hole in hbase:meta.
+ try (Table t = connection.getTable(tableName);
+ RegionLocator locator = connection.getRegionLocator(tableName);
+ Admin admin = connection.getAdmin()) {
+ loader.doBulkLoad(dir, admin, t, locator);
+ } catch (Exception e) {
+ LOG.error("exeception=", e);
+ }
+ // check if all the data are loaded into the table.
+ this.assertExpectedTable(tableName, ROWCOUNT, 2);
+
+ dir = buildBulkFiles(tableName, 3);
+
+ // Mess it up by leaving a hole in the hbase:meta
+ List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
+ for (HRegionInfo regionInfo : regionInfos) {
+ if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
+ MetaTableAccessor.deleteRegion(connection, regionInfo);
+ break;
+ }
+ }
+
+ try (Table t = connection.getTable(tableName);
+ RegionLocator locator = connection.getRegionLocator(tableName);
+ Admin admin = connection.getAdmin()) {
+ loader.doBulkLoad(dir, admin, t, locator);
+ } catch (Exception e) {
+ LOG.error("exception=", e);
+ assertTrue("IOException expected", e instanceof IOException);
+ }
+
+ table.close();
+
+ // Make sure at least the one region that still exists can be found.
+ regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
+ assertTrue(regionInfos.size() >= 1);
+
+ this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
+ connection.close();
+ }
+
+ /**
+ * Checks that all columns have the expected value and that there is the
+ * expected number of rows.
+ * @throws IOException
+ */
+ void assertExpectedTable(final Connection connection, TableName table, int count, int value)
+ throws IOException {
+ HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString());
+    assertEquals(1, htds.length);
+ Table t = null;
+ try {
+ t = connection.getTable(table);
+ Scan s = new Scan();
+ ResultScanner sr = t.getScanner(s);
+ int i = 0;
+ for (Result r : sr) {
+ i++;
+ for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
+ for (byte[] val : nm.values()) {
+ assertTrue(Bytes.equals(val, value(value)));
+ }
+ }
+ }
+ assertEquals(count, i);
+ } catch (IOException e) {
+ fail("Failed due to exception");
+ } finally {
+ if (t != null) t.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
new file mode 100644
index 0000000..0c5207b
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests various scan start and stop row scenarios. These are set on a Scan and
+ * verified in a MapReduce job to check that they are handed over and honored
+ * properly.
+ */
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestMultiTableInputFormat extends MultiTableInputFormatTestBase {
+
+ @BeforeClass
+ public static void setupLogging() {
+ TEST_UTIL.enableDebug(MultiTableInputFormat.class);
+ }
+
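+  // Wires the shared scans from MultiTableInputFormatTestBase into a mapper job; the
+  // multi-scan initTableMapperJob overload configures MultiTableInputFormat under the hood.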
+ @Override
+ protected void initJob(List<Scan> scans, Job job) throws IOException {
+ TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..530d9c5
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Function;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimaps;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase {
+
+ protected Path restoreDir;
+
+ @BeforeClass
+ public static void setUpSnapshots() throws Exception {
+
+ TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class);
+ TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class);
+
+ // take a snapshot of every table we have.
+ for (String tableName : TABLES) {
+ SnapshotTestingUtils
+ .createSnapshotAndValidate(TEST_UTIL.getAdmin(), TableName.valueOf(tableName),
+ ImmutableList.of(INPUT_FAMILY), null,
+ snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()),
+ TEST_UTIL.getTestFileSystem(), true);
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.restoreDir = TEST_UTIL.getRandomDir();
+ }
+
+ @Override
+ protected void initJob(List<Scan> scans, Job job) throws IOException {
+ TableMapReduceUtil
+ .initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), ScanMapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
+ }
+
+ protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) {
+ return Multimaps.index(scans, new Function<Scan, String>() {
+ @Nullable
+ @Override
+ public String apply(Scan input) {
+ return snapshotNameForTable(
+ Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME)));
+ }
+ }).asMap();
+ }
+
+ public static String snapshotNameForTable(String tableName) {
+ return tableName + "_snapshot";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
new file mode 100644
index 0000000..1c33848
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+
+@Category({ SmallTests.class })
+public class TestMultiTableSnapshotInputFormatImpl {
+
+ private MultiTableSnapshotInputFormatImpl subject;
+ private Map<String, Collection<Scan>> snapshotScans;
+ private Path restoreDir;
+ private Configuration conf;
+ private Path rootDir;
+
+ @Before
+ public void setUp() throws Exception {
+ this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl());
+
+ // mock out restoreSnapshot
+    // TODO: this is kind of meh; it'd be much nicer to just inject the RestoreSnapshotHelper
+    // dependency into the input format. However, we need a new RestoreSnapshotHelper per
+    // snapshot in the current design, and it *also* feels weird to introduce a
+    // RestoreSnapshotHelperFactory and inject that, which would probably be the more "pure"
+    // way of doing things. This is the lesser of two evils, perhaps?
+ doNothing().when(this.subject).
+ restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class),
+ any(Path.class), any(FileSystem.class));
+
+ this.conf = new Configuration();
+ this.rootDir = new Path("file:///test-root-dir");
+ FSUtils.setRootDir(conf, rootDir);
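+    // Fixture: two snapshots, the first with a single scan and the second with two scans,
+    // used by the setInput assertions below.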
+ this.snapshotScans = ImmutableMap.<String, Collection<Scan>>of("snapshot1",
+ ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))), "snapshot2",
+ ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")),
+ new Scan(Bytes.toBytes("5"), Bytes.toBytes("6"))));
+
+ this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir");
+
+ }
+
+ public void callSetInput() throws IOException {
+ subject.setInput(this.conf, snapshotScans, restoreDir);
+ }
+
+ public Map<String, Collection<ScanWithEquals>> toScanWithEquals(
+ Map<String, Collection<Scan>> snapshotScans) throws IOException {
+ Map<String, Collection<ScanWithEquals>> rtn = Maps.newHashMap();
+
+ for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
+ List<ScanWithEquals> scans = Lists.newArrayList();
+
+ for (Scan scan : entry.getValue()) {
+ scans.add(new ScanWithEquals(scan));
+ }
+ rtn.put(entry.getKey(), scans);
+ }
+
+ return rtn;
+ }
+
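+  /**
+   * Wraps a Scan's start and stop rows so scans can be compared by value in assertions;
+   * Scan itself does not override equals().
+   */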
+ public static class ScanWithEquals {
+
+ private final String startRow;
+ private final String stopRow;
+
+ /**
+ * Creates a new instance of this class while copying all values.
+ *
+ * @param scan The scan instance to copy from.
+ * @throws java.io.IOException When copying the values fails.
+ */
+ public ScanWithEquals(Scan scan) throws IOException {
+ this.startRow = Bytes.toStringBinary(scan.getStartRow());
+ this.stopRow = Bytes.toStringBinary(scan.getStopRow());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof ScanWithEquals)) {
+ return false;
+ }
+ ScanWithEquals otherScan = (ScanWithEquals) obj;
+ return Objects.equals(this.startRow, otherScan.startRow) && Objects
+ .equals(this.stopRow, otherScan.stopRow);
+ }
+
+ @Override
+ public String toString() {
+ return org.apache.hadoop.hbase.shaded.com.google.common.base.MoreObjects.
+ toStringHelper(this).add("startRow", startRow)
+ .add("stopRow", stopRow).toString();
+ }
+ }
+
+ @Test
+ public void testSetInputSetsSnapshotToScans() throws Exception {
+
+ callSetInput();
+
+ Map<String, Collection<Scan>> actual = subject.getSnapshotsToScans(conf);
+
+ // convert to scans we can use .equals on
+ Map<String, Collection<ScanWithEquals>> actualWithEquals = toScanWithEquals(actual);
+ Map<String, Collection<ScanWithEquals>> expectedWithEquals = toScanWithEquals(snapshotScans);
+
+ assertEquals(expectedWithEquals, actualWithEquals);
+ }
+
+ @Test
+ public void testSetInputPushesRestoreDirectories() throws Exception {
+ callSetInput();
+
+ Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+ assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet());
+ }
+
+ @Test
+ public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception {
+ callSetInput();
+
+ Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+ for (Path snapshotDir : restoreDirs.values()) {
+ assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir,
+ snapshotDir.getParent());
+ }
+ }
+
+ @Test
+ public void testSetInputRestoresSnapshots() throws Exception {
+ callSetInput();
+
+ Map<String, Path> snapshotDirs = subject.getSnapshotDirs(conf);
+
+ for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
+ verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir),
+ eq(entry.getValue()), any(FileSystem.class));
+ }
+ }
+}