You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2013/03/14 23:28:08 UTC

svn commit: r1456700 - in /hbase/trunk: hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/test/java/org/apache/hadoop/hbase/ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/

Author: enis
Date: Thu Mar 14 22:28:08 2013
New Revision: 1456700

URL: http://svn.apache.org/r1456700
Log:
HBASE-7938 Add integration test for ImportTsv/LoadIncrementalHFiles workflow (Nick Dimiduk)

Added:
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
Modified:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Added: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java?rev=1456700&view=auto
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java (added)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java Thu Mar 14 22:28:08 2013
@@ -0,0 +1,191 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static java.lang.String.format;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.IntegrationTests;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Validate ImportTsv + LoadIncrementalHFiles on a distributed cluster.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestImportTsv implements Configurable, Tool {
+
+  private static final String NAME = IntegrationTestImportTsv.class.getSimpleName();
+  protected static final Log LOG = LogFactory.getLog(IntegrationTestImportTsv.class);
+
+  protected static final String simple_tsv =
+      "row1\t1\tc1\tc2\n" +
+      "row2\t1\tc1\tc2\n" +
+      "row3\t1\tc1\tc2\n" +
+      "row4\t1\tc1\tc2\n" +
+      "row5\t1\tc1\tc2\n" +
+      "row6\t1\tc1\tc2\n" +
+      "row7\t1\tc1\tc2\n" +
+      "row8\t1\tc1\tc2\n" +
+      "row9\t1\tc1\tc2\n" +
+      "row10\t1\tc1\tc2\n";
+
+  protected static final Set<KeyValue> simple_expected =
+      new TreeSet<KeyValue>(KeyValue.COMPARATOR) {
+    private static final long serialVersionUID = 1L;
+    {
+      byte[] family = Bytes.toBytes("d");
+      for (String line : simple_tsv.split("\n")) {
+        String[] row = line.split("\t");
+        byte[] key = Bytes.toBytes(row[0]);
+        long ts = Long.parseLong(row[1]);
+        byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) };
+        add(new KeyValue(key, family, fields[0], ts, Type.Put, fields[0]));
+        add(new KeyValue(key, family, fields[1], ts, Type.Put, fields[1]));
+      }
+    }
+  };
+
+  // this instance is initialized by provisionCluster() when the test is run
+  // from JUnit/Maven, or by main when run from the CLI.
+  protected static IntegrationTestingUtility util = null;
+
+  public Configuration getConf() {
+    return util.getConfiguration();
+  }
+
+  public void setConf(Configuration conf) {
+    throw new IllegalArgumentException("setConf not supported");
+  }
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    if (null == util) {
+      util = new IntegrationTestingUtility();
+    }
+    util.initializeCluster(1);
+  }
+
+  @AfterClass
+  public static void releaseCluster() throws Exception {
+    util.restoreCluster();
+    util = null;
+  }
+
+  /**
+   * Load the HFiles under <code>hfiles</code> into <code>tableName</code> and verify
+   * that the table contents match <code>simple_expected</code>.
+   */
+  protected void doLoadIncrementalHFiles(Path hfiles, String tableName)
+      throws Exception {
+
+    String[] args = { hfiles.toString(), tableName };
+    LOG.info(format("Running LoadIncrememntalHFiles with args: %s", Arrays.asList(args)));
+    assertEquals("Loading HFiles failed.",
+      0, ToolRunner.run(new LoadIncrementalHFiles(new Configuration(getConf())), args));
+
+    HTable table = null;
+    Scan scan = new Scan() {{
+      setCacheBlocks(false);
+      setCaching(1000);
+    }};
+    try {
+      table = new HTable(getConf(), tableName);
+      Iterator<Result> resultsIt = table.getScanner(scan).iterator();
+      Iterator<KeyValue> expectedIt = simple_expected.iterator();
+      while (resultsIt.hasNext() && expectedIt.hasNext()) {
+        Result r = resultsIt.next();
+        for (KeyValue actual : r.raw()) {
+          assertTrue(
+            "Ran out of expected values prematurely!",
+            expectedIt.hasNext());
+          KeyValue expected = expectedIt.next();
+          assertTrue(
+            format("Scan produced surprising result. expected: <%s>, actual: %s",
+              expected, actual),
+            KeyValue.COMPARATOR.compare(expected, actual) == 0);
+        }
+      }
+      assertFalse("Did not consume all expected values.", expectedIt.hasNext());
+      assertFalse("Did not consume all scan results.", resultsIt.hasNext());
+    } finally {
+      if (null != table) table.close();
+    }
+  }
+
+  @Test
+  public void testGenerateAndLoad() throws Exception {
+    String table = NAME + "-" + UUID.randomUUID();
+    String cf = "d";
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+
+    String[] args = {
+        format("-D%s=%s", ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles),
+        format("-D%s=HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2",
+          ImportTsv.COLUMNS_CONF_KEY, cf, cf),
+        // configure the test harness to NOT delete the HFiles after they're
+        // generated. We need those for doLoadIncrementalHFiles
+        format("-D%s=false", TestImportTsv.DELETE_AFTER_LOAD_CONF),
+        table
+    };
+
+    // run the job, complete the load.
+    util.createTable(table, cf);
+    TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
+    doLoadIncrementalHFiles(hfiles, table);
+    util.deleteTable(table);
+    util.cleanupDataTestDirOnTestFS(table);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length != 0) {
+      System.err.println(format("%s [genericOptions]", NAME));
+      System.err.println("  Runs ImportTsv integration tests against a distributed cluster.");
+      System.err.println();
+      GenericOptionsParser.printGenericCommandUsage(System.err);
+      return 1;
+    }
+
+    // adding more test methods? Don't forget to add them here... or consider doing what
+    // IntegrationTestsDriver does.
+    provisionCluster();
+    testGenerateAndLoad();
+    releaseCluster();
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    util = new IntegrationTestingUtility(conf);
+    // not using ToolRunner to avoid unnecessary call to setConf()
+    args = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int status = new IntegrationTestImportTsv().run(args);
+    System.exit(status);
+  }
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1456700&r1=1456699&r2=1456700&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Mar 14 22:28:08 2013
@@ -315,7 +315,7 @@ public class HBaseTestingUtility extends
 
   /**
    * @return Where to write test data on the test filesystem; Returns working directory
-   * for the test filesytem by default
+   * for the test filesystem by default
    * @see #setupDataTestDirOnTestFS()
    * @see #getTestFileSystem()
    */
@@ -396,6 +396,7 @@ public class HBaseTestingUtility extends
     FileSystem fs = getTestFileSystem();
     if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
       File dataTestDir = new File(getDataTestDir().toString());
+      dataTestDir.deleteOnExit();
       dataTestDirOnTestFS = new Path(dataTestDir.getAbsolutePath());
     } else {
       Path base = getBaseTestDirOnTestFS();
@@ -404,6 +405,29 @@ public class HBaseTestingUtility extends
       fs.deleteOnExit(dataTestDirOnTestFS);
     }
   }
+
+  /**
+   * Cleans the test data directory on the test filesystem.
+   * @return True if we removed the test dirs
+   * @throws IOException
+   */
+  public boolean cleanupDataTestDirOnTestFS() throws IOException {
+    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
+    if (ret)
+      dataTestDirOnTestFS = null;
+    return ret;
+  }
+
+  /**
+   * Cleans a subdirectory under the test data directory on the test filesystem.
+   * @return True if we removed child
+   * @throws IOException
+   */
+  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
+    Path cpath = getDataTestDirOnTestFS(subdirName);
+    return getTestFileSystem().delete(cpath, true);
+  }
+
   /**
    * Start a minidfscluster.
    * @param servers How many DNs to start.
@@ -959,6 +983,33 @@ public class HBaseTestingUtility extends
     getMiniHBaseCluster().compact(tableName, major);
   }
 
+  /**
+   * Create a table.
+   * @param tableName
+   * @param family
+   * @return An HTable instance for the created table.
+   * @throws IOException
+   */
+  public HTable createTable(String tableName, String family)
+  throws IOException{
+    return createTable(tableName, new String[] { family });
+  }
+
+  /**
+   * Create a table.
+   * @param tableName
+   * @param families
+   * @return An HTable instance for the created table.
+   * @throws IOException
+   */
+  public HTable createTable(String tableName, String[] families)
+  throws IOException {
+    List<byte[]> fams = new ArrayList<byte[]>(families.length);
+    for (String family : families) {
+      fams.add(Bytes.toBytes(family));
+    }
+    return createTable(Bytes.toBytes(tableName), fams.toArray(new byte[0][]));
+  }
 
   /**
    * Create a table.
@@ -1121,6 +1172,14 @@ public class HBaseTestingUtility extends
    * Drop an existing table
    * @param tableName existing table
    */
+  public void deleteTable(String tableName) throws IOException {
+    deleteTable(Bytes.toBytes(tableName));
+  }
+
+  /**
+   * Drop an existing table
+   * @param tableName existing table
+   */
   public void deleteTable(byte[] tableName) throws IOException {
     try {
       getHBaseAdmin().disableTable(tableName);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1456700&r1=1456699&r2=1456700&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Thu Mar 14 22:28:08 2013
@@ -18,343 +18,306 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
-import java.io.UnsupportedEncodingException;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
-import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
-import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-
-@Category(MediumTests.class)
-public class TestImportTsv {
-  private static final Log LOG = LogFactory.getLog(TestImportTsv.class);
+@Category(LargeTests.class)
+public class TestImportTsv implements Configurable {
 
-  @Test
-  public void testTsvParserSpecParsing() {
-    TsvParser parser;
+  protected static final Log LOG = LogFactory.getLog(TestImportTsv.class);
+  protected static final String NAME = TestImportTsv.class.getSimpleName();
+  protected static HBaseTestingUtility util = new HBaseTestingUtility();
 
-    parser = new TsvParser("HBASE_ROW_KEY", "\t");
-    assertNull(parser.getFamily(0));
-    assertNull(parser.getQualifier(0));
-    assertEquals(0, parser.getRowKeyColumnIndex());
-    assertFalse(parser.hasTimestamp());
-
-    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
-    assertNull(parser.getFamily(0));
-    assertNull(parser.getQualifier(0));
-    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
-    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
-    assertEquals(0, parser.getRowKeyColumnIndex());
-    assertFalse(parser.hasTimestamp());
-
-    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
-    assertNull(parser.getFamily(0));
-    assertNull(parser.getQualifier(0));
-    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
-    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
-    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
-    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
-    assertEquals(0, parser.getRowKeyColumnIndex());
-    assertFalse(parser.hasTimestamp());
-    
-    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2",
-        "\t");
-    assertNull(parser.getFamily(0));
-    assertNull(parser.getQualifier(0));
-    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
-    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
-    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
-    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
-    assertEquals(0, parser.getRowKeyColumnIndex());
-    assertTrue(parser.hasTimestamp());
-    assertEquals(2, parser.getTimestampKeyColumnIndex());
-  }
+  /**
+   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+   * true.
+   */
+  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
 
-  @Test
-  public void testTsvParser() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");
-    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
-    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
-    assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
-    assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
-    assertNull(parser.getFamily(2));
-    assertNull(parser.getQualifier(2));
-    assertEquals(2, parser.getRowKeyColumnIndex());
-    
-    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser
-        .getTimestampKeyColumnIndex());
-    
-    byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
-    ParsedLine parsed = parser.parse(line, line.length);
-    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
-  }
-  
-  
-  @Test
-  public void testTsvParserWithTimestamp() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
-    assertNull(parser.getFamily(0));
-    assertNull(parser.getQualifier(0));
-    assertNull(parser.getFamily(1));
-    assertNull(parser.getQualifier(1));
-    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
-    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
-    assertEquals(0, parser.getRowKeyColumnIndex());
-    assertEquals(1, parser.getTimestampKeyColumnIndex());
-
-    byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
-    ParsedLine parsed = parser.parse(line, line.length);
-    assertEquals(1234l, parsed.getTimestamp(-1));
-    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
-  }
-
-  private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
-    ArrayList<String> parsedCols = new ArrayList<String>();
-    for (int i = 0; i < parsed.getColumnCount(); i++) {
-      parsedCols.add(Bytes.toString(
-          parsed.getLineBytes(),
-          parsed.getColumnOffset(i),
-          parsed.getColumnLength(i)));
-    }
-    if (!Iterables.elementsEqual(parsedCols, expected)) {
-      fail("Expected: " + Joiner.on(",").join(expected) + "\n" +
-          "Got:" + Joiner.on(",").join(parsedCols));
-    }
-  }
+  /**
+   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+   */
+  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+  private final String FAMILY = "FAM";
 
-  private void assertBytesEquals(byte[] a, byte[] b) {
-    assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
+  public Configuration getConf() {
+    return util.getConfiguration();
   }
 
-  /**
-   * Test cases that throw BadTsvLineException
-   */
-  @Test(expected=BadTsvLineException.class)
-  public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
-    byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
-    parser.parse(line, line.length);
-  }
-
-  @Test(expected=BadTsvLineException.class)
-  public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
-    byte[] line = Bytes.toBytes("");
-    parser.parse(line, line.length);
-  }
-
-  @Test(expected=BadTsvLineException.class)
-  public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
-    byte[] line = Bytes.toBytes("key_only");
-    parser.parse(line, line.length);
-  }
-
-  @Test(expected=BadTsvLineException.class)
-  public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
-    byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
-    parser.parse(line, line.length);
+  public void setConf(Configuration conf) {
+    throw new IllegalArgumentException("setConf not supported");
   }
-  
-  @Test(expected = BadTsvLineException.class)
-  public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
-    assertEquals(1, parser.getTimestampKeyColumnIndex());
-    byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
-    ParsedLine parsed = parser.parse(line, line.length);
-    assertEquals(-1, parsed.getTimestamp(-1));
-    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    util.startMiniCluster();
+    util.startMiniMapReduceCluster();
   }
-  
-  @Test(expected = BadTsvLineException.class)
-  public void testTsvParserNoTimestampValue() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
-    assertEquals(2, parser.getTimestampKeyColumnIndex());
-    byte[] line = Bytes.toBytes("rowkey\tval_a");
-    parser.parse(line, line.length);
+
+  @AfterClass
+  public static void releaseCluster() throws Exception {
+    util.shutdownMiniMapReduceCluster();
+    util.shutdownMiniCluster();
   }
-  
 
   @Test
-  public void testMROnTable()
-  throws Exception {
-    String TABLE_NAME = "TestTable";
-    String FAMILY = "FAM";
-    String INPUT_FILE = "InputFile.esv";
+  public void testMROnTable() throws Exception {
+    String table = "test-" + UUID.randomUUID();
 
     // Prepare the arguments required for the test.
     String[] args = new String[] {
         "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
         "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
-        TABLE_NAME,
-        INPUT_FILE
+        table
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
+    util.createTable(table, FAMILY);
+    doMROnTableTest(util, FAMILY, null, args, 1);
+    util.deleteTable(table);
   }
   
   @Test
   public void testMROnTableWithTimestamp() throws Exception {
-    String TABLE_NAME = "TestTable";
-    String FAMILY = "FAM";
-    String INPUT_FILE = "InputFile1.csv";
+    String table = "test-" + UUID.randomUUID();
 
     // Prepare the arguments required for the test.
     String[] args = new String[] {
         "-D" + ImportTsv.COLUMNS_CONF_KEY
             + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
-        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };
-
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+        table
+    };
     String data = "KEY,1234,VALUE1,VALUE2\n";
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
+
+    util.createTable(table, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    util.deleteTable(table);
   }
   
 
   @Test
   public void testMROnTableWithCustomMapper()
   throws Exception {
-    String TABLE_NAME = "TestTable";
-    String FAMILY = "FAM";
-    String INPUT_FILE = "InputFile2.esv";
+    String table = "test-" + UUID.randomUUID();
 
     // Prepare the arguments required for the test.
     String[] args = new String[] {
         "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
-        TABLE_NAME,
-        INPUT_FILE
+        table
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
+    util.createTable(table, FAMILY);
+    doMROnTableTest(util, FAMILY, null, args, 3);
+    util.deleteTable(table);
   }
+  
+  @Test
+  public void testBulkOutputWithoutAnExistingTable() throws Exception {
+    String table = "test-" + UUID.randomUUID();
 
-  private void doMROnTableTest(String inputFile, String family,
-      String tableName, String data, String[] args, int valueMultiplier)
-      throws Exception {
+    // Prepare the arguments required for the test.
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+        table
+    };
 
-    // Cluster
-    HBaseTestingUtility htu1 = new HBaseTestingUtility();
+    doMROnTableTest(util, FAMILY, null, args, 3);
+    util.deleteTable(table);
+  }
 
-    htu1.startMiniCluster();
-    htu1.startMiniMapReduceCluster();
+  @Test
+  public void testBulkOutputWithAnExistingTable() throws Exception {
+    String table = "test-" + UUID.randomUUID();
 
+    // Prepare the arguments required for the test.
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+        table
+    };
+
+    util.createTable(table, FAMILY);
+    doMROnTableTest(util, FAMILY, null, args, 3);
+    util.deleteTable(table);
+  }
+
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
+      String data, String[] args) throws Exception {
+    return doMROnTableTest(util, family, data, args, 1);
+  }
+
+  /**
+   * Run an ImportTsv job and perform basic validation on the results.
+   * Returns the ImportTsv <code>Tool</code> instance so that other tests can
+   * inspect it for further validation as necessary. This method is static to
+   * ensure non-reliance on instance's util/conf facilities.
+   * @param args Any arguments to pass BEFORE inputFile path is appended.
+   * @return The Tool instance used to run the test.
+   */
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
+      String data, String[] args, int valueMultiplier)
+  throws Exception {
+    String table = args[args.length - 1];
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    if (data == null) {
+      data = "KEY\u001bVALUE1\u001bVALUE2\n";
+    }
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("min.num.spills.for.combine", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<String>(Arrays.asList(args));
+    argv.add(inputPath.toString());
     Tool tool = new ImportTsv();
-    tool.setConf(htu1.getConfiguration());
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
 
-    try {
-      FileSystem fs = FileSystem.get(tool.getConf());
-      FSDataOutputStream op = fs.create(new Path(inputFile), true);
-      if (data == null) {
-        data = "KEY\u001bVALUE1\u001bVALUE2\n";
-      }
-      op.write(Bytes.toBytes(data));
-      op.close();
-      LOG.debug(String.format("Wrote test data to file: %s", fs.makeQualified(new Path(inputFile))));
-
-      if (tool.getConf().get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
-        HTableDescriptor desc = new HTableDescriptor(tableName);
-        desc.addFamily(new HColumnDescriptor(family));
-        HBaseAdmin admin = new HBaseAdmin(tool.getConf());
-        admin.createTable(desc);
-        admin.close();
+    // Perform basic validation. If the input args did not include
+    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
+    // Otherwise, validate presence of hfiles.
+    boolean createdHFiles = false;
+    String outputPath = null;
+    for (String arg : argv) {
+      if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
+        createdHFiles = true;
+        // split '-Dfoo=bar' on '=' and keep 'bar'
+        outputPath = arg.split("=")[1];
+        break;
       }
-      // force use of combiner for testing purposes
-      tool.getConf().setInt("min.num.spills.for.combine", 1);
-      assertEquals(0, ToolRunner.run(tool, args));
-      
-      HTable table = new HTable(tool.getConf(), tableName);
-      boolean verified = false;
-      long pause = tool.getConf().getLong("hbase.client.pause", 5 * 1000);
-      int numRetries = tool.getConf().getInt("hbase.client.retries.number", 5);
-      for (int i = 0; i < numRetries; i++) {
-        try {
-          Scan scan = new Scan();
-          // Scan entire family.
-          scan.addFamily(Bytes.toBytes(family));
-          ResultScanner resScanner = table.getScanner(scan);
-          for (Result res : resScanner) {
-            assertTrue(res.size() == 2);
-            List<KeyValue> kvs = res.list();
-            assertEquals(toU8Str(kvs.get(0).getRow()),
-                toU8Str(Bytes.toBytes("KEY")));
-            assertEquals(toU8Str(kvs.get(1).getRow()),
-                toU8Str(Bytes.toBytes("KEY")));
-            assertEquals(toU8Str(kvs.get(0).getValue()),
-                toU8Str(Bytes.toBytes("VALUE" + valueMultiplier)));
-            assertEquals(toU8Str(kvs.get(1).getValue()),
-                toU8Str(Bytes.toBytes("VALUE" + 2*valueMultiplier)));
-            // Only one result set is expected, so let it loop.
-          }
-          verified = true;
-          break;
-        } catch (NullPointerException e) {
-          // If here, a cell was empty.  Presume its because updates came in
-          // after the scanner had been opened.  Wait a while and retry.
-        }
-        try {
-          Thread.sleep(pause);
-        } catch (InterruptedException e) {
-          // continue
+    }
+
+    if (createdHFiles)
+      validateHFiles(fs, outputPath, family);
+    else
+      validateTable(conf, table, family, valueMultiplier);
+
+    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+      LOG.debug("Deleting test subdirectory");
+      util.cleanupDataTestDirOnTestFS(table);
+    }
+    return tool;
+  }
+
+  /**
+   * Confirm ImportTsv via data in online table.
+   */
+  private static void validateTable(Configuration conf, String tableName,
+      String family, int valueMultiplier) throws IOException {
+
+    LOG.debug("Validating table.");
+    HTable table = new HTable(conf, tableName);
+    boolean verified = false;
+    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = conf.getInt("hbase.client.retries.number", 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        Scan scan = new Scan();
+        // Scan entire family.
+        scan.addFamily(Bytes.toBytes(family));
+        ResultScanner resScanner = table.getScanner(scan);
+        for (Result res : resScanner) {
+          assertTrue(res.size() == 2);
+          List<KeyValue> kvs = res.list();
+          assertArrayEquals(kvs.get(0).getRow(), Bytes.toBytes("KEY"));
+          assertArrayEquals(kvs.get(1).getRow(), Bytes.toBytes("KEY"));
+          assertArrayEquals(kvs.get(0).getValue(),
+            Bytes.toBytes("VALUE" + valueMultiplier));
+          assertArrayEquals(kvs.get(1).getValue(),
+            Bytes.toBytes("VALUE" + 2 * valueMultiplier));
+          // Only one result set is expected, so let it loop.
         }
+        verified = true;
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty. Presume it's because updates came in
+        // after the scanner had been opened. Wait a while and retry.
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
       }
-      table.close();
-      assertTrue(verified);
-    } finally {
-      htu1.shutdownMiniMapReduceCluster();
-      htu1.shutdownMiniCluster();
     }
+    table.close();
+    assertTrue(verified);
   }
-  
-  @Test
-  public void testBulkOutputWithoutAnExistingTable() throws Exception {
-    String TABLE_NAME = "TestTable";
-    String FAMILY = "FAM";
-    String INPUT_FILE = "InputFile2.esv";
 
-    // Prepare the arguments required for the test.
-    String[] args = new String[] {
-        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
-        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
-        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME,
-        INPUT_FILE };
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
-  }
+  /**
+   * Confirm ImportTsv via HFiles on fs.
+   */
+  private static void validateHFiles(FileSystem fs, String outputPath, String family)
+      throws IOException {
 
-  public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
-    return new String(bytes);
+    // validate number and content of output columns
+    LOG.debug("Validating HFiles.");
+    Set<String> configFamilies = new HashSet<String>();
+    configFamilies.add(family);
+    Set<String> foundFamilies = new HashSet<String>();
+    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
+      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
+      String cf = elements[elements.length - 1];
+      foundFamilies.add(cf);
+      assertTrue(
+        String.format(
+          "HFile ouput contains a column family (%s) not present in input families (%s)",
+          cf, configFamilies),
+          configFamilies.contains(cf));
+      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
+        assertTrue(
+          String.format("HFile %s appears to contain no data.", hfile.getPath()),
+          hfile.getLen() > 0);
+      }
+    }
   }
-
 }
 

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java?rev=1456700&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java Thu Mar 14 22:28:08 2013
@@ -0,0 +1,218 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests for {@link TsvParser}: column-spec parsing, line splitting, timestamp extraction and
+ * the malformed-line cases that are expected to raise {@link BadTsvLineException}.
+ */
+@Category(SmallTests.class)
+public class TestImportTsvParser {
+
+  /**
+   * Asserts that two byte arrays are equal by comparing their {@code Bytes.toStringBinary}
+   * renderings.
+   *
+   * @param a the expected bytes
+   * @param b the actual bytes
+   */
+  private static void assertBytesEquals(byte[] a, byte[] b) {
+    assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
+  }
+
+  /**
+   * Fails the test unless the columns of {@code parsed} equal {@code expected}, element by
+   * element and in order.
+   *
+   * @param parsed the parser output to inspect
+   * @param expected the expected column values, in column order
+   */
+  private static void checkParsing(ParsedLine parsed, Iterable<String> expected) {
+    // Decode every column from the line bytes using its reported offset and length.
+    ArrayList<String> parsedCols = new ArrayList<String>();
+    for (int i = 0; i < parsed.getColumnCount(); i++) {
+      parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i),
+        parsed.getColumnLength(i)));
+    }
+    if (!Iterables.elementsEqual(parsedCols, expected)) {
+      fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:"
+          + Joiner.on(",").join(parsedCols));
+    }
+  }
+
+  /**
+   * Checks family, qualifier, row-key index and timestamp-column detection for several column
+   * specifications.
+   */
+  @Test
+  public void testTsvParserSpecParsing() {
+    TsvParser parser;
+
+    parser = new TsvParser("HBASE_ROW_KEY", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
+
+    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
+
+    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
+
+    // The timestamp marker occupies index 2, so the second data column moves to index 3.
+    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertTrue(parser.hasTimestamp());
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
+  }
+
+  /**
+   * Parses a well-formed line with a spec whose row key is the third column and checks the
+   * columns that come back.
+   */
+  @Test
+  public void testTsvParser() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
+    assertNull(parser.getFamily(2));
+    assertNull(parser.getQualifier(2));
+    assertEquals(2, parser.getRowKeyColumnIndex());
+
+    // No HBASE_TS_KEY in the spec, so the timestamp index is expected to stay at its default.
+    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX,
+      parser.getTimestampKeyColumnIndex());
+
+    byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
+    ParsedLine parsed = parser.parse(line, line.length);
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+  }
+
+  /**
+   * Parses a line whose second column is the timestamp and checks the parsed timestamp and
+   * columns.
+   */
+  @Test
+  public void testTsvParserWithTimestamp() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertNull(parser.getFamily(1));
+    assertNull(parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertEquals(1, parser.getTimestampKeyColumnIndex());
+
+    byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
+    ParsedLine parsed = parser.parse(line, line.length);
+    assertEquals(1234L, parsed.getTimestamp(-1));
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+  }
+
+  /**
+   * A line with more columns than the spec is expected to raise BadTsvLineException.
+   */
+  @Test(expected = BadTsvLineException.class)
+  public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
+    byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
+    parser.parse(line, line.length);
+  }
+
+  /**
+   * An empty line is expected to raise BadTsvLineException.
+   */
+  @Test(expected = BadTsvLineException.class)
+  public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
+    byte[] line = Bytes.toBytes("");
+    parser.parse(line, line.length);
+  }
+
+  /**
+   * A line holding only the row key is expected to raise BadTsvLineException.
+   */
+  @Test(expected = BadTsvLineException.class)
+  public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
+    byte[] line = Bytes.toBytes("key_only");
+    parser.parse(line, line.length);
+  }
+
+  /**
+   * A line that ends before the row-key column is expected to raise BadTsvLineException.
+   */
+  @Test(expected = BadTsvLineException.class)
+  public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
+    byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
+    parser.parse(line, line.length);
+  }
+
+  /**
+   * A non-numeric value in the timestamp column is expected to raise BadTsvLineException.
+   */
+  @Test(expected = BadTsvLineException.class)
+  public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+    assertEquals(1, parser.getTimestampKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
+    // NOTE(review): the expected exception is presumably raised by parse() or getTimestamp();
+    // whichever throws, the statements after it never run -- confirm which call is meant to fail.
+    ParsedLine parsed = parser.parse(line, line.length);
+    assertEquals(-1, parsed.getTimestamp(-1));
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+  }
+
+  /**
+   * A line that ends before the timestamp column is expected to raise BadTsvLineException.
+   */
+  @Test(expected = BadTsvLineException.class)
+  public void testTsvParserNoTimestampValue() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\tval_a");
+    parser.parse(line, line.length);
+  }
+}