You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2012/06/15 18:51:47 UTC

svn commit: r1350691 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Author: ramkrishna
Date: Fri Jun 15 16:51:47 2012
New Revision: 1350691

URL: http://svn.apache.org/viewvc?rev=1350691&view=rev
Log:
HBASE-5564 Bulkload is discarding duplicate records

Submitted by: Laxman
Reviewed by: iStack, Ted, Ram

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1350691&r1=1350690&r2=1350691&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Fri Jun 15 16:51:47 2012
@@ -83,7 +83,16 @@ public class ImportTsv {
 
     private int rowKeyColumnIndex;
 
-    public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
+    private int maxColumnCount;
+
+    // Default value must be negative
+    public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
+
+    private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
+
+    public static String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
+
+    public static String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
 
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
@@ -100,8 +109,9 @@ public class ImportTsv {
       ArrayList<String> columnStrings = Lists.newArrayList(
         Splitter.on(',').trimResults().split(columnsSpecification));
 
-      families = new byte[columnStrings.size()][];
-      qualifiers = new byte[columnStrings.size()][];
+      maxColumnCount = columnStrings.size();
+      families = new byte[maxColumnCount][];
+      qualifiers = new byte[maxColumnCount][];
 
       for (int i = 0; i < columnStrings.size(); i++) {
         String str = columnStrings.get(i);
@@ -109,6 +119,12 @@ public class ImportTsv {
           rowKeyColumnIndex = i;
           continue;
         }
+        
+        if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
+          timestampKeyColumnIndex = i;
+          continue;
+        }
+        
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -120,6 +136,14 @@ public class ImportTsv {
       }
     }
 
+    public boolean hasTimestamp() {
+      return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
+    }
+
+    public int getTimestampKeyColumnIndex() {
+      return timestampKeyColumnIndex;
+    }
+
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
@@ -133,7 +157,7 @@ public class ImportTsv {
     public ParsedLine parse(byte[] lineBytes, int length)
     throws BadTsvLineException {
       // Enumerate separator offsets
-      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
+      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(maxColumnCount);
       for (int i = 0; i < length; i++) {
         if (lineBytes[i] == separatorByte) {
           tabOffsets.add(i);
@@ -145,10 +169,13 @@ public class ImportTsv {
 
       tabOffsets.add(length);
 
-      if (tabOffsets.size() > families.length) {
+      if (tabOffsets.size() > maxColumnCount) {
         throw new BadTsvLineException("Excessive columns");
       } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
         throw new BadTsvLineException("No row key");
+      } else if (hasTimestamp()
+          && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
+        throw new BadTsvLineException("No timestamp");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -168,6 +195,24 @@ public class ImportTsv {
       public int getRowKeyLength() {
         return getColumnLength(rowKeyColumnIndex);
       }
+      
+      public long getTimestamp(long ts) throws BadTsvLineException {
+        // Return ts if HBASE_TS_KEY is not configured in column spec
+        if (!hasTimestamp()) {
+          return ts;
+        }
+
+        String timeStampStr = Bytes.toString(lineBytes,
+            getColumnOffset(timestampKeyColumnIndex),
+            getColumnLength(timestampKeyColumnIndex));
+        try {
+          return Long.parseLong(timeStampStr);
+        } catch (NumberFormatException nfe) {
+          // treat this record as bad record
+          throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
+        }
+      }
+      
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
@@ -289,7 +334,11 @@ public class ImportTsv {
       "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
       "as the row key for each imported record. You must specify exactly one column\n" +
       "to be the row key, and you must specify a column name for every column that exists in the\n" +
-      "input data.\n" +
+      "input data. Another special column HBASE_TS_KEY designates that this column should be\n" +
+      "used as timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY is optional.\n" +
+      "You must specify atmost one column as timestamp key for each imported record.\n" +
+      "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
+      "Note: if you use this option, then 'importtsv.timestamp' option will be ignored.\n" +
       "\n" +
       "By default importtsv will load data directly into HBase. To instead generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
@@ -348,11 +397,33 @@ public class ImportTsv {
       System.exit(-1);
     }
 
-    // Make sure one or more columns are specified
-    if (columns.length < 2) {
-      usage("One or more columns in addition to the row key are required");
+    // Make sure we have at most one column as the timestamp key
+    int tskeysFound = 0;
+    for (String col : columns) {
+      if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
+        tskeysFound++;
+    }
+    if (tskeysFound > 1) {
+      usage("Must specify at most one column as "
+          + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
+      System.exit(-1);
+    }
+    
+    // Make sure one or more columns are specified excluding rowkey and
+    // timestamp key
+    if (columns.length - (rowkeysFound + tskeysFound) < 1) {
+      usage("One or more columns in addition to the row key and timestamp(optional) are required");
       System.exit(-1);
     }
+
+    // If timestamp option is not specified, use current system time.
+    long timstamp = conf
+        .getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+
+    // Store the resolved value back in conf so that a missing or invalid
+    // (non-numeric) timestamp is replaced with the current system time
+    conf.setLong(TIMESTAMP_CONF_KEY, timstamp); 
+    
     hbaseAdmin = new HBaseAdmin(conf);
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true) ? 0 : 1);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java?rev=1350691&r1=1350690&r2=1350691&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java Fri Jun 15 16:51:47 2012
@@ -105,7 +105,9 @@ extends Mapper<LongWritable, Text, Immut
       separator = new String(Base64.decode(separator));
     }
 
-    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+    // Should never get 0 as we are setting this to a valid value in job
+    // configuration.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
 
     skipBadLines = context.getConfiguration().getBoolean(
         ImportTsv.SKIP_LINES_CONF_KEY, true);
@@ -128,10 +130,15 @@ extends Mapper<LongWritable, Text, Immut
         new ImmutableBytesWritable(lineBytes,
             parsed.getRowKeyOffset(),
             parsed.getRowKeyLength());
+      // Retrieve timestamp if exists
+      ts = parsed.getTimestamp(ts);
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
-        if (i == parser.getRowKeyColumnIndex()) continue;
+        if (i == parser.getRowKeyColumnIndex()
+            || i == parser.getTimestampKeyColumnIndex()) {
+          continue;
+        }
         KeyValue kv = new KeyValue(
             lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
             parser.getFamily(i), 0, parser.getFamily(i).length,

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1350691&r1=1350690&r2=1350691&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Fri Jun 15 16:51:47 2012
@@ -61,6 +61,7 @@ public class TestImportTsv {
     assertNull(parser.getFamily(0));
     assertNull(parser.getQualifier(0));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
     assertNull(parser.getFamily(0));
@@ -68,6 +69,7 @@ public class TestImportTsv {
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
     assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
     assertNull(parser.getFamily(0));
@@ -77,6 +79,19 @@ public class TestImportTsv {
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
     assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
+    
+    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2",
+        "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertTrue(parser.hasTimestamp());
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
   }
 
   @Test
@@ -90,10 +105,32 @@ public class TestImportTsv {
     assertNull(parser.getQualifier(2));
     assertEquals(2, parser.getRowKeyColumnIndex());
     
+    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser
+        .getTimestampKeyColumnIndex());
+    
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
     ParsedLine parsed = parser.parse(line, line.length);
     checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
   }
+  
+  
+  @Test
+  public void testTsvParserWithTimestamp() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertNull(parser.getFamily(1));
+    assertNull(parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertEquals(1, parser.getTimestampKeyColumnIndex());
+
+    byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
+    ParsedLine parsed = parser.parse(line, line.length);
+    assertEquals(1234l, parsed.getTimestamp(-1));
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+  }
 
   private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
     ArrayList<String> parsedCols = new ArrayList<String>();
@@ -120,29 +157,48 @@ public class TestImportTsv {
   public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("key_only");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
     byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
+    parser.parse(line, line.length);
+  }
+  
+  @Test(expected = BadTsvLineException.class)
+  public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+    assertEquals(1, parser.getTimestampKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
     ParsedLine parsed = parser.parse(line, line.length);
+    assertEquals(-1, parsed.getTimestamp(-1));
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
   }
+  
+  @Test(expected = BadTsvLineException.class)
+  public void testTsvParserNoTimestampValue() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\tval_a");
+    parser.parse(line, line.length);
+  }
+  
 
   @Test
   public void testMROnTable()
@@ -159,8 +215,25 @@ public class TestImportTsv {
         INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
+  }
+  
+  @Test
+  public void testMROnTableWithTimestamp() throws Exception {
+    String TABLE_NAME = "TestTable";
+    String FAMILY = "FAM";
+    String INPUT_FILE = "InputFile1.csv";
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY
+            + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };
+
+    String data = "KEY,1234,VALUE1,VALUE2\n";
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
   }
+  
 
   @Test
   public void testMROnTableWithCustomMapper()
@@ -176,16 +249,17 @@ public class TestImportTsv {
         INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
   }
 
-  private void doMROnTableTest(String inputFile, String family, String tableName,
-                               String[] args, int valueMultiplier) throws Exception {
+  private void doMROnTableTest(String inputFile, String family,
+      String tableName, String data, String[] args, int valueMultiplier)
+      throws Exception {
 
     // Cluster
     HBaseTestingUtility htu1 = new HBaseTestingUtility();
 
-    MiniHBaseCluster cluster = htu1.startMiniCluster();
+    htu1.startMiniCluster();
     htu1.startMiniMapReduceCluster();
 
     GenericOptionsParser opts = new GenericOptionsParser(htu1.getConfiguration(), args);
@@ -196,14 +270,14 @@ public class TestImportTsv {
 
       FileSystem fs = FileSystem.get(conf);
       FSDataOutputStream op = fs.create(new Path(inputFile), true);
-      String line = "KEY\u001bVALUE1\u001bVALUE2\n";
-      op.write(line.getBytes(HConstants.UTF8_ENCODING));
+      if (data == null) {
+        data = "KEY\u001bVALUE1\u001bVALUE2\n";
+      }
+      op.write(Bytes.toBytes(data));
       op.close();
 
       final byte[] FAM = Bytes.toBytes(family);
       final byte[] TAB = Bytes.toBytes(tableName);
-      final byte[] QA = Bytes.toBytes("A");
-      final byte[] QB = Bytes.toBytes("B");
       if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
         HTableDescriptor desc = new HTableDescriptor(TAB);
         desc.addFamily(new HColumnDescriptor(FAM));
@@ -269,11 +343,11 @@ public class TestImportTsv {
         "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
         "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME,
         INPUT_FILE };
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
   }
 
   public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
-    return new String(bytes, HConstants.UTF8_ENCODING);
+    return new String(bytes);
   }
 
   @org.junit.Rule