You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2013/03/14 23:28:08 UTC
svn commit: r1456700 - in /hbase/trunk:
hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/
hbase-server/src/test/java/org/apache/hadoop/hbase/
hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/
Author: enis
Date: Thu Mar 14 22:28:08 2013
New Revision: 1456700
URL: http://svn.apache.org/r1456700
Log:
HBASE-7938 Add integration test for ImportTsv/LoadIncrementalHFiles workflow (Nick Dimiduk)
Added:
hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/
hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
Modified:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
Added: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java?rev=1456700&view=auto
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java (added)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java Thu Mar 14 22:28:08 2013
@@ -0,0 +1,191 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static java.lang.String.format;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.IntegrationTests;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Validate ImportTsv + LoadIncrementalHFiles on a distributed cluster.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestImportTsv implements Configurable, Tool {
+
+ private static final String NAME = IntegrationTestImportTsv.class.getSimpleName();
+ protected static final Log LOG = LogFactory.getLog(IntegrationTestImportTsv.class);
+
+ protected static final String simple_tsv =
+ "row1\t1\tc1\tc2\n" +
+ "row2\t1\tc1\tc2\n" +
+ "row3\t1\tc1\tc2\n" +
+ "row4\t1\tc1\tc2\n" +
+ "row5\t1\tc1\tc2\n" +
+ "row6\t1\tc1\tc2\n" +
+ "row7\t1\tc1\tc2\n" +
+ "row8\t1\tc1\tc2\n" +
+ "row9\t1\tc1\tc2\n" +
+ "row10\t1\tc1\tc2\n";
+
+ protected static final Set<KeyValue> simple_expected =
+ new TreeSet<KeyValue>(KeyValue.COMPARATOR) {
+ private static final long serialVersionUID = 1L;
+ {
+ byte[] family = Bytes.toBytes("d");
+ for (String line : simple_tsv.split("\n")) {
+ String[] row = line.split("\t");
+ byte[] key = Bytes.toBytes(row[0]);
+ long ts = Long.parseLong(row[1]);
+ byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) };
+ add(new KeyValue(key, family, fields[0], ts, Type.Put, fields[0]));
+ add(new KeyValue(key, family, fields[1], ts, Type.Put, fields[1]));
+ }
+ }
+ };
+
+ // this instance is initialized on first access when the test is run from
+ // JUnit/Maven or by main when run from the CLI.
+ protected static IntegrationTestingUtility util = null;
+
+ public Configuration getConf() {
+ return util.getConfiguration();
+ }
+
+ public void setConf(Configuration conf) {
+ throw new IllegalArgumentException("setConf not supported");
+ }
+
+ @BeforeClass
+ public static void provisionCluster() throws Exception {
+ if (null == util) {
+ util = new IntegrationTestingUtility();
+ }
+ util.initializeCluster(1);
+ }
+
+ @AfterClass
+ public static void releaseCluster() throws Exception {
+ util.restoreCluster();
+ util = null;
+ }
+
+ /**
+ * Verify the data described by <code>simple_tsv</code> matches
+ * <code>simple_expected</code>.
+ */
+ protected void doLoadIncrementalHFiles(Path hfiles, String tableName)
+ throws Exception {
+
+ String[] args = { hfiles.toString(), tableName };
+ LOG.info(format("Running LoadIncrememntalHFiles with args: %s", Arrays.asList(args)));
+ assertEquals("Loading HFiles failed.",
+ 0, ToolRunner.run(new LoadIncrementalHFiles(new Configuration(getConf())), args));
+
+ HTable table = null;
+ Scan scan = new Scan() {{
+ setCacheBlocks(false);
+ setCaching(1000);
+ }};
+ try {
+ table = new HTable(getConf(), tableName);
+ Iterator<Result> resultsIt = table.getScanner(scan).iterator();
+ Iterator<KeyValue> expectedIt = simple_expected.iterator();
+ while (resultsIt.hasNext() && expectedIt.hasNext()) {
+ Result r = resultsIt.next();
+ for (KeyValue actual : r.raw()) {
+ assertTrue(
+ "Ran out of expected values prematurely!",
+ expectedIt.hasNext());
+ KeyValue expected = expectedIt.next();
+ assertTrue(
+ format("Scan produced surprising result. expected: <%s>, actual: %s",
+ expected, actual),
+ KeyValue.COMPARATOR.compare(expected, actual) == 0);
+ }
+ }
+ assertFalse("Did not consume all expected values.", expectedIt.hasNext());
+ assertFalse("Did not consume all scan results.", resultsIt.hasNext());
+ } finally {
+ if (null != table) table.close();
+ }
+ }
+
+ @Test
+ public void testGenerateAndLoad() throws Exception {
+ String table = NAME + "-" + UUID.randomUUID();
+ String cf = "d";
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+
+ String[] args = {
+ format("-D%s=%s", ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles),
+ format("-D%s=HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2",
+ ImportTsv.COLUMNS_CONF_KEY, cf, cf),
+ // configure the test harness to NOT delete the HFiles after they're
+ // generated. We need those for doLoadIncrementalHFiles
+ format("-D%s=false", TestImportTsv.DELETE_AFTER_LOAD_CONF),
+ table
+ };
+
+ // run the job, complete the load.
+ util.createTable(table, cf);
+ TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
+ doLoadIncrementalHFiles(hfiles, table);
+ util.deleteTable(table);
+ util.cleanupDataTestDirOnTestFS(table);
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length != 0) {
+ System.err.println(format("%s [genericOptions]", NAME));
+ System.err.println(" Runs ImportTsv integration tests against a distributed cluster.");
+ System.err.println();
+ GenericOptionsParser.printGenericCommandUsage(System.err);
+ return 1;
+ }
+
+ // adding more test methods? Don't forget to add them here... or consider doing what
+ // IntegrationTestsDriver does.
+ provisionCluster();
+ testGenerateAndLoad();
+ releaseCluster();
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ util = new IntegrationTestingUtility(conf);
+ // not using ToolRunner to avoid unnecessary call to setConf()
+ args = new GenericOptionsParser(conf, args).getRemainingArgs();
+ int status = new IntegrationTestImportTsv().run(args);
+ System.exit(status);
+ }
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1456700&r1=1456699&r2=1456700&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Mar 14 22:28:08 2013
@@ -315,7 +315,7 @@ public class HBaseTestingUtility extends
/**
* @return Where to write test data on the test filesystem; Returns working directory
- * for the test filesytem by default
+ * for the test filesystem by default
* @see #setupDataTestDirOnTestFS()
* @see #getTestFileSystem()
*/
@@ -396,6 +396,7 @@ public class HBaseTestingUtility extends
FileSystem fs = getTestFileSystem();
if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
File dataTestDir = new File(getDataTestDir().toString());
+ dataTestDir.deleteOnExit();
dataTestDirOnTestFS = new Path(dataTestDir.getAbsolutePath());
} else {
Path base = getBaseTestDirOnTestFS();
@@ -404,6 +405,29 @@ public class HBaseTestingUtility extends
fs.deleteOnExit(dataTestDirOnTestFS);
}
}
+
+ /**
+ * Cleans the test data directory on the test filesystem.
+ * @return True if we removed the test dirs
+ * @throws IOException
+ */
+ public boolean cleanupDataTestDirOnTestFS() throws IOException {
+ boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
+ if (ret)
+ dataTestDirOnTestFS = null;
+ return ret;
+ }
+
+ /**
+ * Cleans a subdirectory under the test data directory on the test filesystem.
+ * @return True if we removed child
+ * @throws IOException
+ */
+ public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
+ Path cpath = getDataTestDirOnTestFS(subdirName);
+ return getTestFileSystem().delete(cpath, true);
+ }
+
/**
* Start a minidfscluster.
* @param servers How many DNs to start.
@@ -959,6 +983,33 @@ public class HBaseTestingUtility extends
getMiniHBaseCluster().compact(tableName, major);
}
+ /**
+ * Create a table.
+ * @param tableName
+ * @param family
+ * @return An HTable instance for the created table.
+ * @throws IOException
+ */
+ public HTable createTable(String tableName, String family)
+ throws IOException{
+ return createTable(tableName, new String[] { family });
+ }
+
+ /**
+ * Create a table.
+ * @param tableName
+ * @param families
+ * @return An HTable instance for the created table.
+ * @throws IOException
+ */
+ public HTable createTable(String tableName, String[] families)
+ throws IOException {
+ List<byte[]> fams = new ArrayList<byte[]>(families.length);
+ for (String family : families) {
+ fams.add(Bytes.toBytes(family));
+ }
+ return createTable(Bytes.toBytes(tableName), fams.toArray(new byte[0][]));
+ }
/**
* Create a table.
@@ -1121,6 +1172,14 @@ public class HBaseTestingUtility extends
* Drop an existing table
* @param tableName existing table
*/
+ public void deleteTable(String tableName) throws IOException {
+ deleteTable(Bytes.toBytes(tableName));
+ }
+
+ /**
+ * Drop an existing table
+ * @param tableName existing table
+ */
public void deleteTable(byte[] tableName) throws IOException {
try {
getHBaseAdmin().disableTable(tableName);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1456700&r1=1456699&r2=1456700&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Thu Mar 14 22:28:08 2013
@@ -18,343 +18,306 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import java.io.UnsupportedEncodingException;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
-import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
-import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-
-@Category(MediumTests.class)
-public class TestImportTsv {
- private static final Log LOG = LogFactory.getLog(TestImportTsv.class);
+@Category(LargeTests.class)
+public class TestImportTsv implements Configurable {
- @Test
- public void testTsvParserSpecParsing() {
- TsvParser parser;
+ protected static final Log LOG = LogFactory.getLog(TestImportTsv.class);
+ protected static final String NAME = TestImportTsv.class.getSimpleName();
+ protected static HBaseTestingUtility util = new HBaseTestingUtility();
- parser = new TsvParser("HBASE_ROW_KEY", "\t");
- assertNull(parser.getFamily(0));
- assertNull(parser.getQualifier(0));
- assertEquals(0, parser.getRowKeyColumnIndex());
- assertFalse(parser.hasTimestamp());
-
- parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
- assertNull(parser.getFamily(0));
- assertNull(parser.getQualifier(0));
- assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
- assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
- assertEquals(0, parser.getRowKeyColumnIndex());
- assertFalse(parser.hasTimestamp());
-
- parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
- assertNull(parser.getFamily(0));
- assertNull(parser.getQualifier(0));
- assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
- assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
- assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
- assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
- assertEquals(0, parser.getRowKeyColumnIndex());
- assertFalse(parser.hasTimestamp());
-
- parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2",
- "\t");
- assertNull(parser.getFamily(0));
- assertNull(parser.getQualifier(0));
- assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
- assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
- assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
- assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
- assertEquals(0, parser.getRowKeyColumnIndex());
- assertTrue(parser.hasTimestamp());
- assertEquals(2, parser.getTimestampKeyColumnIndex());
- }
+ /**
+ * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+ * false.
+ */
+ protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
- @Test
- public void testTsvParser() throws BadTsvLineException {
- TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");
- assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
- assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
- assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
- assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
- assertNull(parser.getFamily(2));
- assertNull(parser.getQualifier(2));
- assertEquals(2, parser.getRowKeyColumnIndex());
-
- assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser
- .getTimestampKeyColumnIndex());
-
- byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
- ParsedLine parsed = parser.parse(line, line.length);
- checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
- }
-
-
- @Test
- public void testTsvParserWithTimestamp() throws BadTsvLineException {
- TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
- assertNull(parser.getFamily(0));
- assertNull(parser.getQualifier(0));
- assertNull(parser.getFamily(1));
- assertNull(parser.getQualifier(1));
- assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
- assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
- assertEquals(0, parser.getRowKeyColumnIndex());
- assertEquals(1, parser.getTimestampKeyColumnIndex());
-
- byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
- ParsedLine parsed = parser.parse(line, line.length);
- assertEquals(1234l, parsed.getTimestamp(-1));
- checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
- }
-
- private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
- ArrayList<String> parsedCols = new ArrayList<String>();
- for (int i = 0; i < parsed.getColumnCount(); i++) {
- parsedCols.add(Bytes.toString(
- parsed.getLineBytes(),
- parsed.getColumnOffset(i),
- parsed.getColumnLength(i)));
- }
- if (!Iterables.elementsEqual(parsedCols, expected)) {
- fail("Expected: " + Joiner.on(",").join(expected) + "\n" +
- "Got:" + Joiner.on(",").join(parsedCols));
- }
- }
+ /**
+ * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+ */
+ protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+ private final String FAMILY = "FAM";
- private void assertBytesEquals(byte[] a, byte[] b) {
- assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
+ public Configuration getConf() {
+ return util.getConfiguration();
}
- /**
- * Test cases that throw BadTsvLineException
- */
- @Test(expected=BadTsvLineException.class)
- public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
- TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
- byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
- parser.parse(line, line.length);
- }
-
- @Test(expected=BadTsvLineException.class)
- public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
- TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
- byte[] line = Bytes.toBytes("");
- parser.parse(line, line.length);
- }
-
- @Test(expected=BadTsvLineException.class)
- public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
- TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
- byte[] line = Bytes.toBytes("key_only");
- parser.parse(line, line.length);
- }
-
- @Test(expected=BadTsvLineException.class)
- public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
- TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
- byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
- parser.parse(line, line.length);
+ public void setConf(Configuration conf) {
+ throw new IllegalArgumentException("setConf not supported");
}
-
- @Test(expected = BadTsvLineException.class)
- public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
- TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
- assertEquals(1, parser.getTimestampKeyColumnIndex());
- byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
- ParsedLine parsed = parser.parse(line, line.length);
- assertEquals(-1, parsed.getTimestamp(-1));
- checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+
+ @BeforeClass
+ public static void provisionCluster() throws Exception {
+ util.startMiniCluster();
+ util.startMiniMapReduceCluster();
}
-
- @Test(expected = BadTsvLineException.class)
- public void testTsvParserNoTimestampValue() throws BadTsvLineException {
- TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
- assertEquals(2, parser.getTimestampKeyColumnIndex());
- byte[] line = Bytes.toBytes("rowkey\tval_a");
- parser.parse(line, line.length);
+
+ @AfterClass
+ public static void releaseCluster() throws Exception {
+ util.shutdownMiniMapReduceCluster();
+ util.shutdownMiniCluster();
}
-
@Test
- public void testMROnTable()
- throws Exception {
- String TABLE_NAME = "TestTable";
- String FAMILY = "FAM";
- String INPUT_FILE = "InputFile.esv";
+ public void testMROnTable() throws Exception {
+ String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
- TABLE_NAME,
- INPUT_FILE
+ table
};
- doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
+ util.createTable(table, FAMILY);
+ doMROnTableTest(util, FAMILY, null, args, 1);
+ util.deleteTable(table);
}
@Test
public void testMROnTableWithTimestamp() throws Exception {
- String TABLE_NAME = "TestTable";
- String FAMILY = "FAM";
- String INPUT_FILE = "InputFile1.csv";
+ String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY
+ "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };
-
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+ table
+ };
String data = "KEY,1234,VALUE1,VALUE2\n";
- doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
+
+ util.createTable(table, FAMILY);
+ doMROnTableTest(util, FAMILY, data, args, 1);
+ util.deleteTable(table);
}
@Test
public void testMROnTableWithCustomMapper()
throws Exception {
- String TABLE_NAME = "TestTable";
- String FAMILY = "FAM";
- String INPUT_FILE = "InputFile2.esv";
+ String table = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
- TABLE_NAME,
- INPUT_FILE
+ table
};
- doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
+ util.createTable(table, FAMILY);
+ doMROnTableTest(util, FAMILY, null, args, 3);
+ util.deleteTable(table);
}
+
+ @Test
+ public void testBulkOutputWithoutAnExistingTable() throws Exception {
+ String table = "test-" + UUID.randomUUID();
- private void doMROnTableTest(String inputFile, String family,
- String tableName, String data, String[] args, int valueMultiplier)
- throws Exception {
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+ String[] args = new String[] {
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+ "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+ table
+ };
- // Cluster
- HBaseTestingUtility htu1 = new HBaseTestingUtility();
+ doMROnTableTest(util, FAMILY, null, args, 3);
+ util.deleteTable(table);
+ }
- htu1.startMiniCluster();
- htu1.startMiniMapReduceCluster();
+ @Test
+ public void testBulkOutputWithAnExistingTable() throws Exception {
+ String table = "test-" + UUID.randomUUID();
+ // Prepare the arguments required for the test.
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+ String[] args = new String[] {
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+ "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+ table
+ };
+
+ util.createTable(table, FAMILY);
+ doMROnTableTest(util, FAMILY, null, args, 3);
+ util.deleteTable(table);
+ }
+
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
+ String data, String[] args) throws Exception {
+ return doMROnTableTest(util, family, data, args, 1);
+ }
+
+ /**
+ * Run an ImportTsv job and perform basic validation on the results.
+ * Returns the ImportTsv <code>Tool</code> instance so that other tests can
+ * inspect it for further validation as necessary. This method is static to
+ * insure non-reliance on instance's util/conf facilities.
+ * @param args Any arguments to pass BEFORE inputFile path is appended.
+ * @return The Tool instance used to run the test.
+ */
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
+ String data, String[] args, int valueMultiplier)
+ throws Exception {
+ String table = args[args.length - 1];
+ Configuration conf = new Configuration(util.getConfiguration());
+
+ // populate input file
+ FileSystem fs = FileSystem.get(conf);
+ Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
+ FSDataOutputStream op = fs.create(inputPath, true);
+ if (data == null) {
+ data = "KEY\u001bVALUE1\u001bVALUE2\n";
+ }
+ op.write(Bytes.toBytes(data));
+ op.close();
+ LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+ if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+ LOG.debug("Forcing combiner.");
+ conf.setInt("min.num.spills.for.combine", 1);
+ }
+
+ // run the import
+ List<String> argv = new ArrayList<String>(Arrays.asList(args));
+ argv.add(inputPath.toString());
Tool tool = new ImportTsv();
- tool.setConf(htu1.getConfiguration());
+ LOG.debug("Running ImportTsv with arguments: " + argv);
+ assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
- try {
- FileSystem fs = FileSystem.get(tool.getConf());
- FSDataOutputStream op = fs.create(new Path(inputFile), true);
- if (data == null) {
- data = "KEY\u001bVALUE1\u001bVALUE2\n";
- }
- op.write(Bytes.toBytes(data));
- op.close();
- LOG.debug(String.format("Wrote test data to file: %s", fs.makeQualified(new Path(inputFile))));
-
- if (tool.getConf().get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor(family));
- HBaseAdmin admin = new HBaseAdmin(tool.getConf());
- admin.createTable(desc);
- admin.close();
+ // Perform basic validation. If the input args did not include
+ // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
+ // Otherwise, validate presence of hfiles.
+ boolean createdHFiles = false;
+ String outputPath = null;
+ for (String arg : argv) {
+ if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
+ createdHFiles = true;
+ // split '-Dfoo=bar' on '=' and keep 'bar'
+ outputPath = arg.split("=")[1];
+ break;
}
- // force use of combiner for testing purposes
- tool.getConf().setInt("min.num.spills.for.combine", 1);
- assertEquals(0, ToolRunner.run(tool, args));
-
- HTable table = new HTable(tool.getConf(), tableName);
- boolean verified = false;
- long pause = tool.getConf().getLong("hbase.client.pause", 5 * 1000);
- int numRetries = tool.getConf().getInt("hbase.client.retries.number", 5);
- for (int i = 0; i < numRetries; i++) {
- try {
- Scan scan = new Scan();
- // Scan entire family.
- scan.addFamily(Bytes.toBytes(family));
- ResultScanner resScanner = table.getScanner(scan);
- for (Result res : resScanner) {
- assertTrue(res.size() == 2);
- List<KeyValue> kvs = res.list();
- assertEquals(toU8Str(kvs.get(0).getRow()),
- toU8Str(Bytes.toBytes("KEY")));
- assertEquals(toU8Str(kvs.get(1).getRow()),
- toU8Str(Bytes.toBytes("KEY")));
- assertEquals(toU8Str(kvs.get(0).getValue()),
- toU8Str(Bytes.toBytes("VALUE" + valueMultiplier)));
- assertEquals(toU8Str(kvs.get(1).getValue()),
- toU8Str(Bytes.toBytes("VALUE" + 2*valueMultiplier)));
- // Only one result set is expected, so let it loop.
- }
- verified = true;
- break;
- } catch (NullPointerException e) {
- // If here, a cell was empty. Presume its because updates came in
- // after the scanner had been opened. Wait a while and retry.
- }
- try {
- Thread.sleep(pause);
- } catch (InterruptedException e) {
- // continue
+ }
+
+ if (createdHFiles)
+ validateHFiles(fs, outputPath, family);
+ else
+ validateTable(conf, table, family, valueMultiplier);
+
+ if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+ LOG.debug("Deleting test subdirectory");
+ util.cleanupDataTestDirOnTestFS(table);
+ }
+ return tool;
+ }
+
+ /**
+ * Confirm ImportTsv via data in online table.
+ */
+ private static void validateTable(Configuration conf, String tableName,
+ String family, int valueMultiplier) throws IOException {
+
+ LOG.debug("Validating table.");
+ HTable table = new HTable(conf, tableName);
+ boolean verified = false;
+ long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+ int numRetries = conf.getInt("hbase.client.retries.number", 5);
+ for (int i = 0; i < numRetries; i++) {
+ try {
+ Scan scan = new Scan();
+ // Scan entire family.
+ scan.addFamily(Bytes.toBytes(family));
+ ResultScanner resScanner = table.getScanner(scan);
+ for (Result res : resScanner) {
+ assertTrue(res.size() == 2);
+ List<KeyValue> kvs = res.list();
+ assertArrayEquals(kvs.get(0).getRow(), Bytes.toBytes("KEY"));
+ assertArrayEquals(kvs.get(1).getRow(), Bytes.toBytes("KEY"));
+ assertArrayEquals(kvs.get(0).getValue(),
+ Bytes.toBytes("VALUE" + valueMultiplier));
+ assertArrayEquals(kvs.get(1).getValue(),
+ Bytes.toBytes("VALUE" + 2 * valueMultiplier));
+ // Only one result set is expected, so let it loop.
}
+ verified = true;
+ break;
+ } catch (NullPointerException e) {
+ // If here, a cell was empty. Presume its because updates came in
+ // after the scanner had been opened. Wait a while and retry.
+ }
+ try {
+ Thread.sleep(pause);
+ } catch (InterruptedException e) {
+ // continue
}
- table.close();
- assertTrue(verified);
- } finally {
- htu1.shutdownMiniMapReduceCluster();
- htu1.shutdownMiniCluster();
}
+ table.close();
+ assertTrue(verified);
}
-
- @Test
- public void testBulkOutputWithoutAnExistingTable() throws Exception {
- String TABLE_NAME = "TestTable";
- String FAMILY = "FAM";
- String INPUT_FILE = "InputFile2.esv";
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
- "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME,
- INPUT_FILE };
- doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
- }
+ /**
+ * Confirm ImportTsv via HFiles on fs.
+ */
+ private static void validateHFiles(FileSystem fs, String outputPath, String family)
+ throws IOException {
- public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
- return new String(bytes);
+ // validate number and content of output columns
+ LOG.debug("Validating HFiles.");
+ Set<String> configFamilies = new HashSet<String>();
+ configFamilies.add(family);
+ Set<String> foundFamilies = new HashSet<String>();
+ for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
+ String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
+ String cf = elements[elements.length - 1];
+ foundFamilies.add(cf);
+ assertTrue(
+ String.format(
+ "HFile ouput contains a column family (%s) not present in input families (%s)",
+ cf, configFamilies),
+ configFamilies.contains(cf));
+ for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
+ assertTrue(
+ String.format("HFile %s appears to contain no data.", hfile.getPath()),
+ hfile.getLen() > 0);
+ }
+ }
}
-
}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java?rev=1456700&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java Thu Mar 14 22:28:08 2013
@@ -0,0 +1,171 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests for {@link TsvParser}.
+ */
+@Category(SmallTests.class)
+public class TestImportTsvParser {
+
+ private void assertBytesEquals(byte[] a, byte[] b) {
+ assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
+ }
+
+ private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
+ ArrayList<String> parsedCols = new ArrayList<String>();
+ for (int i = 0; i < parsed.getColumnCount(); i++) {
+ parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i),
+ parsed.getColumnLength(i)));
+ }
+ if (!Iterables.elementsEqual(parsedCols, expected)) {
+ fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:"
+ + Joiner.on(",").join(parsedCols));
+ }
+ }
+
+ @Test
+ public void testTsvParserSpecParsing() {
+ TsvParser parser;
+
+ parser = new TsvParser("HBASE_ROW_KEY", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertFalse(parser.hasTimestamp());
+
+ parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertFalse(parser.hasTimestamp());
+
+ parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
+ assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertFalse(parser.hasTimestamp());
+
+ parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+ assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertTrue(parser.hasTimestamp());
+ assertEquals(2, parser.getTimestampKeyColumnIndex());
+ }
+
+ @Test
+ public void testTsvParser() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");
+ assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
+ assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
+ assertNull(parser.getFamily(2));
+ assertNull(parser.getQualifier(2));
+ assertEquals(2, parser.getRowKeyColumnIndex());
+
+ assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX,
+ parser.getTimestampKeyColumnIndex());
+
+ byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
+ ParsedLine parsed = parser.parse(line, line.length);
+ checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+ }
+
+ @Test
+ public void testTsvParserWithTimestamp() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertNull(parser.getFamily(1));
+ assertNull(parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
+ assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertEquals(1, parser.getTimestampKeyColumnIndex());
+
+ byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
+ ParsedLine parsed = parser.parse(line, line.length);
+ assertEquals(1234l, parsed.getTimestamp(-1));
+ checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+ }
+
+ /**
+ * Test cases that throw BadTsvLineException
+ */
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
+ byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
+ parser.parse(line, line.length);
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
+ byte[] line = Bytes.toBytes("");
+ parser.parse(line, line.length);
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
+ byte[] line = Bytes.toBytes("key_only");
+ parser.parse(line, line.length);
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
+ byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
+ parser.parse(line, line.length);
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+ assertEquals(1, parser.getTimestampKeyColumnIndex());
+ byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
+ ParsedLine parsed = parser.parse(line, line.length);
+ assertEquals(-1, parsed.getTimestamp(-1));
+ checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserNoTimestampValue() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+ assertEquals(2, parser.getTimestampKeyColumnIndex());
+ byte[] line = Bytes.toBytes("rowkey\tval_a");
+ parser.parse(line, line.length);
+ }
+}