Posted to commits@hbase.apache.org by te...@apache.org on 2015/06/25 22:15:07 UTC
hbase git commit: HBASE-13702 ImportTsv: Add dry-run functionality and log bad rows (Apekshit Sharma)
Repository: hbase
Updated Branches:
refs/heads/master d9ba4d5bb -> e6ed79219
HBASE-13702 ImportTsv: Add dry-run functionality and log bad rows (Apekshit Sharma)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e6ed7921
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e6ed7921
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e6ed7921
Branch: refs/heads/master
Commit: e6ed79219966ce0dac3bc748261fce9478aa7550
Parents: d9ba4d5
Author: tedyu <yu...@gmail.com>
Authored: Thu Jun 25 13:15:07 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Thu Jun 25 13:15:07 2015 -0700
----------------------------------------------------------------------
.../mapreduce/IntegrationTestImportTsv.java | 22 +-
.../hadoop/hbase/mapreduce/ImportTsv.java | 85 ++++-
.../hbase/mapreduce/TsvImporterMapper.java | 22 +-
.../hbase/mapreduce/TsvImporterTextMapper.java | 17 +-
.../hadoop/hbase/mapreduce/TestImportTsv.java | 319 ++++++++++++-------
5 files changed, 296 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
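For context, a minimal sketch of how the two new options introduced by this commit might be driven programmatically through ToolRunner, assuming a reachable cluster configuration; the table name, column family, and input path below are placeholders, not values from the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.util.ToolRunner;

public class ImportTsvDryRunExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Columns must always be supplied; the first column is the row key.
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,d:c1,d:c2");
    // New in this commit: validate the input without populating the table.
    conf.setBoolean(ImportTsv.DRY_RUN_CONF_KEY, true);
    // New in this commit: echo each rejected input line to stderr.
    conf.setBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, true);
    // "myTable" and "/input.tsv" are placeholders for the target table and TSV input dir.
    int exit = ToolRunner.run(conf, new ImportTsv(), new String[] { "myTable", "/input.tsv" });
    System.exit(exit);
  }
}

The same settings map onto the -D flags documented in the updated usage() text in the ImportTsv.java hunk below.
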
http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ed7921/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
index bd03afe..7ebe825 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
@@ -43,7 +45,6 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@@ -187,19 +188,18 @@ public class IntegrationTestImportTsv extends Configured implements Tool {
Path hfiles = new Path(
util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
- String[] args = {
- format("-D%s=%s", ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles),
- format("-D%s=HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2",
- ImportTsv.COLUMNS_CONF_KEY, cf, cf),
- // configure the test harness to NOT delete the HFiles after they're
- // generated. We need those for doLoadIncrementalHFiles
- format("-D%s=false", TestImportTsv.DELETE_AFTER_LOAD_CONF),
- table.getNameAsString()
- };
+
+ Map<String, String> args = new HashMap<String, String>();
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+ args.put(ImportTsv.COLUMNS_CONF_KEY,
+ format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", cf, cf));
+ // configure the test harness to NOT delete the HFiles after they're
+ // generated. We need those for doLoadIncrementalHFiles
+ args.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false");
// run the job, complete the load.
util.createTable(table, new String[]{cf});
- Tool t = TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
+ Tool t = TestImportTsv.doMROnTableTest(util, table.getNameAsString(), cf, simple_tsv, args);
doLoadIncrementalHFiles(hfiles, table);
// validate post-conditions
http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ed7921/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 5c11bb9..d700121 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -84,6 +86,9 @@ public class ImportTsv extends Configured implements Tool {
public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
// TODO: the rest of these configs are used exclusively by TsvImporterMapper.
// Move them out of the tool and let the mapper handle its own validation.
+ public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run";
+ // If true, bad lines are logged to stderr. Default: false.
+ public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines";
public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
public final static String COLUMNS_CONF_KEY = "importtsv.columns";
public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
@@ -97,6 +102,11 @@ public class ImportTsv extends Configured implements Tool {
final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
public final static String CREATE_TABLE_CONF_KEY = "create.table";
public final static String NO_STRICT_COL_FAMILY = "no.strict";
+ /**
+ * If table didn't exist and was created in dry-run mode, this flag is
+ * flipped to delete it when MR ends.
+ */
+ private static boolean dryRunTableCreated;
public static class TsvParser {
/**
@@ -448,9 +458,10 @@ public class ImportTsv extends Configured implements Tool {
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
- public static Job createSubmittableJob(Configuration conf, String[] args)
+ protected static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException, ClassNotFoundException {
Job job = null;
+ boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Admin admin = connection.getAdmin()) {
// Support non-XML supported characters
@@ -474,6 +485,7 @@ public class ImportTsv extends Configured implements Tool {
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(mapperClass);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
String[] columns = conf.getStrings(COLUMNS_CONF_KEY);
if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
@@ -484,13 +496,19 @@ public class ImportTsv extends Configured implements Tool {
if (hfileOutPath != null) {
if (!admin.tableExists(tableName)) {
- String errorMsg = format("Table '%s' does not exist.", tableName);
+ LOG.warn(format("Table '%s' does not exist.", tableName));
if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
- LOG.warn(errorMsg);
// TODO: this is backwards. Instead of depending on the existence of a table,
// create a sane splits file for HFileOutputFormat based on data sampling.
createTable(admin, tableName, columns);
+ if (isDryRun) {
+ LOG.warn("Dry run: Table will be deleted at end of dry run.");
+ dryRunTableCreated = true;
+ }
} else {
+ String errorMsg =
+ format("Table '%s' does not exist and '%s' is set to no.", tableName,
+ CREATE_TABLE_CONF_KEY);
LOG.error(errorMsg);
throw new TableNotFoundException(errorMsg);
}
@@ -521,21 +539,22 @@ public class ImportTsv extends Configured implements Tool {
+ "=true.\n";
usage(msg);
System.exit(-1);
- }
+ }
}
- job.setReducerClass(PutSortReducer.class);
- Path outputDir = new Path(hfileOutPath);
- FileOutputFormat.setOutputPath(job, outputDir);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
if (mapperClass.equals(TsvImporterTextMapper.class)) {
job.setMapOutputValueClass(Text.class);
job.setReducerClass(TextSortReducer.class);
} else {
job.setMapOutputValueClass(Put.class);
job.setCombinerClass(PutCombiner.class);
+ job.setReducerClass(PutSortReducer.class);
+ }
+ if (!isDryRun) {
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
+ regionLocator);
}
- HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
- regionLocator);
}
} else {
if (!admin.tableExists(tableName)) {
@@ -550,13 +569,20 @@ public class ImportTsv extends Configured implements Tool {
+ " or custom mapper whose value type is Put.");
System.exit(-1);
}
- // No reducers. Just write straight to table. Call initTableReducerJob
- // to set up the TableOutputFormat.
- TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null,
- job);
+ if (!isDryRun) {
+ // No reducers. Just write straight to table. Call initTableReducerJob
+ // to set up the TableOutputFormat.
+ TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
+ }
job.setNumReduceTasks(0);
}
-
+ if (isDryRun) {
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.getConfiguration().setStrings("io.serializations",
+ job.getConfiguration().get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+ KeyValueSerialization.class.getName());
+ }
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
com.google.common.base.Function.class /* Guava used by TsvParser */);
@@ -577,7 +603,24 @@ public class ImportTsv extends Configured implements Tool {
tableName, cfSet));
admin.createTable(htd);
}
-
+
+ private static void deleteTable(Configuration conf, String[] args) {
+ TableName tableName = TableName.valueOf(args[0]);
+ try (Connection connection = ConnectionFactory.createConnection(conf);
+ Admin admin = connection.getAdmin()) {
+ try {
+ admin.disableTable(tableName);
+ } catch (TableNotEnabledException e) {
+ LOG.debug("Dry mode: Table: " + tableName + " already disabled, so just deleting it.");
+ }
+ admin.deleteTable(tableName);
+ } catch (IOException e) {
+ LOG.error(format("***Dry run: Failed to delete table '%s'.***\n%s", tableName, e.toString()));
+ return;
+ }
+ LOG.info(format("Dry run: Deleted table '%s'.", tableName));
+ }
+
private static Set<String> getColumnFamilies(String[] columns) {
Set<String> cfSet = new HashSet<String>();
for (String aColumn : columns) {
@@ -628,7 +671,10 @@ public class ImportTsv extends Configured implements Tool {
" Note: if you do not use this option, then the target table must already exist in HBase\n" +
"\n" +
"Other options that may be specified with -D include:\n" +
+ " -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into" +
+ " table. If table does not exist, it is created but deleted in the end.\n" +
" -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
+ " -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" +
" '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
" -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
" -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
@@ -713,8 +759,13 @@ public class ImportTsv extends Configured implements Tool {
// system time
getConf().setLong(TIMESTAMP_CONF_KEY, timstamp);
+ dryRunTableCreated = false;
Job job = createSubmittableJob(getConf(), args);
- return job.waitForCompletion(true) ? 0 : 1;
+ boolean success = job.waitForCompletion(true);
+ if (dryRunTableCreated) {
+ deleteTable(getConf(), args);
+ }
+ return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ed7921/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index 270de75..9f1b4c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -57,6 +57,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
/** Should skip bad lines */
private boolean skipBadLines;
private Counter badLineCount;
+ private boolean logBadLines;
protected ImportTsv.TsvParser parser;
@@ -129,6 +130,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
skipBadLines = context.getConfiguration().getBoolean(
ImportTsv.SKIP_LINES_CONF_KEY, true);
badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+ logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
}
@@ -163,26 +165,16 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
populatePut(lineBytes, parsed, put, i);
}
context.write(rowKey, put);
- } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
- if (skipBadLines) {
- System.err.println(
- "Bad line at offset: " + offset.get() + ":\n" +
- badLine.getMessage());
- incrementBadLineCount(1);
- return;
- } else {
- throw new IOException(badLine);
+ } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) {
+ if (logBadLines) {
+ System.err.println(value);
}
- } catch (IllegalArgumentException e) {
+ System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
if (skipBadLines) {
- System.err.println(
- "Bad line at offset: " + offset.get() + ":\n" +
- e.getMessage());
incrementBadLineCount(1);
return;
- } else {
- throw new IOException(e);
}
+ throw new IOException(badLine);
} catch (InterruptedException e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ed7921/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
index 9d97cab..7744ea7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
@@ -45,6 +45,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
/** Should skip bad lines */
private boolean skipBadLines;
private Counter badLineCount;
+ private boolean logBadLines;
private ImportTsv.TsvParser parser;
@@ -97,6 +98,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
}
skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+ logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
badLineCount = context.getCounter("ImportTsv", "Bad Lines");
}
@@ -110,21 +112,16 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond());
context.write(rowKey, value);
- } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+ } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) {
+ if (logBadLines) {
+ System.err.println(value);
+ }
+ System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
if (skipBadLines) {
- System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
incrementBadLineCount(1);
return;
}
throw new IOException(badLine);
- } catch (IllegalArgumentException e) {
- if (skipBadLines) {
- System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
- incrementBadLineCount(1);
- return;
- } else {
- throw new IOException(e);
- }
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/hbase/blob/e6ed7921/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index 5208ffb..9f36587 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -19,13 +19,15 @@
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -45,7 +47,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -56,12 +57,16 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestImportTsv implements Configurable {
@@ -70,10 +75,7 @@ public class TestImportTsv implements Configurable {
protected static final String NAME = TestImportTsv.class.getSimpleName();
protected static HBaseTestingUtility util = new HBaseTestingUtility();
- /**
- * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
- * false.
- */
+ // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
/**
@@ -82,6 +84,11 @@ public class TestImportTsv implements Configurable {
protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
private final String FAMILY = "FAM";
+ private String table;
+ private Map<String, String> args;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
public Configuration getConf() {
return util.getConfiguration();
@@ -103,112 +110,80 @@ public class TestImportTsv implements Configurable {
util.shutdownMiniCluster();
}
- @Test
- public void testMROnTable() throws Exception {
- String table = "test-" + UUID.randomUUID();
-
+ @Before
+ public void setup() throws Exception {
+ table = "test-" + UUID.randomUUID();
+ args = new HashMap<String, String>();
// Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
- table
- };
+ args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
+ args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
+ }
+ @Test
+ public void testMROnTable() throws Exception {
util.createTable(TableName.valueOf(table), FAMILY);
- doMROnTableTest(util, FAMILY, null, args, 1);
+ doMROnTableTest(null, 1);
util.deleteTable(table);
}
@Test
public void testMROnTableWithTimestamp() throws Exception {
- String table = "test-" + UUID.randomUUID();
-
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.COLUMNS_CONF_KEY
- + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
- table
- };
+ util.createTable(TableName.valueOf(table), FAMILY);
+ args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
+ args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
String data = "KEY,1234,VALUE1,VALUE2\n";
- util.createTable(TableName.valueOf(table), FAMILY);
- doMROnTableTest(util, FAMILY, data, args, 1);
+ doMROnTableTest(data, 1);
util.deleteTable(table);
}
-
@Test
public void testMROnTableWithCustomMapper()
throws Exception {
- String table = "test-" + UUID.randomUUID();
-
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
- table
- };
-
util.createTable(TableName.valueOf(table), FAMILY);
- doMROnTableTest(util, FAMILY, null, args, 3);
+ args.put(ImportTsv.MAPPER_CONF_KEY,
+ "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");
+
+ doMROnTableTest(null, 3);
util.deleteTable(table);
}
@Test
public void testBulkOutputWithoutAnExistingTable() throws Exception {
- String table = "test-" + UUID.randomUUID();
-
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
- String[] args = new String[] {
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
- "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
- table
- };
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
- doMROnTableTest(util, FAMILY, null, args, 3);
+ doMROnTableTest(null, 3);
util.deleteTable(table);
}
@Test
public void testBulkOutputWithAnExistingTable() throws Exception {
- String table = "test-" + UUID.randomUUID();
+ util.createTable(TableName.valueOf(table), FAMILY);
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
- String[] args = new String[] {
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
- "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
- table
- };
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
- util.createTable(TableName.valueOf(table), FAMILY);
- doMROnTableTest(util, FAMILY, null, args, 3);
+ doMROnTableTest(null, 3);
util.deleteTable(table);
}
@Test
public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
- String table = "test-" + UUID.randomUUID();
+ util.createTable(TableName.valueOf(table), FAMILY);
+
// Prepare the arguments required for the test.
Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
- String[] args = new String[] {
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
- "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
- "-D" + ImportTsv.NO_STRICT_COL_FAMILY + "=true",
- table
- };
- util.createTable(TableName.valueOf(table), FAMILY);
- doMROnTableTest(util, FAMILY, null, args, 3);
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+ args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
+ doMROnTableTest(null, 3);
util.deleteTable(table);
}
@Test
public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
- String table = "test-" + UUID.randomUUID();
Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles");
String INPUT_FILE = "InputFile1.csv";
// Prepare the arguments required for the test.
@@ -234,53 +209,150 @@ public class TestImportTsv implements Configurable {
return 0;
}
}, args));
+ // Delete table created by createSubmittableJob.
+ util.deleteTable(table);
}
@Test
public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
- String table = "test-" + UUID.randomUUID();
- String FAMILY = "FAM";
Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles");
- // Prepare the arguments required for the test.
- String[] args =
- new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY
- + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
- "-D" + ImportTsv.COLUMNS_CONF_KEY
- + "=HBASE_ROW_KEY,FAM:A,FAM:B",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
- "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table
- };
+ args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
String data = "KEY\u001bVALUE4\u001bVALUE8\n";
- doMROnTableTest(util, FAMILY, data, args, 4);
+ doMROnTableTest(data, 4);
+ util.deleteTable(table);
}
- @Test(expected = TableNotFoundException.class)
+ @Test
public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
- String table = "test-" + UUID.randomUUID();
- String[] args =
- new String[] { table, "/inputFile" };
+ String[] args = new String[] { table, "/inputFile" };
Configuration conf = new Configuration(util.getConfiguration());
conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
- ImportTsv.createSubmittableJob(conf, args);
+ exception.expect(TableNotFoundException.class);
+ assertEquals("running test job configuration failed.", 0,
+ ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() {
+ @Override public int run(String[] args) throws Exception {
+ createSubmittableJob(getConf(), args);
+ return 0;
+ }
+ }, args));
}
- @Test(expected = TableNotFoundException.class)
+ @Test
public void testMRWithoutAnExistingTable() throws Exception {
- String table = "test-" + UUID.randomUUID();
String[] args =
new String[] { table, "/inputFile" };
- Configuration conf = new Configuration(util.getConfiguration());
- ImportTsv.createSubmittableJob(conf, args);
+ exception.expect(TableNotFoundException.class);
+ assertEquals("running test job configuration failed.", 0, ToolRunner.run(
+ new Configuration(util.getConfiguration()),
+ new ImportTsv() {
+ @Override
+ public int run(String[] args) throws Exception {
+ createSubmittableJob(getConf(), args);
+ return 0;
+ }
+ }, args));
+ }
+
+ @Test
+ public void testJobConfigurationsWithDryMode() throws Exception {
+ Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles");
+ String INPUT_FILE = "InputFile1.csv";
+ // Prepare the arguments required for the test.
+ String[] argsArray = new String[] {
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+ "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
+ "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true",
+ table,
+ INPUT_FILE };
+ assertEquals("running test job configuration failed.", 0, ToolRunner.run(
+ new Configuration(util.getConfiguration()),
+ new ImportTsv() {
+ @Override
+ public int run(String[] args) throws Exception {
+ Job job = createSubmittableJob(getConf(), args);
+ assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
+ return 0;
+ }
+ }, argsArray));
+ // Delete table created by createSubmittableJob.
+ util.deleteTable(table);
}
- protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
- String data, String[] args) throws Exception {
- return doMROnTableTest(util, family, data, args, 1);
+ @Test
+ public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
+ util.createTable(TableName.valueOf(table), FAMILY);
+ args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+ doMROnTableTest(null, 1);
+ // Dry mode should not delete an existing table. If it's not present,
+ // this will throw TableNotFoundException.
+ util.deleteTable(table);
+ }
+
+ /**
+ * If table is not present in non-bulk mode, dry run should fail just like
+ * normal mode.
+ */
+ @Test
+ public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
+ args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+ exception.expect(TableNotFoundException.class);
+ doMROnTableTest(null, 1);
+ }
+
+ @Test public void testDryModeWithBulkOutputAndTableExists() throws Exception {
+ util.createTable(TableName.valueOf(table), FAMILY);
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+ args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+ doMROnTableTest(null, 1);
+ // Dry mode should not delete an existing table. If it's not present,
+ // this will throw TableNotFoundException.
+ util.deleteTable(table);
+ }
+
+ /**
+ * If table is not present in bulk mode and create.table is not set to yes,
+ * import should fail with TableNotFoundException.
+ */
+ @Test
+ public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws
+ Exception {
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+ args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+ args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
+ exception.expect(TableNotFoundException.class);
+ doMROnTableTest(null, 1);
+ }
+
+ @Test
+ public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+ args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+ args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+ args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
+ doMROnTableTest(null, 1);
+ // Verify temporary table was deleted.
+ exception.expect(TableNotFoundException.class);
+ util.deleteTable(table);
+ }
+
+ private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
+ return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier);
+ }
+
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, String table,
+ String family, String data, Map<String, String> args) throws Exception {
+ return doMROnTableTest(util, table, family, data, args, 1);
}
/**
@@ -291,10 +363,9 @@ public class TestImportTsv implements Configurable {
* @param args Any arguments to pass BEFORE inputFile path is appended.
* @return The Tool instance used to run the test.
*/
- protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
- String data, String[] args, int valueMultiplier)
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, String table,
+ String family, String data, Map<String, String> args, int valueMultiplier)
throws Exception {
- String table = args[args.length - 1];
Configuration conf = new Configuration(util.getConfiguration());
// populate input file
@@ -313,32 +384,40 @@ public class TestImportTsv implements Configurable {
conf.setInt("mapreduce.map.combine.minspills", 1);
}
+ // Build args array.
+ String[] argsArray = new String[args.size() + 2];
+ Iterator it = args.entrySet().iterator();
+ int i = 0;
+ while (it.hasNext()) {
+ Map.Entry pair = (Map.Entry) it.next();
+ argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
+ i++;
+ }
+ argsArray[i] = table;
+ argsArray[i + 1] = inputPath.toString();
+
// run the import
- List<String> argv = new ArrayList<String>(Arrays.asList(args));
- argv.add(inputPath.toString());
Tool tool = new ImportTsv();
- LOG.debug("Running ImportTsv with arguments: " + argv);
- assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+ LOG.debug("Running ImportTsv with arguments: " + argsArray);
+ assertEquals(0, ToolRunner.run(conf, tool, argsArray));
// Perform basic validation. If the input args did not include
// ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
// Otherwise, validate presence of hfiles.
- boolean createdHFiles = false;
- String outputPath = null;
- for (String arg : argv) {
- if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
- createdHFiles = true;
- // split '-Dfoo=bar' on '=' and keep 'bar'
- outputPath = arg.split("=")[1];
- break;
+ boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) &&
+ "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
+ if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
+ if (isDryRun) {
+ assertFalse(String.format("Dry run mode, %s should not have been created.",
+ ImportTsv.BULK_OUTPUT_CONF_KEY),
+ fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
+ } else {
+ validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family);
}
+ } else {
+ validateTable(conf, TableName.valueOf(table), family, valueMultiplier, isDryRun);
}
- if (createdHFiles)
- validateHFiles(fs, outputPath, family);
- else
- validateTable(conf, TableName.valueOf(table), family, valueMultiplier);
-
if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
LOG.debug("Deleting test subdirectory");
util.cleanupDataTestDirOnTestFS(table);
@@ -350,7 +429,7 @@ public class TestImportTsv implements Configurable {
* Confirm ImportTsv via data in online table.
*/
private static void validateTable(Configuration conf, TableName tableName,
- String family, int valueMultiplier) throws IOException {
+ String family, int valueMultiplier, boolean isDryRun) throws IOException {
LOG.debug("Validating table.");
Connection connection = ConnectionFactory.createConnection(conf);
@@ -364,8 +443,10 @@ public class TestImportTsv implements Configurable {
// Scan entire family.
scan.addFamily(Bytes.toBytes(family));
ResultScanner resScanner = table.getScanner(scan);
+ int numRows = 0;
for (Result res : resScanner) {
- assertTrue(res.size() == 2);
+ numRows++;
+ assertEquals(2, res.size());
List<Cell> kvs = res.listCells();
assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
@@ -373,6 +454,11 @@ public class TestImportTsv implements Configurable {
assertTrue(CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
// Only one result set is expected, so let it loop.
}
+ if (isDryRun) {
+ assertEquals(0, numRows);
+ } else {
+ assertEquals(1, numRows);
+ }
verified = true;
break;
} catch (NullPointerException e) {
@@ -395,7 +481,6 @@ public class TestImportTsv implements Configurable {
*/
private static void validateHFiles(FileSystem fs, String outputPath, String family)
throws IOException {
-
// validate number and content of output columns
LOG.debug("Validating HFiles.");
Set<String> configFamilies = new HashSet<String>();
@@ -407,7 +492,7 @@ public class TestImportTsv implements Configurable {
foundFamilies.add(cf);
assertTrue(
String.format(
- "HFile ouput contains a column family (%s) not present in input families (%s)",
+ "HFile output contains a column family (%s) not present in input families (%s)",
cf, configFamilies),
configFamilies.contains(cf));
for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
@@ -416,6 +501,8 @@ public class TestImportTsv implements Configurable {
hfile.getLen() > 0);
}
}
+ assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
+ foundFamilies.contains(family));
}
}